Concurrent Ruby: Dataflow

Concurrent Ruby is a gem that was brought to my attention during Yapee’s talk at the Elixir Meetup in Kraków. It looks really good: it cites many functional languages as sources of inspiration and does not try to do more than it should, which is to be a set of higher- and lower-level abstractions for writing concurrent code. I must say that concurrent programming was never really among my interests, so it’s great news that someone has taken care of the more complex stuff for me.

One abstraction that I found particularly interesting was Dataflow. It lets you define a set of asynchronous tasks along with the dependencies among them. Basically, you schedule a task to run immediately after one or more other tasks have completed. I decided to play with it using the metaphor of preparing a ferry to set sail: you have to take care of trains, cars and people before you head out to the open sea, but those can be handled concurrently. So here is my flow definition:

require 'concurrent'

embark_trains = Concurrent.dataflow { sleep 4; puts "trains loaded" }
embark_cars = Concurrent.dataflow { sleep 6; puts "cars loaded" }
embark_passengers = Concurrent.dataflow { sleep 5; puts "passengers embarked" }

close_freight_decks = Concurrent.dataflow(embark_trains, embark_cars) do
	sleep 1
	puts "freight decks closed"
end

close_passenger_decks = Concurrent.dataflow(embark_passengers) do
	sleep 1
	puts "passenger decks closed"
end

take_off = Concurrent.dataflow(close_passenger_decks, close_freight_decks) do
	sleep 1
	puts "taking off!"
end

The tasks embark_trains, embark_cars and embark_passengers are simple and have no prerequisites. Defined like that, they are executed immediately (and asynchronously). Once the first two have completed, close_freight_decks may run. Closing the passenger decks is possible once the passengers have embarked. Only when both sets of decks are closed can you start the engine and hoist the colours high. I used sleep to simulate the workload in each task.

Unfortunately, if we execute this file, we get no output at all. The tasks are scheduled to run asynchronously, but before they have a chance to print anything, the program terminates and they are lost. While simply adding sleep 20 at the end of the script would suffice to see what’s going on, I used the standard Ruby way of synchronizing in such cases: ConditionVariable.

m = Mutex.new
cv = ConditionVariable.new
m.lock

embark_trains = Concurrent.dataflow { sleep 4; puts "trains loaded" }
[...]
take_off = Concurrent.dataflow(close_passenger_decks, close_freight_decks) do
	sleep 1
	puts "taking off!"
	cv.signal
end

cv.wait(m)

Now it works, but the output is not very gratifying:

➜  ~  ruby dataflow.rb
trains loaded
passengers embarked
cars loaded
passenger decks closed
freight decks closed
taking off!

We have to watch the program run to see that some tasks finish a few seconds apart while others happen almost at the same time. We can fix that with another feature of Concurrent Ruby: TimerTask. It is executed after every given interval. Moreover, you can modify this interval while the timer is running, but we are not going to use that here.

timer = Concurrent::TimerTask.new do
	@cur_time ||= 0
	@cur_time += 1
	puts @cur_time
end
timer.execution_interval = 1
timer.execute
sleep 0.1

What happens here is that the counter starts at 0, and every second we add one to it and write it to the output. The sleep 0.1 desynchronizes the timer from the tasks, so we can be more or less sure that the timer value gets to the screen before a task’s message. The result looks good:

ruby dataflow.rb
1
2
3
4
trains loaded
5
passengers embarked
6
cars loaded
passenger decks closed
7
freight decks closed
8
taking off!

So the whole program finished after about 8 seconds. If we were to run the tasks one after another, it would take 18 seconds, so the gain here is quite obvious. Of course, there is no magic here that we could not achieve by playing with threads or fibers, but Dataflow gives quite a nice abstraction and makes the dependencies (and the whole code) more readable.
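The two figures can be checked with a bit of plain Ruby. The sketch below takes the durations from the sleep calls in the flow definition and computes the sequential total and the length of the longest dependency chain; the constant names and the finish_time helper are made up for this illustration.

```ruby
# Durations in seconds, taken from the sleep calls in the flow definition.
DURATIONS = {
  embark_trains: 4, embark_cars: 6, embark_passengers: 5,
  close_freight_decks: 1, close_passenger_decks: 1, take_off: 1
}.freeze

# Each task mapped to the tasks it has to wait for.
DEPENDENCIES = {
  close_freight_decks: %i[embark_trains embark_cars],
  close_passenger_decks: %i[embark_passengers],
  take_off: %i[close_passenger_decks close_freight_decks]
}.freeze

# Earliest finish time of a task: its own duration plus the latest
# finish time among its dependencies (0 if it has none).
def finish_time(task)
  latest_dependency = DEPENDENCIES.fetch(task, []).map { |dep| finish_time(dep) }.max || 0
  latest_dependency + DURATIONS.fetch(task)
end

sequential = DURATIONS.values.sum
concurrent = finish_time(:take_off)

puts "sequential: #{sequential}s, concurrent: #{concurrent}s"
```

The longest chain is embark_cars (6 s), then close_freight_decks (1 s), then take_off (1 s), which is where the 8 seconds come from.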

Concurrent Ruby might not be a very mature project (I don’t know if it’s production-ready), but it is definitely worth checking out and will hopefully get more attention in the community.
