A web site for posts and talks about scala and other tech topics.
by Arnold deVos
Flow Actors is a compact library for asynchronous programming within a single JVM.
Please note: this library has been superseded by FlowLib, here on GitHub.
Compared to futures and promises, Flow Actors has a message passing model and allows arbitrary message processing graphs. There is a DSL to describe these graphs and operators to split and join message flows in sum or product style. The degree of concurrency is tunable at each processing node.
Compared to Akka actors, message flows are statically typed and many of the flow control details that would otherwise be programmed by hand are handled automatically. On the other hand, Flow Actors is confined to a single JVM instance.
Compared to Spark, Storm or Akka clusters, Flow Actors is for smaller asynchronous systems. There are many of these!
Compared to Functional Reactive Programming, there is no global synchronization of message flows (signals and behaviours in FRP) nor any other glitch suppression strategy.
Important Caveats:
Action; Supervisor parameter for run();
(p: Process) * (n: Int) == p*n no longer holds.
Here is a toy processing graph in the Flow Actors DSL:
import au.com.langdale.async.Flow._
trait SampleGraph extends SampleDecls {
  val N = 2
  def graph =
    InitialData :- urls :-> Fetcher :- raw :-> Splitter -: (
      urls ->: Dedup -: urls ->: Fetcher &
      text ->: Filer*N -: metrics ->: Reporter )
}
By convention, uppercase identifiers such as Fetcher are processes and lowercase identifiers such as raw are labels for message flows.
The connecting operator :- attaches a flow label to a process, forming a projection, and :-> connects this projection to a target process.
These have right-associative equivalents ->: and -: respectively, which are useful to express fan-out as opposed to fan-in structures.
The & operator combines graphs or projections. More about the representation of graphs and projections later.
The * operator multiplies a process. It indicates how many parallel instances should be executed.
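The left- and right-associative operator pairs rely on a standard Scala rule: a method whose name ends in a colon is right-associative and is invoked on its right operand. The following is a minimal sketch of that mechanism only. The classes Label, Proc, Projection, Target and Edge are hypothetical stand-ins invented for illustration and are not the Flow Actors implementation.

```scala
// Toy model only: these classes are NOT the Flow Actors API.
final case class Label(name: String)
final case class Edge(source: Proc, label: Label, target: Proc)

final case class Proc(name: String) {
  // Left-associative: `p :- l` is p.:-(l)
  def :-(label: Label): Projection = Projection(this, label)
  // Name ends in ':' so it is right-associative: `l ->: p` is p.->:(l)
  def ->:(label: Label): Target = Target(label, this)
}

// A source process with an attached label, waiting for a target.
final case class Projection(source: Proc, label: Label) {
  def :->(target: Proc): Edge = Edge(source, label, target)
}

// A target process with an attached label, waiting for a source.
final case class Target(label: Label, target: Proc) {
  // Right-associative: `p -: t` is t.-:(p)
  def -:(source: Proc): Edge = Edge(source, label, target)
}

object OperatorDemo {
  val urls = Label("urls")
  val Splitter = Proc("Splitter")
  val Dedup = Proc("Dedup")

  // Parsed as (Splitter :- urls) :-> Dedup
  val leftStyle: Edge = Splitter :- urls :-> Dedup
  // Parsed as Splitter -: (urls ->: Dedup)
  val rightStyle: Edge = Splitter -: urls ->: Dedup
}
```

In this toy model both spellings build the same edge. The right-associative form lets a single source on the left be followed by a parenthesised group of targets, which is the fan-out shape used in the sample graph.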
We left out the declarations of the identifiers. Let’s put them in a separate trait without committing to the types just yet.
trait SampleDecls {
  type Text
  type Address
  type Metric
  val raw, text = label[Text]
  val urls = label[Address]
  val metrics = label[Metric]
  val Fetcher, Splitter, Dedup, Filer, Reporter, Supervisor: Process
}
On to the definition of processes. Let’s take Dedup as an example:
trait SampleProcesses {
  def dedup[Message](flow: Label[Message]) = new Process {
    def description = "remove duplicate messages"
    def action = loop(Set.empty)
    private def loop(seen: Set[Message]): Action[Nothing] =
      input(flow) { message =>
        if(seen contains message) loop(seen)
        else output(flow, message) { loop(seen + message) }
      }
  }
  // more process definitions here ...
}
This defines a dedup method that will create a process with a given message type and flow label.
The parameters make dedup potentially usable in different positions within a graph or in different graphs.
The process is constructed in continuation-passing style as follows:
The action member is the process entry point.
The input method returns an Action that will be dispatched when a message is available on the port labelled flow.
The function message => ... is a continuation that is invoked when the input action is dispatched. It returns a new Action to be dispatched.
The output method returns an Action that will be dispatched when the message can be delivered on the output port labelled flow.
The passed block is a continuation that is invoked when the output action is dispatched.
Action has a type parameter, in this case Nothing, which indicates that this series of actions loops indefinitely.
Any other type U indicates that a stop(u: U) action may be encountered which terminates a series of actions.
The dedup process must keep track of the messages already seen, which it does using an immutable Set[Message]. Successive versions of this set are passed from continuation to continuation.
It might be tempting to use a mutable set here and make it a member of Process. That would be a common actor programming style but it is not suitable for processes. The library assumes Process is immutable: it may not have var members or mutable members. Among other things, this enables the * operator and allows a given graph to be run more than once.
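To make the continuation-passing idea concrete, here is a self-contained sketch that models actions as plain data and drives them with a small single-threaded interpreter. Everything in it is an assumption made for illustration: the Input, Output and Stop case classes, the extra message type parameter M, and the run function are hypothetical and far simpler than the library's ports, sites and scheduler.

```scala
// Toy model only: these types are NOT the Flow Actors API.
// M is the single message type; U is the result type of a terminating series.
sealed trait Action[M, +U]
final case class Input[M, U](next: M => Action[M, U]) extends Action[M, U]
final case class Output[M, U](message: M, next: () => Action[M, U]) extends Action[M, U]
final case class Stop[M, U](result: U) extends Action[M, U]

object CpsSketch {
  // Never stops, so the result type is Nothing. The `seen` set is threaded
  // through the continuations as a parameter; nothing is mutated.
  def dedup[M]: Action[M, Nothing] = {
    def loop(seen: Set[M]): Action[M, Nothing] =
      Input[M, Nothing] { message =>
        if (seen.contains(message)) loop(seen)
        else Output[M, Nothing](message, () => loop(seen + message))
      }
    loop(Set.empty[M])
  }

  // Stops with the first message it receives.
  val firstMessage: Action[String, String] =
    Input[String, String](message => Stop[String, String](message))

  // Drives an action over a finite input list. Returns the emitted messages
  // and the stop value, if the series stopped before the input ran out.
  def run[M, U](start: Action[M, U], inputs: List[M]): (List[M], Option[U]) = {
    @annotation.tailrec
    def step(action: Action[M, U], pending: List[M], emitted: List[M]): (List[M], Option[U]) =
      action match {
        case Stop(result) => (emitted.reverse, Some(result))
        case Output(message, next) => step(next(), pending, message :: emitted)
        case Input(next) =>
          pending match {
            case head :: tail => step(next(head), tail, emitted)
            case Nil => (emitted.reverse, None)
          }
      }
    step(start, inputs, Nil)
  }

  val process = dedup[String]
  val inputs = List("a", "b", "a", "c", "b")
  // The same immutable process value is run twice with identical results.
  val firstRun = run(process, inputs)._1
  val secondRun = run(process, inputs)._1
  val stopped = run(firstMessage, List("x", "y"))._2
}
```

Because the seen set lives only in the arguments of loop, the process value carries no state of its own. That is why the second run in this sketch starts from an empty set again, which mirrors the reason given above for keeping Process immutable.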
To complete the example we need to bring the graph and the process definitions together:
object Sample extends SampleGraph with SampleProcesses {
  type Address = java.net.URI
  val Dedup = dedup(urls)
  // commit the remaining types and processes here ...
  val procmap = run(graph, Supervisor)
  println(s"Started ${procmap.size} processes!")
}
This fills in the Address type and creates a Dedup process whose input and output will be given the urls label. (The remaining types and processes are omitted for brevity.)
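The technique of declaring types abstractly in one trait and committing to them in the final object is ordinary Scala and can be seen in isolation in the sketch below. The names Decls, Logic, Wired, parse and describe, and the toy Label class, are hypothetical and exist only for this illustration.

```scala
// Toy model only: Label here is a stand-in, NOT the Flow Actors Label.
final case class Label[M](name: String)

trait Decls {
  type Address                              // abstract: no concrete type chosen yet
  val urls: Label[Address] = Label[Address]("urls")
  def parse(s: String): Address             // left abstract along with the type
}

// Code written against the abstract type compiles without knowing what it is.
trait Logic extends Decls {
  def describe(a: Address): String = s"${urls.name}: $a"
}

object Wired extends Logic {
  type Address = java.net.URI               // the type is committed here
  def parse(s: String): Address = java.net.URI.create(s)
}
```

Only the final object needs to know that Address is a java.net.URI, so the traits above it can be reused with a different choice of type.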
The run(graph, Supervisor) method puts everything in motion. A network of sites connected by communication channels is created that mirrors the passed graph. The corresponding process from the graph is executed at each site.
The given Supervisor process is also executed. It is connected to the predefined errors port of all the other processes and receives messages on errors or process termination.
The run method returns a map of processes to the sites at which they are executing.