nótaí

A website for posts and talks about Scala and other tech topics.


20 November 2013


by Arnold deVos

Flow Actors: Another Scala Dataflow Library

Flow Actors is a compact library for asynchronous programming within a single JVM.

Please note: this library has been superseded by FlowLib, here on GitHub.

Important Caveats:

Without Further Ado: A Graph

Here is a toy processing graph in the Flow Actors DSL:

import au.com.langdale.async.Flow._

trait SampleGraph extends SampleDecls {

  val N = 2

  def graph =
    InitialData :- urls :-> Fetcher :- raw :-> Splitter -: (
      urls ->: Dedup -: urls ->: Fetcher &
      text ->: Filer*N -: metrics ->: Reporter )
}

By convention, uppercase identifiers such as Fetcher are processes, and lowercase identifiers such as raw are labels for message flows.

The connecting operator :- attaches a flow label to a process, forming a projection, and :-> connects that projection to a target process.

These have right-associative equivalents, ->: and -: respectively, which are useful for expressing fan-out rather than fan-in structures. (In Scala, an operator whose name ends in a colon is right-associative.)

The & operator combines graphs or projections. More on how graphs and projections are represented later.

The * operator multiplies a process: it specifies how many parallel instances should be executed, so Filer*N runs N instances of Filer.
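To see how Scala parses operators like these, here is a toy model of such a DSL. It is a minimal sketch, not the Flow Actors implementation: the classes Proc, Label, Out, In, Ins, Edge and Graph are hypothetical, and a "graph" is just a list of labelled edges.

```scala
// Hypothetical toy model of a flow-graph DSL. These classes are NOT the
// Flow Actors API; they only show how Scala resolves such operators.
final case class Label(name: String)
final case class Edge(src: Proc, label: Label, dst: Proc)

final case class Graph(edges: List[Edge]) {
  // & combines two graphs by concatenating their edges
  def &(other: Graph): Graph = Graph(edges ++ other.edges)
}

final case class Proc(name: String, instances: Int = 1) {
  // filer * 2 requests two parallel instances
  def *(n: Int): Proc = copy(instances = instances * n)
  // proc :- label is an output projection (left-associative)
  def :-(label: Label): Out = Out(this, label)
  // label ->: proc ends in ':' so Scala calls proc.->:(label): an input projection
  def ->:(label: Label): In = In(label, this)
}

final case class Out(src: Proc, label: Label) {
  // (proc :- label) :-> target connects the projection to a target process
  def :->(dst: Proc): Graph = Graph(List(Edge(src, label, dst)))
}

final case class In(label: Label, dst: Proc) {
  // src -: (label ->: proc) ends in ':' so Scala calls in.-:(src)
  def -:(src: Proc): Graph = Graph(List(Edge(src, label, dst)))
  // in1 & in2 collects several input projections for fan-out
  def &(other: In): Ins = Ins(List(this, other))
}

final case class Ins(targets: List[In]) {
  // src -: (a ->: p & b ->: q) fans one source out to every target
  def -:(src: Proc): Graph = Graph(targets.map(t => Edge(src, t.label, t.dst)))
}

val raw = Label("raw"); val urls = Label("urls"); val text = Label("text")
val fetcher = Proc("Fetcher"); val splitter = Proc("Splitter")
val dedup = Proc("Dedup"); val filer = Proc("Filer")

// Parsed as (fetcher :- raw) :-> splitter
val chain = fetcher :- raw :-> splitter
// Parsed as splitter -: ((urls ->: dedup) & (text ->: (filer * 2)))
val fanOut = splitter -: (urls ->: dedup & text ->: filer * 2)
val whole = chain & fanOut

whole.edges.foreach { e =>
  println(s"${e.src.name} --${e.label.name}--> ${e.dst.name} x${e.dst.instances}")
}
```

Because ->: and -: both start with - and end with a colon, they share a precedence level and group to the right. The & operator has lower precedence, so it combines whole projections, while * binds tightest, so Filer is multiplied before it is connected.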

We left out the declarations of the identifiers. Let’s put them in a separate trait without committing to the types just yet.

trait SampleDecls { 
  type Text
  type Address
  type Metric 
  val raw, text = label[Text]
  val urls = label[Address]
  val metrics = label[Metric]
  val Fetcher, Splitter, Dedup, Filer, Reporter, Supervisor: Process
}
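Deferring the types works because Scala traits can declare abstract type members and fill them in later. The sketch below assumes a hypothetical Label[T] class and label[T] factory standing in for the library's own; it shows that once a concrete object commits Address to java.net.URI, the compiler checks that messages sent on urls really are URIs.

```scala
// Hypothetical stand-in for a typed flow label; the library's Label may differ.
final class Label[T]

trait Decls {
  // Abstract type members: the message types are committed later
  type Text
  type Address
  def label[T]: Label[T] = new Label[T]
  val raw, text = label[Text] // both have type Label[Text]
  val urls = label[Address]   // Label[Address]

  // The compiler checks that a message matches its label's type
  def send[T](flow: Label[T], message: T): String = s"sent $message"
}

// Committing the types, as the Sample object does for Address
object Concrete extends Decls {
  type Text = String
  type Address = java.net.URI
}

println(Concrete.send(Concrete.urls, new java.net.URI("https://example.com/")))
println(Concrete.send(Concrete.text, "hello"))
// Concrete.send(Concrete.urls, "hello") // rejected: a String is not an Address
```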

Processes

On to the definition of processes. Let’s take Dedup as an example:

trait SampleProcesses {
  def dedup[Message](flow: Label[Message]) = new Process {

    def description = "remove duplicate messages"

    // the first action: start with no messages seen
    def action = loop(Set.empty)

    private def loop(seen: Set[Message]): Action[Nothing] =
      input(flow) { message =>                 // next message on flow
        if (seen contains message) loop(seen)  // duplicate: drop it
        else output(flow, message) { loop(seen + message) } // new: forward, then remember it
      }
  }

  // more process definitions here ...
}

This defines a dedup method that creates a process for a given message type and flow label. Because of these parameters, dedup can be reused at different positions within a graph or in different graphs.

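The process is constructed in continuation passing style. The action member supplies the initial action, here loop(Set.empty). Within loop, input(flow) { message => ... } waits for a message on flow and passes it to a function that returns the next action, while output(flow, message) { ... } sends message on flow and then continues with the action in braces. The type Action[Nothing] indicates that this series of actions never terminates: every path through loop leads back to loop.

Any other type U indicates that a stop(u: U) action may be encountered, which terminates a series of actions.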
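A rough picture of what continuation passing actions might look like as plain data follows. It is a minimal sketch under stated assumptions: Action, Input, Output, Stop, simulate and takeTwo are hypothetical stand-ins rather than the library's definitions, and the toy driver is single-threaded, feeding messages from a list.

```scala
// Hypothetical sketch of continuation-passing actions. The names echo the
// article (input, output, stop, Action), but this is not the library's code.
final case class Label[M](name: String)

sealed trait Action[+U]
final case class Input[M, U](flow: Label[M], k: M => Action[U]) extends Action[U] {
  // The driver trusts the label to guarantee the message type
  def feed(message: Any): Action[U] = k(message.asInstanceOf[M])
}
final case class Output[M, U](flow: Label[M], message: M, k: () => Action[U]) extends Action[U]
final case class Stop[U](result: U) extends Action[U]

// Constructors shaped like the calls in the dedup process
def input[M, U](flow: Label[M])(k: M => Action[U]): Action[U] = Input(flow, k)
def output[M, U](flow: Label[M], message: M)(k: => Action[U]): Action[U] = Output(flow, message, () => k)
def stop[U](u: U): Action[U] = Stop(u)

// Dedup in this style: Action[Nothing] because it never calls stop
def dedup[M](flow: Label[M]): Action[Nothing] = {
  def loop(seen: Set[M]): Action[Nothing] =
    input(flow) { message =>
      if (seen contains message) loop(seen)
      else output(flow, message) { loop(seen + message) }
    }
  loop(Set.empty)
}

// A process that forwards two messages and then stops: Action[String]
def takeTwo[M](flow: Label[M]): Action[String] =
  input(flow) { a => output(flow, a) { input(flow) { b => output(flow, b) { stop("done") } } } }

// Toy driver: feeds messages from a list and collects outputs, returning
// when the action stops or the inbox runs dry.
def simulate(action: Action[Any], inbox: List[Any]): List[Any] = {
  @annotation.tailrec
  def go(a: Action[Any], in: List[Any], out: List[Any]): List[Any] = a match {
    case Stop(_)         => out.reverse
    case o: Output[_, _] => go(o.k(), in, o.message :: out)
    case i: Input[_, _]  => in match {
      case m :: rest => go(i.feed(m), rest, out)
      case Nil       => out.reverse
    }
  }
  go(action, inbox, Nil)
}

val urls = Label[String]("urls")
println(simulate(dedup(urls), List("a", "b", "a", "c", "b")))
println(simulate(takeTwo(urls), List("x", "y", "z")))
```

Note how the immutable Set is handed from one continuation to the next: each call to loop receives the set as it stands after the previous message.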

A Note About Process State

The dedup process must keep track of the messages it has already seen, which it does using an immutable Set[Message]. Successive versions of this set are passed from continuation to continuation.

It might be tempting to use a mutable set here and make it a member of the Process. That would be a common style in actor programming, but it is not suitable for processes.

The library assumes that a Process is immutable: it may not have var members or members that hold mutable objects. Among other things, this enables the * operator and allows a given graph to be run more than once.
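The sketch below isolates why mutable members are a problem, using a hypothetical SimpleProcess trait that is far simpler than a real Process. It assumes, as the immutability rule suggests, that several instances created with * or a second run of the same graph reuse one process value.

```scala
import scala.collection.mutable

// Hypothetical simplified process: turns a batch of inputs into outputs.
// A real Flow Actors Process is built from actions; this only isolates state.
trait SimpleProcess { def run(inputs: List[String]): List[String] }

// Anti-pattern: the set of seen messages is a mutable member of the process
object MutableDedup extends SimpleProcess {
  private val seen = mutable.Set.empty[String]
  // mutable.Set.add returns true only for elements not already present
  def run(inputs: List[String]): List[String] = inputs.filter(m => seen.add(m))
}

// Immutable style: each run threads its own Set, starting from Set.empty
object ImmutableDedup extends SimpleProcess {
  def run(inputs: List[String]): List[String] = {
    @annotation.tailrec
    def loop(in: List[String], seen: Set[String], out: List[String]): List[String] =
      in match {
        case Nil => out.reverse
        case m :: rest =>
          if (seen contains m) loop(rest, seen, out)
          else loop(rest, seen + m, m :: out)
      }
    loop(inputs, Set.empty, Nil)
  }
}

val batch = List("a", "b", "a")
// Running the same process value twice, as a rerun or a second instance would
println(MutableDedup.run(batch))   // List(a, b)
println(MutableDedup.run(batch))   // List(): state leaked from the first run
println(ImmutableDedup.run(batch)) // List(a, b)
println(ImmutableDedup.run(batch)) // List(a, b)
```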

Putting it Together

To complete the example we need to bring the graph and the process definitions together:

object Sample extends SampleGraph with SampleProcesses {

  type Address = java.net.URI
  val Dedup = dedup(urls)

  // commit the remaining types and processes here ...

  val procmap = run(graph, Supervisor)

  println(s"Started ${procmap.size} processes!")
}

This fills in the Address type and creates a Dedup process whose input and output will be given the urls label. (The remaining types and processes are omitted for brevity.)

The run(graph, Supervisor) method puts everything in motion. It creates a network of sites, connected by communication channels, that mirrors the given graph, and executes the corresponding process from the graph at each site.
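One way to picture what run does is a site per process, each with its own thread and input channel. The sketch is a hypothetical, heavily simplified stand-in for the library's runtime: Site, Data, End and this run function are invented for illustration, the graph is a linear pipeline, and an End marker shuts the sites down in order. The returned map echoes procmap, but is keyed by stage name rather than by process.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ListBuffer

// Messages travelling on a channel; End is a hypothetical shutdown marker
sealed trait Msg
final case class Data(value: String) extends Msg
case object End extends Msg

// A site: one thread plus one input channel. It applies f to each message,
// records the result and forwards it to the next site until End arrives.
final class Site(val name: String, f: String => String, next: Option[Site]) {
  val inbox = new LinkedBlockingQueue[Msg]()
  val received = ListBuffer.empty[String] // written only by this site's thread
  private val worker = new Thread(() => {
    var running = true
    while (running) inbox.take() match {
      case Data(v) =>
        val out = f(v)
        received += out
        next.foreach(_.inbox.put(Data(out)))
      case End =>
        next.foreach(_.inbox.put(End))
        running = false
    }
  }, name)
  def start(): Unit = worker.start()
  def join(): Unit = worker.join()
}

// Create one site per stage, wire each site's output to the next site's
// channel, start them all and return a map from stage name to site.
def run(stages: List[(String, String => String)]): Map[String, Site] = {
  val sites = stages.foldRight(List.empty[Site]) { case ((name, f), acc) =>
    new Site(name, f, acc.headOption) :: acc
  }
  sites.foreach(_.start())
  sites.map(s => s.name -> s).toMap
}

val procmap = run(List("Trim" -> ((s: String) => s.trim), "Upper" -> ((s: String) => s.toUpperCase)))
List(" a ", "b ").foreach(v => procmap("Trim").inbox.put(Data(v)))
procmap("Trim").inbox.put(End)
procmap.values.foreach(_.join()) // join also makes `received` safe to read here
println(procmap("Upper").received.toList)
```

A real runtime would route messages by label and handle fan-in and fan-out; this only shows the shape of sites joined by channels.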

The given Supervisor process is also executed. It is connected to the predefined errors port of every other process and receives messages when errors occur or processes terminate.
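Supervision can be sketched in the same spirit: every site reports to a single channel when its process terminates or throws. Report, Terminated, Failed and startSite are hypothetical names, not the library's errors port, and the main thread plays the role of the Supervisor.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.util.control.NonFatal

// What a site tells the supervisor when its process ends
sealed trait Report
final case class Terminated(site: String) extends Report
final case class Failed(site: String, error: Throwable) extends Report

// Run body on its own thread; report normal termination or failure to `errors`
def startSite(name: String, body: () => Unit, errors: LinkedBlockingQueue[Report]): Thread = {
  val t = new Thread(() => {
    try {
      body()
      errors.put(Terminated(name))
    } catch {
      case NonFatal(e) => errors.put(Failed(name, e))
    }
  }, name)
  t.start()
  t
}

val errors = new LinkedBlockingQueue[Report]()
val sites = List(
  startSite("Fetcher", () => (), errors),
  startSite("Filer", () => throw new IllegalStateException("disk full"), errors)
)

// The supervisor (here, the main thread) takes one report per site;
// the order depends on thread scheduling
val reports = List.fill(sites.size)(errors.take())
reports.foreach {
  case Terminated(site) => println(s"$site terminated")
  case Failed(site, e)  => println(s"$site failed: ${e.getMessage}")
}
```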

The run method returns a map of processes to the sites at which they are executing.
