Using the Windows Workflow Foundation Queuing system

 
The WF queuing system is very useful when we want to create
custom event activities such as long-running activities.
Many questions in the MSDN Windows Workflow forum are about messaging and communication between
the workflow and the host.
A few months ago, Paul Andrew (MS PM) posted a very interesting sample illustrating
long-running activities; I think it is a key sample for getting started with the WF queuing system.
 
To follow my explanations, download the sample and open the solution in Visual Studio.Net.
 
Here are some key points of this sample :
 
1.A Custom activity
 
class FactorPrimesActivity : Activity {
 
protected override ActivityExecutionStatus Execute(ActivityExecutionContext context)
{
 //TODO :some code will come here
 return ActivityExecutionStatus.Executing;
}
 
}
 
Since this method returns "ActivityExecutionStatus.Executing"
and not "ActivityExecutionStatus.Closed", the activity stays in the "Executing" state until
it is explicitly closed by a call to context.CloseActivity().
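 
For contrast, here is a minimal sketch of an activity that does all of its work inline and therefore returns "Closed" right away. The class name InlineActivity is made up for this illustration and is not part of the sample; the sketch assumes a reference to the WF 3.x assembly System.Workflow.ComponentModel.

```csharp
using System.Workflow.ComponentModel;

// Hypothetical activity, shown only to contrast with FactorPrimesActivity.
public class InlineActivity : Activity
{
    protected override ActivityExecutionStatus Execute(ActivityExecutionContext context)
    {
        // All the work is done synchronously here, so there is nothing to wait for.
        // Returning Closed tells the runtime that the activity is finished
        // and that the workflow can move on to the next activity.
        return ActivityExecutionStatus.Closed;
    }
}
```

An activity that returns "Executing" instead has to arrange for something (here, a queue event handler) to close it later.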
 
2.Start the work
 
  • We want the work to run on a separate thread. The best way is to delegate the task to
    a service registered with the workflow runtime: that’s why we need a workflow service.
  • When the job is completed, the service notifies the activity through the WF queuing system.
  • A message queue is created by the activity; the custom service posts messages to this queue.
  • The queue must have a unique name: a Guid alone would do, but for debugging purposes
    a readable name is easier to follow, so the code below uses the activity’s qualified name + "ResultQueue".
protected override ActivityExecutionStatus Execute(ActivityExecutionContext context)
{
      //unique queue name
      string qName = this.QualifiedName + "ResultQueue";
      WorkflowQueuingService qServ = context.GetService<WorkflowQueuingService>();
      //the queueing system creates the queue 
      WorkflowQueue q = qServ.CreateWorkflowQueue(qName, false);
      //when a message arrives in this queue, we will close the activity context
      //OnItemAvailable event handler will be described later
      q.QueueItemAvailable += OnItemAvailable;
      //nothing blocks here: Execute returns immediately and the activity stays
      //in the Executing state until OnItemAvailable closes it
      //
      // TODO : call a function in our custom service to run in another thread
      //
      return ActivityExecutionStatus.Executing;
}
 
3.How to be notified that the job is done
 
Before writing the service, let’s write the link between the activity and the custom service :
 
private void OnItemAvailable(object sender, QueueEventArgs e)
{
    //The sender is the activity execution context; we need it to close the activity (see the last line of this method)
    ActivityExecutionContext context = sender as ActivityExecutionContext;
    //we use the built-in queuing service
    WorkflowQueuingService qServ = context.GetService<WorkflowQueuingService>();
    //The queue was created by the activity in Execute; we look it up by name
    WorkflowQueue q = qServ.GetWorkflowQueue(e.QueueName);
    //We pop the item from the queue; we know that our custom service posts an int
    result = (int)q.Dequeue();
    //we don’t need the queue anymore
    qServ.DeleteWorkflowQueue(e.QueueName);
    //we close the activity => the workflow can continue to the next activity
    context.CloseActivity();
}
 
4.Let’s write the custom service
 
 public class FactoringService
 {
   private WorkflowRuntime runtime;
   public FactoringService(WorkflowRuntime runtime)
   {
      this.runtime = runtime;
   }
        
   public void FactorPrimes(IComparable resultQueueName)
   {
      //TODO: the job will run here on another thread
   }
        …
 }
Another option could have been to derive FactoringService from WorkflowRuntimeService.
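 
To give an idea of what that alternative could look like, here is a minimal sketch, assuming the WF 3.x WorkflowRuntimeService base class from System.Workflow.Runtime.Hosting. The class name FactoringRuntimeService is hypothetical and does not come from the sample. With this base class the service no longer needs the runtime in its constructor, because the runtime is exposed through the protected Runtime property once the service has been added.

```csharp
using System;
using System.Workflow.Runtime;
using System.Workflow.Runtime.Hosting;

// Hypothetical variant of FactoringService, not taken from the sample.
public class FactoringRuntimeService : WorkflowRuntimeService
{
    // Called by the runtime once the service has been started.
    protected override void OnStarted()
    {
        base.OnStarted();
        // Resources needed by the service could be allocated here.
    }

    // Called by the runtime once the service has been stopped.
    protected override void OnStopped()
    {
        // Resources could be released here.
        base.OnStopped();
    }

    public void FactorPrimes(IComparable resultQueueName)
    {
        // The runtime this service was added to; no constructor argument is needed.
        WorkflowRuntime runtime = this.Runtime;
        // TODO: start the job on another thread, as in the original service
    }
}
```

Such a service would be registered with workflowRuntime.AddService(new FactoringRuntimeService()), without passing the runtime.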
 
5.Let’s register this service
 
   class Program
    {
        static void Main(string[] args)
        {
            WorkflowRuntime workflowRuntime = new WorkflowRuntime();
            //
            // Add the factoring service to allow the prime factoring to be performed
            // asynchronously to the workflow
            //
            workflowRuntime.AddService(new FactoringService(workflowRuntime));
            …
        }
    }
 
 
6.Let’s call our custom service from our custom activity:
 
protected override ActivityExecutionStatus Execute(ActivityExecutionContext context)
{
     string qName = this.QualifiedName + "ResultQueue";
     WorkflowQueuingService qServ = context.GetService<WorkflowQueuingService>();
     WorkflowQueue q = qServ.CreateWorkflowQueue(qName, false);
     q.QueueItemAvailable += OnItemAvailable;
     FactoringService factServ = context.GetService<FactoringService>();
     //we provide the custom service with the queue name to enable communication
     factServ.FactorPrimes(qName);
     return ActivityExecutionStatus.Executing;
}
      
7.How can the custom service send a message back to the waiting activity?
 
    public class FactoringService
    {
        //runs on a thread-pool thread; state is the FactoringState queued in step 8
        private void FactorPrimes(object state)
        {
            //…
            WorkflowInstance wi = runtime.GetWorkflow(factState.instanceId);
            //
            // We push the message into the queue: the result goes back to the waiting activity
            //
            wi.EnqueueItem(<the queue name>, <the value>, null, null);
        }
        …
    }
 
8.How can the job run on another thread?
 
We delegate the task to the thread pool, which runs it on one of its worker threads (an idle one if available, otherwise a new one):
 
    public class FactoringService
    {
        …
        public void FactorPrimes(IComparable resultQueueName)
        {
            ThreadPool.QueueUserWorkItem(FactorPrimes, new FactoringState(WorkflowEnvironment.WorkflowInstanceId, resultQueueName));
        }
    }
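 
Putting steps 4, 7 and 8 together, the service could look roughly like the sketch below. This is only an illustration under assumptions: the fields of FactoringState, the SmallestPrimeFactor helper and the hard-coded number 91 are all made up here, and the real sample may be organized differently. The only WF calls used are the ones already shown above (WorkflowEnvironment.WorkflowInstanceId, WorkflowRuntime.GetWorkflow and WorkflowInstance.EnqueueItem).

```csharp
using System;
using System.Threading;
using System.Workflow.Runtime;

// Hypothetical state holder; the sample's real FactoringState may differ.
public class FactoringState
{
    public readonly Guid instanceId;
    public readonly IComparable resultQueueName;

    public FactoringState(Guid instanceId, IComparable resultQueueName)
    {
        this.instanceId = instanceId;
        this.resultQueueName = resultQueueName;
    }
}

public class FactoringService
{
    private readonly WorkflowRuntime runtime;

    public FactoringService(WorkflowRuntime runtime)
    {
        this.runtime = runtime;
    }

    // Called by the activity on the workflow thread: capture the instance id
    // while it is still available, then hand the job over to the thread pool.
    public void FactorPrimes(IComparable resultQueueName)
    {
        FactoringState state = new FactoringState(
            WorkflowEnvironment.WorkflowInstanceId, resultQueueName);
        ThreadPool.QueueUserWorkItem(FactorPrimes, state);
    }

    // Runs on a thread-pool thread.
    private void FactorPrimes(object state)
    {
        FactoringState factState = (FactoringState)state;

        // Stand-in for the real long-running job (the input 91 is an assumption).
        int result = SmallestPrimeFactor(91);

        // Post the result to the activity's queue; the activity casts it back to int.
        WorkflowInstance wi = runtime.GetWorkflow(factState.instanceId);
        wi.EnqueueItem(factState.resultQueueName, result, null, null);
    }

    // Hypothetical helper: smallest prime factor of n, by trial division.
    public static int SmallestPrimeFactor(int n)
    {
        if (n < 2) throw new ArgumentOutOfRangeException("n");
        for (int d = 2; (long)d * d <= n; d++)
        {
            if (n % d == 0) return d;
        }
        return n; // no divisor found: n itself is prime
    }
}
```

The value posted with EnqueueItem is an int because the activity's OnItemAvailable handler casts the dequeued item to int; any other type would fail at that cast.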
 
 
 
 