Saturday, May 17, 2014

Topic from scalaz-stream

I've been playing around with scalaz-stream topics, and didn't understand how they worked. My first attempt was this:



(Note that this was only meant as a quick example; in real code you would certainly have different threads publishing and subscribing to a topic.)

That had a few problems:

• Lines 7-8: the type of out1 is Process[Task, String \/ Int]. A process like that is useful, but it's only part of the way towards what I needed. What I really wanted was to go from a process to a task to running the task.

• Lines 11-13: again, I'm only part of the way there. Both of these produce a Task[Unit], and those tasks still needed to be run.
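
To make the "it still needs to be run" point concrete, here is a small library-free sketch. It is a toy model and not scalaz's Task: `Deferred` and `TaskDemo` are hypothetical names I made up for illustration. The only thing it is meant to show is that building a task-like value does nothing until something calls run on it.

```scala
// Toy stand-in for a task: it only stores the computation.
// NOT scalaz.concurrent.Task; `Deferred` and `TaskDemo` are hypothetical names.
final class Deferred[A](body: () => A) {
  // The stored computation executes only when run() is called.
  def run(): A = body()
}

object TaskDemo {
  def demo(): List[String] = {
    val log = scala.collection.mutable.ListBuffer.empty[String]

    // Building the value records nothing yet.
    val publish = new Deferred(() => { log += "published"; () })
    log += "built"

    // Only the explicit run() performs the effect.
    publish.run()
    log.toList
  }
}
```

Calling `TaskDemo.demo()` returns `List("built", "published")`: the effect shows up only after `run()`, which is the step my first attempt was missing.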

So I asked for help on the scalaz mailing list, and Pavel Chlupáček pointed me in the right direction.
The working version is:



• Lines 6-8 produce a Task[IndexedSeq[String \/ Int]], one step further than in the first version (where the steps are process >> task >> run the task).

• Lines 10-12 actually run the task - and anything published before the attemptRun won't be seen by the subscriber.

• Line 19 stalls, waiting for the attemptRun to hook up the subscriber. (As the note says, I think this is only something you'd do for a demo.)

• Lines 21-22 actually run the tasks they create.
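
The stall is really a guess about how long the subscriber takes to hook up. As a rough illustration of making that ordering explicit instead, here is a library-free sketch using a plain `java.util.concurrent.CountDownLatch`. `ToyTopic` and `HandshakeDemo` are hypothetical names, and the toy topic is only a stand-in: it is not how scalaz-stream's topic is implemented, and I'm not claiming this is what you'd use with scalaz-stream itself.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, CountDownLatch}

// Toy topic: delivers each message to whoever is subscribed at publish time.
// Hypothetical stand-in, NOT scalaz-stream's topic.
final class ToyTopic[A] {
  private val subscribers = new CopyOnWriteArrayList[A => Unit]()

  def subscribe(f: A => Unit): Unit = { subscribers.add(f); () }

  def publish(a: A): Unit = {
    val it = subscribers.iterator()
    while (it.hasNext) it.next()(a)
  }
}

object HandshakeDemo {
  def demo(): List[String] = {
    val topic      = new ToyTopic[String]
    val received   = scala.collection.mutable.ListBuffer.empty[String]
    val registered = new CountDownLatch(1)

    // The subscriber registers on its own thread, then signals that it is ready.
    val subscriberThread = new Thread(new Runnable {
      def run(): Unit = {
        topic.subscribe(msg => { received += msg; () })
        registered.countDown()
      }
    })
    subscriberThread.start()

    // Block until the subscriber has signalled, rather than sleeping for a guessed time.
    registered.await()
    topic.publish("foo")
    subscriberThread.join()
    received.toList
  }
}
```

Here the publisher blocks until the subscriber has signalled that it is registered, so `HandshakeDemo.demo()` returns `List("foo")` without depending on a timing guess.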

1 comment:

Unknown said...

Cool, glad you found it.

In production code, though, we wouldn't call Thread.sleep, but would rather wrap this "warmup" start into the process as well. That can be easily achieved via Process.sleep.

i.e.

```
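import scala.concurrent.duration._ // provides the `100 millis` syntax
// `t` is the topic from the post; the delay before the first publish is part of the process
(Process.sleep(100 millis) fby Process.emit("foo"))
  .to(t.publish).run.run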
```

Process.sleep is a bad idea overall. Usually we use other primitives to really ensure that we have registered before anything is published.