Copy MySQL database tables to Redshift with AWS Data Pipeline

By Harlow Ward

We’ve been successfully using AWS Redshift as our data warehousing solution for over a year now, and recently posted about how we created a Ruby-based ETL pipeline to pull in all our third-party data.

However, there is an important component of our pipeline that was not covered in that post: how we leverage AWS Data Pipeline to reliably transfer data from MySQL to Redshift.

Data Pipeline Control Panel

The AWS Control Panel has a GUI for creating and managing pipelines in the browser. It’s worth taking a look to get a feel for how the service works.

However, its usefulness begins to break down as a pipeline definition grows in complexity.

[Screenshot: MySQL to Redshift pipeline]

Luckily, the AWS CLI allows us to manage all of our pipelines from the terminal. We use it to create, upload, and activate our pipeline definitions.
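
As a rough sketch of how that workflow could be scripted (the rake task, pipeline name, unique id, and definition path below are illustrative rather than our exact setup; create-pipeline, put-pipeline-definition, and activate-pipeline are the AWS CLI’s standard datapipeline subcommands):

require "json"

namespace :pipeline do
  desc "Create, upload, and activate the MySQL to Redshift pipeline"
  task :deploy do
    # `create-pipeline` returns JSON containing the new pipeline's id.
    output = `aws datapipeline create-pipeline --name mysql-to-redshift --unique-id mysql-to-redshift`
    pipeline_id = JSON.parse(output).fetch("pipelineId")

    # Upload the generated definition, then set the pipeline running.
    sh "aws datapipeline put-pipeline-definition " \
       "--pipeline-id #{pipeline_id} " \
       "--pipeline-definition file://pipelines/mysql_to_redshift.json"
    sh "aws datapipeline activate-pipeline --pipeline-id #{pipeline_id}"
  end
end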

Pipeline Objects

An important part of working with AWS Data Pipeline is getting familiar with its Pipeline Objects. Each object is represented by a JSON payload, and objects can be combined to define a data flow between multiple services.

For example, a CopyActivity references an input, an output, and a schedule. In this case InputData would be a MySQL data node, OutputData an S3 data node, and CopyPeriod a schedule defining how often the activity runs.

{
  "id" : "MySQLToS3",
  "type" : "CopyActivity",
  "schedule" : { "ref" : "CopyPeriod" },
  "input" : { "ref" : "InputData" },
  "output" : { "ref" : "OutputData" }
}
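
To make those refs concrete, here is a minimal sketch of what the referenced objects might look like; the connection details, bucket path, and schedule values are placeholders, and only the fields relevant to the refs above are shown:

{
  "id" : "InputData",
  "type" : "MySqlDataNode",
  "connectionString" : "jdbc:mysql://example-host:3306/app",
  "username" : "pipeline_user",
  "*password" : "secret",
  "table" : "reservations"
}

{
  "id" : "OutputData",
  "type" : "S3DataNode",
  "filePath" : "s3://example-bucket/staging/reservations.csv"
}

{
  "id" : "CopyPeriod",
  "type" : "Schedule",
  "period" : "1 day",
  "startDateTime" : "2014-01-01T00:00:00"
}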

Ruby Pipeline Objects

We created a library of Ruby objects to abstract away the structure of the JSON. These have helped dramatically reduce the complexity of our pipelines and allowed us to define them in a more programmatic fashion.

require "attr_extras"

module DataPipelineComponents
  class CopyActivity
    # pattr_initialize (from the attr_extras gem) requires each keyword
    # argument and creates a private reader for it.
    pattr_initialize [:alert!, :input!, :output!, :runs_on!, :schedule!]

    def to_h
      {
        id: id,
        type: "CopyActivity",
        schedule: {
          ref: schedule
        },
        runsOn: {
          ref: runs_on
        },
        input: {
          ref: input
        },
        output: {
          ref: output
        },
        onFail: {
          ref: alert
        }
      }
    end

    # Other components reference this object by id, so interpolating it
    # (or serializing it to JSON) yields the id string.
    def to_s
      id
    end

    private

    def id
      "CopyActivity_#{input}"
    end
  end
end
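
A hypothetical usage example, passing plain strings where the real pipeline passes other component objects (whose to_s returns their id):

copy = DataPipelineComponents::CopyActivity.new(
  alert: "FailureAlert",
  input: "InputData",
  output: "OutputData",
  runs_on: "Ec2Instance",
  schedule: "CopyPeriod"
)

copy.to_h
# => { id: "CopyActivity_InputData", type: "CopyActivity",
#      schedule: { ref: "CopyPeriod" }, runsOn: { ref: "Ec2Instance" },
#      input: { ref: "InputData" }, output: { ref: "OutputData" },
#      onFail: { ref: "FailureAlert" } }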

Pipeline Generation

Several of the Ruby components are then combined to build the end-to-end pipeline; the composition of these components is used to generate the final JSON payload.

require "attr_extras"
require "json"

class SqlToRedshiftPipeline
  pattr_initialize :config, :table_names

  def to_h
    {
      objects: [
        default.to_h,
        alert.to_h,
        csv_format.to_h,
        ec2_resource.to_h,
        redshift_database.to_h,
        schedule.to_h,
        *table_copy_activities,
      ]
    }
  end

  def to_json
    JSON.pretty_generate(to_h)
  end

  private

  def default
    DataPipelineComponents::Default.new(config)
  end

  def alert
    DataPipelineComponents::SnsAlarm.new(config)
  end

  def csv_format
    DataPipelineComponents::CsvFormat.new
  end

  def ec2_resource
    DataPipelineComponents::Ec2Resource.new(config, schedule)
  end

  def schedule
    DataPipelineComponents::Schedule.new(config)
  end

  def redshift_database
    DataPipelineComponents::RedshiftDatabase.new(config)
  end

  def table_copy_activities
    table_names.flat_map do |table_name|
      TableCopyActivity.new(
        alert: alert,
        config: config,
        format: csv_format,
        redshift_database: redshift_database,
        ec2_resource: ec2_resource,
        schedule: schedule,
        table_name: table_name,
      ).to_a
    end
  end
end

A rake task is then used to generate the final JSON payload.

require "hashie"
require "sequel"
require "yaml"

namespace :generate do
  desc "Generate MySQL to Redshift definition"
  task :mysql_to_redshift_pipeline do
    config = Hashie::Mash.new(YAML.load_file("mysql_to_redshift.yml"))
    connection_string = ENV.fetch('DB')
    db = Sequel.connect(connection_string)

    File.open("pipelines/mysql_to_redshift.json", "w") do |f|
      f.write SqlToRedshiftPipeline.new(config, db.tables).to_json
    end
  end
end
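
Assuming a Sequel-compatible connection string in the DB environment variable (the URL below is just an example), regenerating the definition looks something like:

DB=mysql2://user:password@db-host/app_production rake generate:mysql_to_redshift_pipeline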

Flock to Unlock

If the Ruby components feel like a useful open-source contribution, retweet this article or send a message to @HotelTonightDev and let us know! If there is enough interest, we’d be happy to extract them into a gem.
