Ruby-based ETL pipeline with Iron.io and Redshift

By Harlow Ward

As the data requirements of our Business Intelligence team grow, we’ve been leveraging Iron.io’s IronWorker as our go-to platform for scheduling and running our Ruby-based ETL (Extract, Transform, Load) worker pipeline.

Business units within HotelTonight collect and store data across multiple external services. The ETL pipeline is responsible for gathering all the external data and unifying it into Amazon’s excellent Redshift offering.

Ruby ETL pipeline overview

Redshift hits a sweet spot for us: it uses the familiar SQL query language and supports connections from any platform with a Postgres adapter.

This allows our Ruby scripts to connect with the PG gem, our Business Intelligence team to connect with their favorite SQL workbench, and anyone in our organization with Looker access to run queries on the data.
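For reference, here’s a minimal sketch of that Ruby connection with the PG gem; the environment variable names are placeholders rather than our actual configuration, and DB is the same connection constant the load worker later in this post executes against.

require 'pg'

# Redshift accepts standard Postgres connections; 5439 is Redshift's default port.
# Host, database, and credential values here are placeholder environment variables.
DB = PG.connect(
  host:     ENV['REDSHIFT_HOST'],
  port:     5439,
  dbname:   ENV['REDSHIFT_DATABASE'],
  user:     ENV['REDSHIFT_USER'],
  password: ENV['REDSHIFT_PASSWORD']
)

# Quick sanity check that the connection works
DB.exec('SELECT 1 AS ok') { |result| puts result.getvalue(0, 0) }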

Worker Platform

The team at Iron.io has been a great partner for us while building the ETL pipeline. Their worker platform gives us a quick and easy mechanism for deploying and managing all our Ruby workers.

IronWorker dashboard

The administration area boasts excellent dashboards for reporting worker status and gives us great visibility into the current state of our pipeline.

Single Responsibility Workers

Keeping the worker components modular allows us to separate the concerns of each worker and create a repeatable process for each of our ETL integrations.

Ruby worker detail

The modular design allows for persistence points along the lifetime of the pipeline, letting us isolate failures and recover if data integrity issues arise.

Chain of Responsibility

Each worker in the pipeline is responsible for its own unit of work and has the ability to kick off the next task in the pipeline.

For example, when the Mixpanel Extractor completes its export of data from the Mixpanel API, it forks a new Mixpanel Transformer task with the S3 file path as a parameter.

# Queue the next task in the pipeline, passing along the S3 path of the raw export
task_params = {
  env: ENV['WORKER_ENV'],
  s3_path: 's3://bucket-name/raw-export-file.txt'
}
client = IronWorker.new
client.tasks.create('Mixpanel Transformer', task_params)

Similarly, the Mixpanel Transformer will kick off the Redshift Load task when it has completed aggregating and transforming the data.

Extracting the Data

We build a custom worker for each external data source. This process relies heavily on 3rd party API documentation. Big props to Desk.com for their great API docs.
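As an illustration, here’s a hedged sketch of what one of these extractor workers can look like. The Mixpanel export endpoint, bucket name, and parameter names are placeholders rather than our production code, and API authentication is omitted for brevity.

require 'net/http'
require 'uri'
require 'aws-sdk-s3'

class MixpanelExtractor
  # Mixpanel's raw export endpoint (authentication omitted for brevity)
  EXPORT_URL = 'https://data.mixpanel.com/api/2.0/export'

  def initialize(attrs)
    @from_date = attrs.fetch('from_date')
    @to_date   = attrs.fetch('to_date')
  end

  # Pull the raw export, stage it on S3, and hand off to the transformer
  def extract
    s3_path = "raw-export-#{from_date}.txt"
    upload_to_s3(s3_path, fetch_export)
    enqueue_transformer(s3_path)
  end

  private

  attr_reader :from_date, :to_date

  def fetch_export
    uri = URI(EXPORT_URL)
    uri.query = URI.encode_www_form(from_date: from_date, to_date: to_date)
    Net::HTTP.get(uri)
  end

  def upload_to_s3(key, body)
    Aws::S3::Resource.new.bucket('bucket-name').object(key).put(body: body)
  end

  def enqueue_transformer(s3_path)
    client = IronWorker.new
    client.tasks.create('Mixpanel Transformer',
                        env: ENV['WORKER_ENV'],
                        s3_path: "s3://bucket-name/#{s3_path}")
  end
end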

Transforming the Data

Each pipeline gets its own set of transformers. A transformer is responsible for sanitizing data and returning a CSV row with columns in the ordinal position of the Redshift target table (these rows are used for the database COPY).

# lib/transformers/mixpanel/app_launch_event.rb
require 'csv'

module Mixpanel
  class AppLaunchEvent
    def initialize(data)
      @data = data
    end

    def csv_row
      CSV.generate_line(ordinal_transformed_data)
    end

    private

    def ordinal_transformed_data
      # transformed data in ordinal column position
      [
        data['customer_id'],
        device_id,
        device_platform,
        data['timestamp'],
        # ...
      ]
    end

    def device_id
      # Transformation of device identifier
    end

    def device_platform
      # Transformation of device platform
    end

    attr_reader :data
  end
end

The Transformer is responsible for creating a CSV file on S3 with the import data. It then kicks off a Redshift Load worker with the S3 path of the import file.
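A rough sketch of that step might look like the following; the class name, bucket, and table name are illustrative, and the raw rows are assumed to arrive as parsed hashes.

require 'zlib'
require 'aws-sdk-s3'

class MixpanelTransformer
  def initialize(attrs)
    @raw_rows = attrs.fetch('raw_rows') # parsed rows from the raw export
  end

  # Build a gzipped CSV, stage it on S3, then queue the load worker
  def transform
    csv_path = "transformed/app-launch-events-#{Time.now.to_i}.csv.gz"
    upload_to_s3(csv_path, gzipped_csv)
    enqueue_load(csv_path)
  end

  private

  attr_reader :raw_rows

  def gzipped_csv
    csv = raw_rows.map { |row| Mixpanel::AppLaunchEvent.new(row).csv_row }.join
    Zlib.gzip(csv)
  end

  def upload_to_s3(key, body)
    Aws::S3::Resource.new.bucket('bucket-name').object(key).put(body: body)
  end

  def enqueue_load(csv_path)
    client = IronWorker.new
    client.tasks.create('Redshift Load',
                        env: ENV['WORKER_ENV'],
                        s3_path: csv_path,
                        table_name: 'app_launch_events')
  end
end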

Loading the Data

The final step of the pipeline involves copying the transformed data into Redshift. This is achieved with Redshift's COPY command, issued over the Postgres connection.

class RedshiftLoad
  def initialize(attrs)
    @s3_path = attrs.fetch('s3_path')
    @table_name = attrs.fetch('table_name')
  end

  # DB is assumed to be an open PG connection to the Redshift cluster
  def copy_data
    DB.exec <<-EOS
      COPY #{table_name}
      FROM 's3://bucket-name/#{s3_path}'
      CREDENTIALS 'aws_access_key_id=#{access_key_id};aws_secret_access_key=#{secret_access_key}'
      MANIFEST
      CSV
      EMPTYASNULL
      GZIP
    EOS
  end

  private

  def access_key_id
    ENV['aws_access_key_id'] or raise 'AWS Access Key ID not set'
  end

  def secret_access_key
    ENV['aws_secret_access_key'] or raise 'AWS Secret Access Key not set'
  end

  attr_reader :s3_path, :table_name
end

The keen observer will notice we’re using the MANIFEST flag during the COPY operation. This is a Redshift-specific feature that lets us list multiple CSV files in a manifest and copy them in parallel (this dramatically speeds up imports when dealing with millions of rows of data).

require 'json'

class RedshiftManifest
  def initialize(attrs)
    @bucket_name = attrs.fetch('bucket_name')
    @manifest_file = attrs.fetch('manifest_file')
    @s3_paths = attrs.fetch('s3_paths')
  end

  def write_csv_entries
    manifest_file.write JSON.generate(manifest)
    manifest_file.rewind
  end

  private

  def manifest
    { entries: manifest_filename_list }
  end

  def manifest_filename_list
    s3_paths.map do |csv_path|
      { url: 's3://' + bucket_name + '/' + csv_path, mandatory: true }
    end
  end

  attr_reader :bucket_name, :manifest_file, :s3_paths
end

The compiled manifest file looks like this:

{
  "entries": [
    { "url": "s3://bucket-name/data-01.csv", "mandatory": true },
    { "url": "s3://bucket-name/data-02.csv", "mandatory": true },
    ...
  ]
}
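
Tying the pieces together, here’s a usage sketch (assumed, not lifted from our production code) that writes the manifest to a tempfile, uploads it to S3, and points the load worker at it:

require 'tempfile'
require 'aws-sdk-s3'

manifest_file = Tempfile.new(['redshift-load', '.manifest'])

# Write manifest entries for the CSV files staged by the transformers
RedshiftManifest.new(
  'bucket_name'   => 'bucket-name',
  'manifest_file' => manifest_file,
  's3_paths'      => ['data-01.csv', 'data-02.csv']
).write_csv_entries

# Upload the manifest so the COPY command can reference it
Aws::S3::Resource.new
  .bucket('bucket-name')
  .object('manifests/load.manifest')
  .put(body: manifest_file.read)

# Point the load worker at the manifest rather than an individual CSV
RedshiftLoad.new(
  's3_path'    => 'manifests/load.manifest',
  'table_name' => 'app_launch_events'
).copy_data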

Working with Unified Data

Previously, if we wanted to analyze data across multiple 3rd party providers, we needed to export CSVs from each provider, create pivot tables, and apply lots of Excel wizardry.

Now, with all our data in Redshift we can run SQL queries and JOIN data from multiple platforms into one result set.
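
For example, a single query can join app events against support ticket data; the table and column names below are illustrative, not our actual schema.

query = <<-SQL
  SELECT e.device_platform,
         COUNT(DISTINCT e.customer_id) AS active_customers,
         COUNT(DISTINCT t.ticket_id)   AS support_tickets
  FROM app_launch_events e
  LEFT JOIN desk_tickets t ON t.customer_id = e.customer_id
  GROUP BY e.device_platform
SQL

DB.exec(query) { |result| result.each { |row| puts row.inspect } }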


Interested in building something great?

Join us in building the world’s most loved hotel app.
View our open engineering positions.