Chapter 16

Workflows, Asynchronous & Parallel Programming

Introduction

In this chapter, we discuss workflows.  Workflows form the foundation of parallel and asynchronous computing. We will also introduce the concept of monads, the underpinnings of workflow implementations.

Workflows

A workflow, also known as a computation expression, is a mechanism that enables us to implement algorithms that execute a controlled series of expression evaluation steps. With workflows, the output of step 1 serves as the input to step 2, the output of step 2 serves as the input to step 3, etc. This chain of execution continues as long as steps execute without error and intermediate results are valid. If any step in the chain encounters an exception, the evaluation terminates prematurely and all remaining steps are skipped. The value of the workflow expression is the value returned by the last step. We tend to use this style of programming when we need to systematically “build up” a result from constituent parts – and we want to check these parts along the way.

You may be asking yourself, “Isn’t this what programming languages do anyway? Don’t algorithms execute step by step already?” In F#[1], the answer is yes, they do. The only thing that needs to be made explicit (in F#) is the checking of each step’s output and the passing of this output to subsequent steps. By using workflows, we can run code “in between” the lines of the explicit code automatically. For example, let’s suppose we have a program that contains 3 expressions that return Boolean values:

let a = expression1

if a then

    let b = expression2

    if b then let c = expression3

 

In this example, the act of evaluating the next expression is dependent on checking the result of the previous one. This pattern appears often in programming – so often in fact, that it would be convenient to do the check “behind the scenes” or “in between” each line, so that we could write the following, but achieve the same result as above:

let a = expression1

(implicit check here)

let b = expression2

(implicit check here)

let c = expression3

Let’s take a look at a more concrete example. We’ll do things first without using workflows, then turn around and simplify things using workflows.

Let’s define a function that checks the existence of a list element. If the element exists, the function returns Some(element), otherwise the function returns None.

let exists e (lst: list<'a>) =

    let rec find e lst =

        match lst with

            | [] -> None

            | h :: t ->

                if (h = e) then Some(e)

                else find e t

    find e lst

Let’s suppose we now want to use this function to ensure that a given list contains a given Web protocol, “http”, a given site, “www.microsoft.com”, and a certain port designation, “80”. If it does, we’ve satisfied all of the conditions of the computation and we download the contents of the URL; otherwise, the computation fails without error:

let mywebaddr = ["http"; "www.microsoft.com"; "1234"; "80"]

 

let download protocol site port = printfn "download: %s %s %s" protocol site port


let fetch =

    let protocol = exists "http" mywebaddr

    if Option.isSome(protocol) then

        let site = exists "www.microsoft.com" mywebaddr

        if Option.isSome(site) then

            let port = exists "80" mywebaddr

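            // protocol, site and port are options here, so unwrap them before calling download
            if Option.isSome(port) then Some(download protocol.Value site.Value port.Value)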

            else None

        else None

    else None

 

 

If we really liked pattern matching syntax in F#, we might have chosen to write the above example this way:

 

let fetch =

    let protocol = exists "http" mywebaddr

    match protocol with

    | None -> None

    | Some(p) ->

        let site = exists "www.microsoft.com" mywebaddr

        match site with

        | None -> None

        | Some(s) ->

            let port = exists "80" mywebaddr

            match port with

            | None -> None

            | Some(pt) -> Some(download p s pt)


There are a couple of things to notice here. First, this code is “flowing” or cascading – the result of a given step (calculation) is used as the basis for performing the next step (calculation). At some point, the cascade ends, either by returning an error value, None, or by returning a valid result, Some(download p s pt). Second, it’s somewhat verbose and awkward. Programming in this way gets confusing and error-prone rather quickly.

The idiom of “cascading logic” we see here can be expressed with continuations. A continuation is a function, sometimes referred to as the “continuation function”, that represents the “remaining work” or “the rest of the computation to perform.” Each step produces a value and, if the step succeeds, feeds that value to its continuation. Using a continuation, we could recode the above example as follows:

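let mywebaddr = ["http"; "www.microsoft.com"; "1234"; "80"]

let download protocol site port = printfn "download: %s %s %s" protocol site port

// exists itself takes no continuation, so we wrap it in a small helper
// (existsThen is a name introduced here for illustration). The helper calls
// the continuation when the element is found and returns None otherwise.
let existsThen e lst cont =
    match exists e lst with
    | Some(x) -> cont x
    | None -> None

let fetch =
    existsThen "http" mywebaddr (fun p ->
        existsThen "www.microsoft.com" mywebaddr (fun s ->
            existsThen "80" mywebaddr (fun pt -> Some(download p s pt))))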

This is more compact than the previous code examples, but we can do even better.
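As a point of comparison, the same cascade can be sketched with Option.bind from the F# core library, which applies a function to the contents of a Some and passes None straight through. The names findItem, describe, address and tryDescribe are illustrative stand-ins, not part of the chapter's example:

```fsharp
// Illustrative stand-ins for the chapter's exists and download functions.
let findItem (e: string) (lst: string list) = List.tryFind (fun x -> x = e) lst
let describe protocol site port = sprintf "%s://%s:%s" protocol site port

let address = ["http"; "www.microsoft.com"; "1234"; "80"]

// Option.bind runs the next step only when the previous step produced Some.
let tryDescribe (lst: string list) =
    findItem "http" lst
    |> Option.bind (fun p ->
        findItem "www.microsoft.com" lst
        |> Option.bind (fun s ->
            findItem "80" lst
            |> Option.map (fun pt -> describe p s pt)))

let found = tryDescribe address            // every step succeeds
let missing = tryDescribe ["http"; "80"]   // the site step fails, so the result is None
```

The nesting is still visible here, which is the part that workflow syntax removes.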

Because this idiom is common in functional programming, F# supports it via a class called a “builder” that’s used in conjunction with special workflow syntax. The builder is a class that by convention (and the workflow mechanism depends on it) implements two methods: Bind and Return. The Bind method takes as input a value and a function representing the “rest of the computation” (sound familiar?). The Return method wraps the value passed into it in the workflow’s type, here an option, so that the final step of the workflow has the same type as every other step:

type UrlBuilder() =
    member this.Bind(v, f) =
        match v with
        | Some(x) -> f x
        | None -> None
    // Wrap the plain value so that it matches the None branch of Bind.
    member this.Return(v) = Some(v)

Once we have a builder class, we can create a workflow using it:

let urlbuilder = new UrlBuilder()

let fetch =

    urlbuilder {

        let! p = exists "http" mywebaddr

        let! s = exists "www.microsoft.com" mywebaddr

        let! pt = exists "80" mywebaddr

        return download p s pt

    }

The symbol fetch now refers to a workflow. To begin a workflow, we create an instance of the builder class – in this case UrlBuilder. This class implements the required Bind and Return methods. We then create a workflow expression using the builder { } syntax – for example, urlbuilder { } as in the code example.

Within the workflow, notice the let! keyword. let! is a shortcut for calling the builder’s Bind method, urlbuilder.Bind(). The first parameter to Bind is the right-hand side of the let!’s equal sign, exists "http" mywebaddr, and the second parameter, filled in by the compiler, is the continuation function, i.e., the function to call if the previous step succeeds. In workflows, this continuation function is simply the rest of the workflow body that lexically follows the let!, wrapped in a function whose parameter is the identifier being bound (here, p). In this example, it begins with the next call to exists. If any step in this chain fails, i.e., returns None, the workflow terminates prematurely.
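To make the translation concrete, here is a minimal sketch that writes the same computation twice: once with workflow syntax and once with the nested Bind calls that the compiler produces, roughly speaking. OptionBuilder, opt and tryHalve are hypothetical names used only for this illustration:

```fsharp
// A self-contained option builder; Return wraps its argument in Some.
type OptionBuilder() =
    member this.Bind(v, f) =
        match v with
        | Some x -> f x
        | None -> None
    member this.Return(v) = Some v

let opt = OptionBuilder()

// A step that can fail: only even numbers can be halved.
let tryHalve n = if n % 2 = 0 then Some (n / 2) else None

// Workflow syntax.
let sugared =
    opt {
        let! a = tryHalve 12
        let! b = tryHalve a
        return a + b
    }

// Approximately what the compiler generates: nested Bind calls,
// each taking "the rest of the workflow" as a lambda.
let desugared =
    opt.Bind(tryHalve 12, fun a ->
        opt.Bind(tryHalve a, fun b ->
            opt.Return(a + b)))
```

Both values are Some 9: 12 halves to 6, 6 halves to 3, and 6 + 3 is returned.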

 

In addition to let!, workflows support other special keywords. All of these special keywords are simply syntactic sugar – they all resolve to methods on the builder class. In most cases, the exclamation point (!) is what signals that a keyword, e.g., let!, translates to a call on the builder, e.g., Bind. This is powerful! Since we control the builder’s implementation, e.g., Bind, we control the implementation of let! and the other special workflow keywords.

For your convenience, the workflow keywords that we use most often are summarized below:

·         let! – calls the builder’s Bind method

·         do! – calls the builder’s Bind method

·         return – calls the builder’s Return method

·         return! – calls the builder’s ReturnFrom method

·         yield – calls the builder’s Yield method

·         yield! – calls the builder’s YieldFrom method

·         use – calls the builder’s Using method, which is expected to call Dispose on the bound resource when it goes out of scope

·         use! – calls the builder’s Bind method and then its Using method, so that the result of the bound computation is disposed when it goes out of scope

Your builder classes can also implement additional methods that affect how the workflow behaves. Some of these methods are:  ReturnFrom (mentioned above vis-a-vis return!), Delay, TryWith, and YieldFrom. Please consult the F# documentation for details, e.g., you can find the set of methods defined on the Control.AsyncBuilder class (discussed below).
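As a small illustration of one of these additional methods, the sketch below adds ReturnFrom to an option builder so that return! can be used when the final step already produces an option. MaybeBuilder, maybe, tryParse and parsePositive are hypothetical names chosen for this example:

```fsharp
type MaybeBuilder() =
    member this.Bind(v, f) = Option.bind f v
    member this.Return(v) = Some v
    // return! hands back an already-wrapped value unchanged.
    member this.ReturnFrom(v: 'a option) = v

let maybe = MaybeBuilder()

// Parse an integer, yielding None on malformed input.
let tryParse (s: string) =
    match System.Int32.TryParse(s) with
    | true, n -> Some n
    | false, _ -> None

let parsePositive s =
    maybe {
        let! n = tryParse s
        // The last step is itself an option, so we use return! rather than return.
        return! (if n > 0 then Some n else None)
    }
```

With this builder, parsePositive "42" gives Some 42, while parsePositive "-5" and parsePositive "abc" both give None.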

You should note that within workflows you can still use standard F# keywords and data structures normally, e.g., let still performs a standard binding.

At the time of this writing, workflows impose two constraints: you cannot define new types within a workflow and you cannot use mutable values – you must use reference cells instead.

So far, we’ve seen how workflows can provide a handy way for us to “automatically” and progressively check the return values in a chain of functions. While useful, this is just the beginning. Workflows can help us encapsulate state, perform automatic logging, check for numeric underflows and overflows in calculations, etc. What is really exciting, though, is that we can use workflows to help make parallel and asynchronous programming simple! Enter asynchronous (async) workflows.

Asynchronous (Async) Workflows

Async workflows enable parallel execution, asynchronous execution, and reactive execution – and combinations thereof. Let’s define these terms so that we’re using them consistently:

·         Parallel execution means starting several operations at once and waiting for all of them to finish before continuing, e.g., downloading 10 stock quotes “in parallel.”

·         Asynchronous execution means to start doing something in the background (could be in parallel, too) and notify the original caller when finished, e.g., calling a Web service.

·         Reactive execution means “wait for something to happen” and then respond, e.g., wait for the user to click a button.

To define and use an async workflow, we use the following syntax:

let identifier = async { expression }

The keyword async tells F# to use the built-in instance of the AsyncBuilder class. This class implements the requisite Bind, Return, etc. methods. The workflow expression is a series of standard F# expressions, including synchronous and asynchronous calls to other functions and workflows.

async { } returns an Async<'a> instance, called an async computation, indicated by identifier in the syntax example. The Async’s type, 'a, is bound to the workflow’s return type via the workflow’s final return or return! expression. We use return! when the value is computed via an asynchronous call.

Note that an async workflow does not execute immediately. To execute the workflow, we need to use the returned Async<'a> instance in conjunction with methods in the Async class. The Async class is a static class that implements members for creating and manipulating asynchronous computations. As of this writing, there are 26 methods documented, enabling a good deal of flexibility. We will discuss the three most-often used functions: Async.Start, Async.Parallel and Async.RunSynchronously.[2] [3]

Async.Start is the simplest way to start an async workflow. It does not support the workflow returning a value, meaning that your code does not wait for it to return a result. If you have a workflow whose return value is important, don’t use Async.Start – use Async.RunSynchronously (described in the next paragraph) instead.
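The difference between the two calls can be sketched as follows. The ManualResetEventSlim is an addition made here only so that the example can observe the background work finishing; it is not something Async.Start requires:

```fsharp
open System.Threading

// RunSynchronously blocks the caller and hands back the workflow's result.
let answer = async { return 6 * 7 } |> Async.RunSynchronously

// Start queues the workflow and returns immediately, so the workflow
// must have type Async<unit>: there is nowhere for a result to go.
let finished = new ManualResetEventSlim(false)

let background =
    async {
        printfn "working in the background"
        finished.Set()
    }

Async.Start background

// Wait (up to 5 seconds) so this sketch does not exit before the work runs.
let completed = finished.Wait(5000)
```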

Async.Parallel takes a sequence of async computations and queues each one for execution on a thread from the .NET thread pool, as available. Async.RunSynchronously executes these Async.Parallel computations on these threads, and waits for the final result.[4] The async computation normally runs to completion, at which point the .NET thread is returned to the pool, and the result of the computation is handled by whatever physical thread is executing the workflow. This means that the thread doing the async work, and the thread processing the results of the async work can be, and often are, different physical threads. Some literature refers to this as “thread hopping.” I mention it here because if  you use a debugger to debug async workflows, you might see thread IDs changing during the course of execution.
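A minimal sketch of this behavior is shown below. Each computation records the managed thread it ran on, which is one way to observe the “thread hopping” described above. squareAsync and the other names are illustrative, and the thread IDs you see will vary from run to run:

```fsharp
open System.Threading

let squareAsync (n: int) =
    async {
        // Record which managed thread ran this piece of work.
        let threadId = Thread.CurrentThread.ManagedThreadId
        return (n * n, threadId)
    }

let results =
    [ for n in 1 .. 5 -> squareAsync n ]
    |> Async.Parallel
    |> Async.RunSynchronously

// Results come back as an array in the same order as the input computations.
let squares = results |> Array.map fst

// How many different threads did the work? This depends on the thread pool.
let distinctThreads = results |> Array.map snd |> Array.distinct |> Array.length
```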

If an async computation throws an exception, Async.RunSynchronously internally catches and re-throws it, making exception handling possible. This is a marked improvement over how C# and other .NET languages handle async exceptions.
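As a small sketch of this, an ordinary try ... with around Async.RunSynchronously can catch an exception raised inside the workflow. divideAsync is a hypothetical function used only for illustration:

```fsharp
// Integer division by zero raises System.DivideByZeroException inside the workflow.
let divideAsync (x: int) (y: int) = async { return x / y }

let outcome =
    try
        divideAsync 10 0
        |> Async.RunSynchronously
        |> sprintf "result: %d"
    with
    | :? System.DivideByZeroException as ex ->
        // The exception surfaces on the calling thread, where we can handle it.
        sprintf "caught: %s" (ex.GetType().Name)
```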

We often use async { },  Async.Parallel, and Async.RunSynchronously in combination to execute work asynchronously. In the following example, we use these constructs to asynchronously download Web pages and parse their links:

open System.Text.RegularExpressions

 

let urls = ["http://www.google.com/"; "http://microsoft.com/"; "http://www.wordpress.com/"; "http://www.peta.org"]

 

let downloadUrl(url: string) =

    async {

        let wc = new System.Net.WebClient()

        let matches =
            wc.DownloadString(url)
               |> fun html -> Regex.Matches(html, @"http://\S+")

        printfn "Scanning %s, found %d links" url matches.Count 

    }

   

At this point in the code, we’ve set up the async workflow. We now need to run the workflow using functions from the Async class, as shown here:

// Async calls made here

let parseUrls() =

    Async.RunSynchronously(
        Async.Parallel [for i in 0..urls.Length-1 -> downloadUrl urls.[i]])


It is very common to see these parallel calls used in conjunction with the forward pipe operator, as shown here:

let parseUrlsPipe() =

    [for i in 0..urls.Length - 1 -> downloadUrl urls.[i]]

        |> Async.Parallel

        |> Async.RunSynchronously

In addition to using the Async class to execute async workflows, we can also use let!, do!, return!, return, etc. in our workflows. As described earlier, F# recognizes the special ! syntax in workflows and binds the associated keywords to methods on the builder class – in this case, AsyncBuilder.

let!, do! and return!

Within async workflows, we need to be aware of the differences between let vs. let!, do vs. do!, and return vs. return!. These are compared and contrasted below:

·         let binds a value or function to an identifier - nothing new here. In contrast, let! runs an async computation, waits for it without blocking a thread, and binds its result to an identifier. You can simply treat the async operation as the value it returns. Note that when the computation awaited by let! completes, the rest of the original workflow may resume on a different thread from the one it started on. If you use let! and watch the thread ID of an async workflow over time, you may see the workflow run on different threads as async child workflows complete and return.

·         do executes expressions synchronously, while do! executes expressions asynchronously. do! is used to run an async computation that returns unit, i.e., one whose return value is uninteresting and can be ignored.

·         return returns a result, while return! executes an async workflow and returns its return value as a result.
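The contrast between the plain and “bang” forms can be sketched as follows. fetchLength is a hypothetical async function, with Async.Sleep standing in for real asynchronous I/O:

```fsharp
let fetchLength (text: string) =
    async {
        do! Async.Sleep 50       // stand-in for real asynchronous work
        return text.Length
    }

let viaLetBang =
    async {
        let! n = fetchLength "hello"   // n : int, the awaited result
        return n + 1                   // return wraps a plain value
    }

let viaLet =
    async {
        let pending = fetchLength "hello"   // pending : Async<int>, not yet run
        return! pending                     // return! runs it and returns its result
    }

let a = Async.RunSynchronously viaLetBang   // 6
let b = Async.RunSynchronously viaLet       // 5
```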

When you see let!, do!, and return! used in an async workflow, think to yourself, “F# is going to run this work asynchronously.” This is because the Bind, Return, etc. methods on the associated AsyncBuilder are implemented to create and manage work using multiple threads. Note that if you don’t use !, nothing evil happens – expressions simply run synchronously. Let’s look at an example of an async workflow that uses plain let and return:

open System.Threading

 

let asyncAdd (x: int) (y: int) =

    async {

        let sum = x + y

        return sum

    }

 

let add x y =

    [asyncAdd x y] |> Async.Parallel |> Async.RunSynchronously

   

Here, we use an async workflow with normal let and return to demonstrate that their use has not changed – they do exactly what they’ve always done.

Let’s look at another example that uses a more sophisticated async workflow to see let! and do! in action. Note that this example is adapted from one that originally appeared in Ted Neward’s 2008 MSDN Magazine article, Use Functional Programming Techniques in the .NET Framework:

open System

open System.IO

 

let TransformImage pixels i =

    let newImg: byte[] = Array.create 1024 (byte 0)

    newImg

 

let ProcessImage(i) =

    async {

        use  inStream  = File.OpenRead(sprintf "source%d.jpg" i)

        let! pixels = inStream.AsyncRead(1024 * 1024) 

        let  pixels' = TransformImage pixels i

        use  outStream = File.OpenWrite(sprintf "result%d.jpg" i)

        do!  outStream.AsyncWrite(pixels')

        do   Console.WriteLine "done!" 

    }

 

let numImages = 5

let ProcessImages() =

    [ for i in 1 .. numImages -> ProcessImage(i) ]

        |> Async.Parallel |> Async.RunSynchronously

 

Note the use of let! – this tells F# to execute the async operation (AsyncRead) asynchronously, without blocking the current thread, and to bind the result to the identifier pixels. The code then invokes the TransformImage function (synchronously from the perspective of this workflow), and writes the results asynchronously to an output file via AsyncWrite. The workflow ends by printing a message to the console.

When you use let! and do! (and the other async keywords) in a workflow, the expression on the right-hand side of the let! or do! must be asynchronous, i.e., it must have type Async<'a>. In other words, you cannot use let!, do!, etc. with synchronous expressions. For example, the following code will fail:

let computeSquaresAsync =

    async {

        for i = 1 to 1000 do

            let! square = i * i   // <<< fails

            printfn "%d\t%d" i square

        printfn "done"

    } |> Async.Start

Since the expression i * i does not produce an async computation (its type is int, not Async<int>), the let! does not compile. If we change this to plain let, the code compiles. Note that plain let bindings continue to work fine in async workflows, e.g., let x = 100.
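If a value really does need to be consumed with let!, one option is to wrap the synchronous calculation in its own async workflow. The sketch below assumes a hypothetical squareAsync helper and uses a reference cell for the running total, in keeping with the constraint on mutable values mentioned earlier:

```fsharp
// Wrapping the calculation in async { } gives it the type Async<int>.
let squareAsync (i: int) = async { return i * i }

let sumOfSquares (n: int) =
    async {
        let total = ref 0
        for i in 1 .. n do
            let! square = squareAsync i   // compiles: the right-hand side is Async<int>
            total.Value <- total.Value + square
        return total.Value
    }

let result = sumOfSquares 10 |> Async.RunSynchronously   // 385
```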

This constraint has implications: If you want to use .NET library code in async workflows, and want to execute the .NET library code asynchronously, make sure to use the async versions of methods and functions. For example, if you want to read and write files asynchronously in a workflow, use System.IO.Stream.AsyncRead and System.IO.Stream.AsyncWrite methods.

For useful async extensions to the .NET library, please refer to the F# PowerPack (FSharp.PowerPack.dll).

A Note on Locking

The F# literature on the Web is somewhat confusing with regard to what async workflows offer with respect to protecting data from multithreaded access. In a nutshell, the answer is “none.” The benefits we get from using async workflows are somewhat dependent on the “whole” of functional programming, including the use of immutable data structures. By using immutable data structures, we naturally circumvent a class of multithreaded “gotchas.”

If your program does use mutable data structures, you need to take the same precautions in F# that you would with any other language capable of executing multiple threads simultaneously. You need to lock shared state for writes, etc.  A full discussion on multithreaded and parallel programming is beyond the scope of this text. For more information, please see the MSDN documentation on parallel programming.
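As a minimal sketch of the kind of precaution meant here, the example below guards a shared reference cell with F#’s lock function while several async workflows update it in parallel. The names counter, gate and increment are illustrative:

```fsharp
// Shared mutable state: a reference cell updated from several workflows.
let counter = ref 0
let gate = obj ()

let increment =
    async {
        for _ in 1 .. 1000 do
            // lock serializes access so that no update is lost.
            lock gate (fun () -> counter.Value <- counter.Value + 1)
    }

let total =
    [ for _ in 1 .. 8 -> increment ]
    |> Async.Parallel
    |> Async.RunSynchronously
    |> ignore
    counter.Value   // 8 workflows x 1000 increments each
```

Without the lock, the read-modify-write on counter could interleave across threads and some increments could be lost.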

 

Handling Exceptions in Async Workflows

In async workflows, exceptions are not caught and handled for us by default. This means that if a workflow started with, e.g., Async.Start throws an exception that nothing handles, the process can terminate. Luckily, we have a way to deal with this situation by using Async.Catch, as shown in the following example[5]:

let square_if_even n =

  async {

    do if n % 2 <> 0 then failwith "not even"

    return n * n

  }

 

let get_square x =

    square_if_even x

    |> Async.Catch

    |> Async.RunSynchronously

    |> function

        | Choice1Of2 answer -> printfn "ok: %d"  answer

        | Choice2Of2 except -> printfn "exn: %s" except.Message

Async.Catch sets up an exception handler for the async workflow. If the async workflow returns successfully, the first match pattern, Choice1Of2, is matched. If an exception occurs, the second pattern, Choice2Of2, is matched. The Choice type, shipped in F#’s core library, is defined as follows (note the capital “O” in Of):

type Choice<'T1,'T2> =

    | Choice1Of2 of 'T1

    | Choice2Of2 of 'T2

Cancelling Async Workflows

When executing an async workflow, you may want to cancel it, i.e., terminate it prematurely, before it completes normally. For example, you may want to cancel a workflow if its execution time exceeds a certain threshold, or the user grows impatient waiting for an operation to complete. Async workflows support cancellation.

To set up an async workflow so that it can be cancelled, you need to run it through the TryCancelled function.

open System

open System.Threading

 

// Async workflow that takes a long time.

let longOperationAsync delay =

    async {

        printfn "doing time-consuming thing..."

        printfn "waiting %d seconds" delay

        let wait = delay * 1000

        // Async.Sleep takes milliseconds; do! is needed to actually wait for it.
        do! Async.Sleep(wait)

        printfn "time-consuming operation complete"

    }

 

// Cancellation function.

// Called if the async workflow is cancelled.

let onCancelled (c: OperationCanceledException) =

    printfn "operation canceled: %s" c.Message

// Sets up async workflow that can be cancelled.

// Associates async workflow with cancellation function.

let delay = 15

Async.TryCancelled(longOperationAsync delay, onCancelled)

    |> Async.Start

   

if delay > 10 then  // >10 secs is too long for us to wait!

    Async.CancelDefaultToken()

Notice the call to CancelDefaultToken(). This call cancels the most recently started async workflows that were not given a cancellation token of their own. The “DefaultToken” in the function name refers to the underlying mechanism by which F# enables cancellation – a cancellation token. Whenever you start an async workflow, e.g., using Async.Start, F# assigns the workflow a CancellationToken. A cancellation token is a .NET structure defined in the System.Threading namespace that tracks whether or not the async workflow has received a cancellation request, e.g., via CancelDefaultToken. Whenever the async workflow executes a let! or do!, the underlying AsyncBuilder checks the status of the workflow’s CancellationToken. If the token is marked “cancelled”, the workflow terminates and the associated cancellation function, a.k.a. compensation function, is called.

You can associate your own CancellationToken with a workflow explicitly by creating a CancellationTokenSource object and passing the CancellationTokenSource.Token member to the Async methods that start the workflow, e.g., Async.Start.

let cts = new CancellationTokenSource()

let asyncwf = Async.TryCancelled(longOperationAsync delay, onCancelled)

Async.Start(asyncwf, cts.Token)

Note that the forward pipe syntax does not work here, since we need to pass the Async.Start method a tuple. To cancel the associated workflow, we do so via the CancellationTokenSource instance:

cts.Cancel()
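Putting these pieces together, a self-contained sketch might look like the following. The ManualResetEventSlim and the short Thread.Sleep are additions made here so that the example can observe the compensation function running; slowWork, onCancel and source are illustrative names:

```fsharp
open System
open System.Threading

// Signalled by the compensation function so that we can observe the cancellation.
let cancelled = new ManualResetEventSlim(false)

let slowWork =
    async {
        do! Async.Sleep 10000      // cancellation is noticed while waiting here
        printfn "finished normally"
    }

// Compensation function: runs only if the workflow is cancelled.
let onCancel (_: OperationCanceledException) = cancelled.Set()

let source = new CancellationTokenSource()
Async.Start(Async.TryCancelled(slowWork, onCancel), source.Token)

Thread.Sleep 200       // give the workflow a moment to start
source.Cancel()        // request cancellation through our own token source

// Wait up to 5 seconds for the compensation function to run.
let sawCancellation = cancelled.Wait(5000)
```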

F# supports a convenience function[6] that enables us to launch a cancellable async workflow in one fell swoop:

Async.StartWithContinuations(

    longOperationAsync delay,

    (fun result -> printfn "workflow completed ok"),

    (fun ex -> printfn "exception handler here"),

    (fun cncl -> printfn "cancellation handler here"),

    cts.Token)

This function starts the async workflow and calls the appropriate continuation function on normal completion, exception or cancellation.

Monads

Monads are constructs that stem from a branch of mathematics known as category theory, which in part deals with the translation or morphing of mathematical structures, including functions.

In functional programming, monads play a role in that they can help enable function composition and chaining while enabling controlled side effects. Let’s suppose, for example, we want to execute a series of functions: f -> g -> h (f executes and passes its output to g, etc.). To make this work, f’s output must be compatible with g’s input, and g’s output must be compatible with h’s input. Let’s suppose we also want to log each function call in the chain as it executes, thus introducing a side effect explicitly.

We can satisfy both of these requirements by using monads. Formally, a monad is a type (class) that needs to implement a function called bind, a function called return, and a type constructor that allows for the composition of monadic functions, i.e., functions that take monads as arguments and/or return monads as return values. In the documentation, monad types are generally denoted by a capital M.

bind and return are complementary functions. return takes a plain value and wraps it in a monad type, e.g., takes an int and returns a monad object that wraps the int. bind does the reverse – it takes a monad, unpacks its plain value, and passes it to the next function in the chain. The next function in the chain accepts that plain value and returns its own result wrapped in the monadic type, ready for the next bind.

 

In the F# documentation for workflows, which are built on monadic constructs, you will see bind and return defined as follows:

 

Bind: M<'a> * ('a -> M<'b>) -> M<'b>

Return: 'a -> M<'a>

As you can see, the bind method, Bind, takes  an M<'a> and a function that converts an 'a to an M<'b> and produces an M<'b>. The return function, Return, takes a plain 'a and returns a 'a wrapped in a type M.

The bind and return functions work together to ensure that functions can be called in a chain, while the implementation of the bind method can support side effects, e.g., logging each function call or running code on a separate thread. The builder classes, e.g., our custom urlbuilder and the built-in AsyncBuilder, discussed earlier, adhere to this monadic pattern.
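To illustrate the logging scenario, here is a minimal sketch of a builder whose Bind records every value that flows through the chain. LoggingBuilder, its sink parameter and the other names are hypothetical and exist only for this example:

```fsharp
// A hypothetical builder whose Bind logs each step before continuing the chain.
type LoggingBuilder(sink: System.Collections.Generic.List<string>) =
    // Bind: M<'a> * ('a -> M<'b>) -> M<'b>, with M being option here.
    member this.Bind(v: 'a option, f: 'a -> 'b option) : 'b option =
        match v with
        | Some x ->
            sink.Add(sprintf "step produced %A" x)   // the controlled side effect
            f x
        | None ->
            sink.Add "step produced nothing; stopping"
            None
    // Return: 'a -> M<'a>
    member this.Return(v: 'a) : 'a option = Some v

let entries = System.Collections.Generic.List<string>()
let logged = LoggingBuilder(entries)

let result =
    logged {
        let! a = Some 2
        let! b = Some (a * 10)
        return a + b
    }
```

After this runs, result is Some 22 and entries holds one line per let!, even though the workflow body never mentions logging.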

What You Need to Know

·         Monads are constructs used in functional programming that enable the explicit and ordered chaining of functions and the controlled execution of side effects.

·         We use F# workflows to implement programs in a monadic style. Workflows hide the lexical complexity of nested bind(… bind(… return …)) call chains via syntactic sugar.

·         Workflows are built using builders that implement bind and return, at the very least. Builders may implement additional methods as well.

·         Async workflows are workflows that use multiple threads to accomplish work asynchronously. Async workflows use threads from the default .NET thread pool.

·         When using async workflows, you need to be aware of the difference between let and let!, do and do!, and return and return!. The ones that use the bang (!) symbol initiate and require async operations.

 



[1] This is not the case in pure functional languages such as Haskell, since these languages evaluate all expressions lazily, enabling the language runtime to reorder execution.

[2] The old name of this function was Run. As of this writing, Run is deprecated. Use RunSynchronously instead.

[4] The fact that RunSynchronously waits for the thread to finish is where the “synchronously” part of the name comes in.

[5] Adapted from Matthew Podwysocki’s blog: http://weblogs.asp.net/podwysocki/archive/2008/08/15/async-computation-expressions-resource-and-exception-management.aspx.

[6] I adapted this example from a preview copy of Chris Smith’s Programming F# book.

 
