This post describes building simple, composable execution pipelines in Ruby. It's a useful technique that enables writing reusable, chainable, and testable components.
Execution pipeline
An execution pipeline (also known as the Chain of Responsibility pattern) focuses on performing a set of actions; for example, an "order creation pipeline":
- creates an order
- creates order details
- decreases inventory
- sends out notifications
- etc.
Note that an execution pipeline differs a bit from a data pipeline: it focuses on performing (often unrelated) actions rather than transforming data (a stream).
An example
Here's a toy implementation of our order process.
def validator(&_next)
  lambda do |order|
    unless order[:total] > 0
      order[:errors] << "[Validator]: Invalid total"
      puts "[Validation] not ok"
      return
    end

    puts "[Validation] ok"
    _next.call(order)
  end
end
def persist(db:, &_next)
  lambda do |order|
    if db.save_order(order)
      _next.call(order)
    end
  end
end
def client_notifier(mailer:, &_next)
  lambda do |order|
    if order[:errors].empty?
      mailer.order_received(order)
    end
    _next.call(order)
  end
end
class WithTransaction
  def initialize(db:, &_next)
    @db = db
    @next = _next
  end

  def call(order)
    @db.transaction do
      @next.call(order)
    end
  end

  # The & operator calls #to_proc, so an instance can be passed as a block.
  def to_proc
    method(:call).to_proc
  end
end
def nop
  lambda do |_|
    puts "[End]"
  end
end
def fake_db
  Class.new do
    def transaction(&block)
      puts "[DB] starting txn"
      block.call
      puts "[DB] done txn"
    end

    def save_order(order)
      puts "[DB] saving order: total #{order[:total]}"
      true # signal success so the persist stage continues the chain
    end
  end
end
def fake_mailer
  Class.new do
    def order_received(_order)
      puts "[Mailer] order received"
    end
  end
end
def order_pipeline(db:)
  validator(
    &WithTransaction.new(db: db,
      &persist(db: db,
        &nop)))
end
def build(*pipelines)
  lambda do |order|
    puts "[Pipeline][#{order[:name]}] starting"
    pipelines.each do |p|
      p.call(order)
    end
    puts "[Pipeline][#{order[:name]}] done"
    puts
  end
end
pipeline = build(
  order_pipeline(db: fake_db.new),
  client_notifier(mailer: fake_mailer.new, &nop)
)

[
  {name: "Order1", total: 10, errors: []},
  {name: "Order2", total: -1, errors: []}
].each { |order| pipeline.call(order) }
Running the code above produces:
$ ruby exec-pipeline.rb
[Pipeline][Order1] starting
[Validation] ok
[DB] starting txn
[DB] saving order: total 10
[End]
[DB] done txn
[Mailer] order received
[End]
[Pipeline][Order1] done
[Pipeline][Order2] starting
[Validation] not ok
[End]
[Pipeline][Order2] done
Explanation
- the pipeline implementation above is, at its core, a linked list of lambdas (aka stages), where every stage keeps a reference to the next one
- there's a special nop stage (aka "no operation"), which is a terminal stage: it doesn't hold or call a next-stage reference
- build assembles the list(s) of stages into the overall process
- each stage is called with an order hash, which gets mutated as shared state (e.g. for adding :errors)
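Since every stage is just a lambda that takes the next stage as a block, any sub-chain is itself a callable pipeline. As a quick illustration (assuming the definitions above are loaded, with made-up order data), wiring the validator straight to the terminal nop stage shows both the chaining and the shared, mutated order hash:

# A one-stage pipeline: validator wired directly to the terminal nop stage.
mini = validator(&nop)

order = {name: "Order3", total: -5, errors: []}
mini.call(order)  # prints "[Validation] not ok"
order[:errors]    # => ["[Validator]: Invalid total"]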
Note that in the example above, client_notifier is kept separate from order_pipeline: a notification failure must not affect order persistence by raising inside the DB transaction and causing a rollback.
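For contrast, here is a hypothetical variant (not used in this post) that nests the notifier inside the transactional chain; with a real database, an exception raised by the mailer would propagate out of the transaction block and roll back the just-saved order:

# Hypothetical alternative: the notifier runs inside the DB transaction,
# so a failure in mailer.order_received would abort the whole transaction.
def risky_order_pipeline(db:, mailer:)
  validator(
    &WithTransaction.new(db: db,
      &persist(db: db,
        &client_notifier(mailer: mailer,
          &nop))))
end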
Summary
Pros
- the build function in the example provides a high-level overview of what the process does
- simple DSL: it's a pure Ruby implementation without any external dependencies
- composable: stages can easily be injected into, or removed from, the pipeline(s) (see the sketch after this list)
- testable: all dependencies of a pipeline are explicitly injected; the example above uses the fake_db and fake_mailer stubs, which makes testing the pipeline fairly simple
- encourages building decoupled components
Cons
- the implementation above doesn't allow building pipelines dynamically (at runtime), though that would be fairly simple to add (see the sketch below)
- it's overkill for the simplest cases
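As a rough sketch of how runtime composition could look (the chain helper below is not part of the original example): fold a list of stage factories from the right, starting at the terminal nop stage.

# Each factory is a lambda that takes the next stage and returns a new stage;
# reducing from the right wires them into the same linked list as before.
def chain(*factories)
  factories.reverse.reduce(nop) do |next_stage, factory|
    factory.call(next_stage)
  end
end

db = fake_db.new
dynamic = chain(
  ->(next_stage) { validator(&next_stage) },
  ->(next_stage) { persist(db: db, &next_stage) }
)
dynamic.call({name: "Order4", total: 5, errors: []})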
Using a PORO for a pipeline stage
You might have noticed that, aside from the plain lambdas, there's also a WithTransaction class. It's there as an example of using POROs, which may become necessary once a stage's code grows beyond a few lines.
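As a sketch of what another PORO stage could look like (the WithAuditLog class and its audit_log dependency are invented for illustration), the contract is simply: accept the dependencies plus the next stage in the constructor, implement call, and expose to_proc so an instance can be passed with &.

class WithAuditLog
  def initialize(audit_log:, &_next)
    @audit_log = audit_log
    @next = _next
  end

  def call(order)
    @audit_log << "order #{order[:name]} entered the pipeline"
    @next.call(order)
  end

  # The & operator calls #to_proc when the instance is passed as a block.
  def to_proc
    method(:call).to_proc
  end
end

log = []
stage = validator(&WithAuditLog.new(audit_log: log, &nop))
stage.call({name: "Order5", total: 3, errors: []})
log  # => ["order Order5 entered the pipeline"]

Because such a stage holds plain instance variables and takes its collaborators through the constructor, it stays easy to unit-test in isolation with a stub next stage.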