Simple execution pipelines with Ruby

gmarik 3 min

This post describes how to build simple, composable execution pipelines in Ruby. It is a useful technique for writing reusable, chainable, and testable components.

Execution pipeline

An execution pipeline (also known as the Chain of Responsibility pattern) focuses on performing a set of actions. For example, an “order creation pipeline”:

  1. creates an order
  2. creates order details
  3. decreases inventory
  4. sends out notifications
  5. etc.

Note that an execution pipeline differs from a data pipeline: it focuses on performing often unrelated actions rather than transforming a stream of data.
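To make the distinction concrete, here is a minimal sketch that puts the two styles side by side. The names total_in_cents, log_stage and demo_execution_pipeline are hypothetical helpers made up for this illustration; they are not part of the example that follows.

```ruby
# Data pipeline: each step transforms the value and hands the result on.
def total_in_cents(prices)
  cents = prices.map { |price| (price * 100).round }
  cents.select(&:positive?).sum
end

# Execution pipeline: each stage performs an action (here it only writes
# to a log) and then decides whether to call the next stage.
def log_stage(log, name, &next_stage)
  lambda do |order|
    log << "#{name}: #{order[:name]}"
    next_stage.call(order)
  end
end

# Chains two stages and a terminating lambda, runs one order through
# them, and returns what was logged.
def demo_execution_pipeline
  log = []
  finish = lambda { |_order| log << "done" }
  pipeline = log_stage(log, "create order",
    &log_stage(log, "decrease inventory", &finish))
  pipeline.call({ name: "Order1" })
  log
end
```

In the data pipeline the interesting thing is the return value; in the execution pipeline the order passes through unchanged and the interesting thing is which stages ran.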

An example

Here’s an example toy implementation for our order process.

def validator(&_next)
  lambda do |order|
    unless order[:total] > 0
      order[:errors] << "[Validator]: Invalid total"
      puts "[Validation] not ok"
      return
    end
    puts "[Validation] ok"
    _next.call(order)
  end
end

def persist(db:, &_next)
  lambda do |order|
    if db.save_order(order)
      _next.call(order)
    end
  end
end

def client_notifier(mailer:, &_next)
  lambda do |order|
    if order[:errors].empty?
      mailer.order_received(order)
    end
    _next.call(order)
  end
end

class WithTransaction
  def to_proc
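    # Method#to_proc already returns a lambda; calling lambda(&...) without
    # a literal block is deprecated since Ruby 3.0.
    method(:call).to_proc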
  end

  def initialize(db:, &_next)
    @db = db
    @next = _next
  end

  def call(order)
    @db.transaction do
      @next.call(order)
    end
  end
end

def nop
  lambda do |_|
    puts "[End]"
  end
end

def fake_db
  Class.new do
    def transaction(&block)
      puts "[DB] starting txn"
      block.call
      puts "[DB] done txn"
    end

    def save_order(order)
      # puts returns nil, so with this fake DB persist never calls its next stage
      puts "[DB] saving order: total #{order[:total]}"
    end
  end
end

def fake_mailer
  Class.new do
    def order_received(order)
      puts "[Mailer] order received"
    end
  end
end

def order_pipeline(db:)
  validator(
    &WithTransaction.new(db: db,
      &persist(db: db,
          &nop)))
end

def build(*pipelines)
  lambda do |order|
    puts "[Pipeline][#{order[:name]}] starting"
    pipelines.each do |p|
      p.call(order)
    end
    puts "[Pipeline][#{order[:name]}] done"
    puts
  end
end

pipeline = build(
    order_pipeline(db: fake_db.new),
    client_notifier(mailer: fake_mailer.new, &nop)
)

[
  {name: "Order1", total: 10, errors: []},
  {name: "Order2", total: -1, errors: []}
].each { |order| pipeline.call(order) }

Running the code above produces:

$ ruby exec-pipeline.rb
[Pipeline][Order1] starting
[Validation] ok
[DB] starting txn
[DB] saving order: total 10
[DB] done txn
[Mailer] order received
[End]
[Pipeline][Order1] done

[Pipeline][Order2] starting
[Validation] not ok
[End]
[Pipeline][Order2] done

Explanation

Note that in the example above client_notifier is kept separate from order_pipeline. If it ran inside the transaction, a notification error would roll back the DB transaction, and a notification failure must not affect order persistence.
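The effect can be sketched with a fake database that actually rolls back. RollbackDb and BrokenMailer below are hypothetical stand-ins invented for this illustration (the fake_db above does not roll anything back), and the two helper methods are simplified, non-pipeline versions of the two possible layouts.

```ruby
# Hypothetical in-memory DB: an exception raised inside `transaction`
# discards everything saved within it.
class RollbackDb
  attr_reader :orders

  def initialize
    @orders = []
  end

  def transaction
    snapshot = @orders.dup
    yield
  rescue StandardError
    @orders = snapshot # roll back to the state before the transaction
    raise
  end

  def save_order(order)
    @orders << order
    true
  end
end

# Hypothetical mailer that always fails.
class BrokenMailer
  def order_received(_order)
    raise "SMTP is down"
  end
end

# Notification inside the transaction: its failure rolls the order back.
def save_and_notify_inside_txn(db, mailer, order)
  db.transaction do
    db.save_order(order)
    mailer.order_received(order)
  end
rescue StandardError
  nil # the order is gone together with the failed notification
end

# Notification after the transaction: the order stays saved.
def save_then_notify(db, mailer, order)
  db.transaction { db.save_order(order) }
  begin
    mailer.order_received(order)
  rescue StandardError
    nil # notification failed, but the order is already committed
  end
end
```

With the broken mailer, the first layout leaves the orders list empty while the second keeps the order, which is the reason for building the notifier as its own pipeline.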

Summary

Pros

Cons

Using PORO for pipeline stage

You might have noticed that, aside from plain lambdas, there’s also a WithTransaction class. It’s there as an example of using a PORO (plain old Ruby object), which may be necessary when a stage’s code grows beyond a few lines.
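As a further sketch of the same shape, here is what a PORO for the “decreases inventory” step might look like. DecreaseInventory, the order[:items] key and the Hash-based inventory are assumptions made up for this illustration; only the initialize/to_proc/call layout is borrowed from WithTransaction.

```ruby
# Hypothetical PORO stage, laid out like WithTransaction.
class DecreaseInventory
  def initialize(inventory:, &next_stage)
    @inventory = inventory # Hash of SKU => units in stock (assumed shape)
    @next = next_stage
  end

  # Lets the object be passed with `&` wherever a stage expects a block.
  def to_proc
    method(:call).to_proc
  end

  def call(order)
    # Find the first item the stock cannot cover, if any.
    short = order[:items].find { |sku, quantity| @inventory.fetch(sku, 0) < quantity }
    if short
      order[:errors] << "[Inventory]: not enough #{short.first}"
      return # stop the chain, as validator does
    end
    order[:items].each { |sku, quantity| @inventory[sku] -= quantity }
    @next.call(order)
  end
end

# Exercises the stage on its own, with a block standing in for the
# rest of the pipeline.
def run_inventory_demo
  inventory = { "book" => 2 }
  reached_end = []
  stage = DecreaseInventory.new(inventory: inventory) { |order| reached_end << order[:name] }
  stage.call({ name: "Order1", items: { "book" => 1 }, errors: [] })
  stage.call({ name: "Order2", items: { "book" => 5 }, errors: [] })
  [inventory, reached_end]
end
```

Because the next stage is just a block, the class can be tested in isolation by passing one that records what reaches it, as run_inventory_demo does.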

Alternatives
