How To Build A Simple Data Pipeline on Google Cloud Platform

Here’s a demonstration of how to build a simple data pipeline using Google Cloud Platform services such as Google Cloud Storage (GCS), BigQuery, Google Cloud Functions (GCF), and Google Cloud Composer.

April 7, 2021

Google Cloud Platform is a suite of cloud computing services that brings together computing, data storage, data analytics and machine learning capabilities to enable organizations to build data pipelines, secure data workloads, and carry out analytics. In this article, we will walk through a simple demonstration of how to build a data pipeline using services from the Google Cloud Platform (GCP).

For this particular use case, let’s pretend to be an imaginary company named LRC, Inc. that sells various products.

Requirement

Our sales department wants to provide a CSV file of recent sales. The file will come from our sales management system and the data will be used for tracking sales and providing aggregated data to an upstream system. The file will contain a customer name, a product id, an item count, and an item cost.

The data should be stored over time and new data appended. As part of the process, a new file should be generated that will contain all order data aggregated by customer and product. The new file will be stored in a shared location so that another team can access it anytime.

When the file is delivered, an automated email should be sent to a specific address ([email protected]) that will open a ticket alerting the other team that a file is available.

Design

Our pipeline is fairly simple. We have several steps:

  1.     Watch for a file
  2.     Load a file into a database
  3.     Create an aggregation from the data
  4.     Create a new file
  5.     Send an email

Our imaginary company is a GCP user, so we will be using GCP services for this pipeline. Even with restricting ourselves to GCP, there are still many ways to implement these requirements. As part of our technical requirements, we want to keep this as serverless as possible and write as little code as possible.

We will use Google Cloud Storage (GCS) to store the files. Our database will be Google’s big data distributed SQL database, BigQuery, and we will use Google’s managed Airflow, Google Cloud Composer. We will also use a Google Cloud Function (GCF) to watch for a file arrival. We could also use GCF to send the email at the final step, but we will put that in our Composer orchestration to keep things as contained as possible.

Simple GCP Pipeline

This article does not contain a tutorial for any of these services but will explain how the pieces fit together to create a pipeline. Describing each service in detail would require many articles. Google provides excellent tutorials, and I will reference some of those as we go through the process.

Learn More: How To Build Data Pipelines for a Multi-Cloud Environment

Implementation

GCP provides a very nice-looking UI for most commands and services. It follows Google’s minimalistic design and is pretty clean. However, to create a repeatable and automated deployment process, we should use the command line for most tasks related to pipeline development. The UI is great for exploration and analysis; scripting is the rule for things we will need to automate. Our pipeline would first be created in a development environment, deployed later to a test environment, and finally to a production environment. Because of that, I will primarily use the command line interface (CLI).

Google Cloud Storage

The first step is to create a couple of buckets in GCS. We will follow the LRC, Inc. bucket naming standards. For this, we will use our company’s name, followed by the environment, followed by a descriptive name.

Note: GCS bucket names are globally unique. If you follow along, you cannot name your buckets the same as what I name them. You will need to choose your bucket names and remember them later in the process.

For our incoming file, we will name it lrc-dev-incoming-sales-data. I will refer to this later as “the incoming bucket”.

gsutil mb -l us-east1 gs://lrc-dev-incoming-sales-data/

For our outgoing file, we will name it lrc-dev-outgoing-sales-aggregate. I will refer to this later as “the outgoing bucket”.

gsutil mb -l us-east1 gs://lrc-dev-outgoing-sales-aggregate/

We can list those to be sure they were created:

gsutil ls gs://lrc-dev-outgoing-sales-aggregate/
gsutil ls gs://lrc-dev-incoming-sales-data/

Find more information on gsutil and buckets at the GSUtil GCP page.
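
If you would rather script bucket creation in Python than call gsutil, here is a minimal sketch using the google-cloud-storage client library. It assumes your environment is already authenticated against the right project, and remember to substitute your own (globally unique) bucket names.

from google.cloud import storage

client = storage.Client()

# Create the incoming and outgoing buckets in us-east1, skipping any that already exist
for bucket_name in ["lrc-dev-incoming-sales-data", "lrc-dev-outgoing-sales-aggregate"]:
    if client.lookup_bucket(bucket_name) is None:
        client.create_bucket(bucket_name, location="us-east1")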

BigQuery

We need a table in BigQuery to hold our data and will use the BQ CLI command to create this. BQ allows several ways to create a table. If you are familiar with JSON, you can use a JSON doc to describe the columns and data types. If you are from a database background, you will probably want to use SQL DDL to create the table. I will use DDL.

You could also defer creating the table and allow creation during the step in Composer that we will use to load the data. That command will create the table if it does not exist. However, I would rather create it as part of the deployment, so we will use BQ to do so. For the final aggregate table, I will let Composer create it to show how it works.

The BQ command is (note that the sales_data dataset must already exist; you can create it with bq mk):

bq query --nouse_legacy_sql " \
CREATE TABLE sales_data.current_sales (
          customer_name string,
          product_id string,
          item_count int64,
          item_cost float64
)"

You can read more about BQ and creating tables at the GCP BQ page.
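
If you prefer to create the table from Python rather than the bq CLI (for example, in the same deployment script), a minimal sketch with the google-cloud-bigquery client library would look roughly like this. It assumes the sales_data dataset already exists, and my-project is a placeholder for your own project id.

from google.cloud import bigquery

client = bigquery.Client()

# Same columns as the DDL above; "my-project" is a placeholder for your project id
schema = [
    bigquery.SchemaField("customer_name", "STRING"),
    bigquery.SchemaField("product_id", "STRING"),
    bigquery.SchemaField("item_count", "INTEGER"),
    bigquery.SchemaField("item_cost", "FLOAT"),
]
table = bigquery.Table("my-project.sales_data.current_sales", schema=schema)
client.create_table(table, exists_ok=True)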

Learn More: Close to the Edge: Automated Data Pipelines for Real-time Processing

Google Cloud Function (GCF)

For the GCF part of this, the function that will watch for a file arrival, we will use Python, and we will actually use an example provided by Google.

Navigate to the Triggering with GCF page in the Cloud Composer documentation. If you follow that documentation, you will end up with a cloud function that calls a DAG when a file in GCS is finalized. If you follow the directions and implement it exactly as written (with one minor deviation described below), you will end up with the first step in our simple pipeline.

The one minor deviation is that you will need to use the incoming bucket you created earlier as the trigger bucket. You cannot use the one I use or the one in the documentation, as bucket names are globally unique.

Once finished with this step, you will have a DAG called trigger_response_dag.py with a DAG id of composer_sample_trigger_response_dag. We will use that DAG in the rest of the steps.
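
For reference, here is a condensed sketch of the shape of that Cloud Function, based on Google’s sample. It is not a drop-in replacement for the tutorial code: the make_iap_request helper and the exact client_id and webserver_id values come from the tutorial and from your own Composer environment, so treat the values below as placeholders.

# Condensed sketch of the GCS-triggered Cloud Function from Google's tutorial.
# make_iap_request is the IAP helper shipped with that sample (it may live in the
# same file); client_id and webserver_id are placeholders for your environment.
from make_iap_request import make_iap_request

client_id = "YOUR_CLIENT_ID.apps.googleusercontent.com"
webserver_id = "YOUR_TENANT_PROJECT"
dag_name = "composer_sample_trigger_response_dag"

def trigger_dag(data, context=None):
    # Runs when a file is finalized in the incoming bucket; "data" carries the
    # bucket and object name, which we forward to the DAG as its dag_run.conf.
    webserver_url = (
        "https://" + webserver_id + ".appspot.com/api/experimental/dags/"
        + dag_name + "/dag_runs"
    )
    make_iap_request(webserver_url, client_id, method="POST", json={"conf": data})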

Composer (Airflow)

For the final step, we will create the DAG for Cloud Composer. We have four steps remaining: load the CSV file from GCS to BQ, create an aggregation, export the aggregated data from BQ to GCS, and finally, send an email.

Edit the DAG from the GCF step. Remove the one task (print_gcs_info) in the DAG. We will replace that with our own steps. We will need to add some imports to the existing imports. These imports will allow us to call the required operators.

from datetime import date  # used below to build the dated output file name

from airflow.contrib.operators import gcs_to_bq
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.operators.email_operator import EmailOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

Below the imports and just above the default_args declaration, add these 4 variables:

today = date.today().strftime("%Y%m%d")
outgoing_bucket = "lrc-dev-outgoing-sales-aggregate"
sales_table = "sales_data.current_sales"
aggregate_table = "sales_data.total_sales"

These are variables that we will use in the steps below. They should be fairly self-explanatory, and you will see how they are used shortly.

Now, inside the “with airflow.DAG()” section (the body of the DAG), replace the bash task with the following steps.

First, we want to read the file into BQ from GCS. We use the GoogleCloudStorageToBigQueryOperator operator for this:

# Load Data from GCS to BQ
GCStoBQ = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
          task_id='GCStoBQ',
          bucket="{{ dag_run.conf['bucket'] }}",
          source_objects=["{{ dag_run.conf['name'] }}"],
          destination_project_dataset_table=sales_table,
          schema_fields=[{"name": "customer_name", "type": "STRING", "mode": "REQUIRED"},
                         {"name": "product_id", "type": "STRING", "mode": "REQUIRED"},
                         {"name": "item_count", "type": "INTEGER", "mode": "REQUIRED"},
                         {"name": "item_cost", "type": "FLOAT", "mode": "REQUIRED"}
                        ],
          source_format='CSV',
          # GZIP or NONE
          compression='NONE',
          create_disposition='CREATE_IF_NEEDED',
          skip_leading_rows=1,
          # WRITE_DISPOSITION_UNSPECIFIED, WRITE_EMPTY, WRITE_TRUNCATE, WRITE_APPEND
          write_disposition='WRITE_APPEND',
          field_delimiter=',',
          max_bad_records=0,
          quote_character=None,
          ignore_unknown_values=False,
          allow_quoted_newlines=False,
          allow_jagged_rows=False,
          encoding='UTF-8',
          max_id_key=None,
          autodetect=False
          )

The schema_fields parameter defines our table and the CSV file should match that format.

Notice the bucket="{{ dag_run.conf['bucket'] }}" and source_objects=["{{ dag_run.conf['name'] }}"] parameters. The GCF function sends the bucket and file name that were uploaded to the incoming bucket to the DAG as a dag_run.conf{} input parameter. These two parameters in the operator allow Jinja templates. The syntax we use extracts the bucket name and file name.
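
As a concrete illustration, suppose a file named sales_2021_04_07.csv (a hypothetical name) lands in the incoming bucket. The function forwards an event that, simplified, looks like the dictionary below, and the two templated parameters render to the bucket and object name:

# Simplified view of the event the function passes along as dag_run.conf
# (the real GCS event contains more fields; the file name is hypothetical)
conf = {
    "bucket": "lrc-dev-incoming-sales-data",
    "name": "sales_2021_04_07.csv",
}

# After Jinja rendering inside the operator:
#   bucket         -> "lrc-dev-incoming-sales-data"
#   source_objects -> ["sales_2021_04_07.csv"]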

We could hard-code the bucket name since we know what the incoming bucket is. It is a better practice to get the bucket name from the function, though, as we may call this DAG from multiple buckets in the future, and we will likely have different bucket names for dev, test, and prod. The other variables above could be created as Airflow variables, and that is what I would recommend. That is beyond the scope of this article, though.
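
As a quick illustration of that recommendation, a minimal sketch would look something like the following; the variable names are my own, and each one would need to be created (via the Airflow UI or CLI) in every environment.

from airflow.models import Variable

# Fall back to the dev values if the variables have not been defined yet
outgoing_bucket = Variable.get("outgoing_bucket", default_var="lrc-dev-outgoing-sales-aggregate")
sales_table = Variable.get("sales_table", default_var="sales_data.current_sales")
aggregate_table = Variable.get("aggregate_table", default_var="sales_data.total_sales")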

Once this task is completed, the data will be in BigQuery. The next task uses the BigQueryOperator to aggregate the data into a new table for export. This table will be truncated if it already exists.

# Create aggregate table
Transform = BigQueryOperator(
          task_id='Transform',
          sql=("SELECT customer_name, product_id, sum(item_count) prd_total_count,"
               " sum(item_cost*item_count) prd_total_cost"
               f" FROM {sales_table}"
               " GROUP BY customer_name, product_id;"),
          destination_dataset_table=aggregate_table,
          use_legacy_sql=False,
          write_disposition='WRITE_TRUNCATE',
          create_disposition='CREATE_IF_NEEDED',
          dag=dag
)

The query above sums item_count and sums item_count * item_cost to produce the aggregates, grouped by customer name and product id.

The third task is to write our new table out to GCS so that the other team can pick it up. We are going to name our file sales_data_agg_ with today’s date appended to it. We will use the today variable we declared above for the date.

BQtoGCS = BigQueryToCloudStorageOperator(
          task_id='BQtoGCS',
          source_project_dataset_table=aggregate_table,
          destination_cloud_storage_uris=[f"gs://{outgoing_bucket}/sales_data_agg_{today}*.csv"],
          export_format='CSV'
     )

The last task is to send an email to the other team letting them know they can come pick up the file. We’ll send an email to “[email protected]”. Note that Cloud Composer must be configured to send email (Google’s documentation describes using SendGrid) before the EmailOperator will actually deliver anything.

SendEmail = EmailOperator(
          task_id='SendEmail',
          to=["[email protected]"],
          subject=f"Data Aggregate available for date {today}",
          html_content="<html><body><p><h2>Run completed successfully.</h2></p>"
                       f"<p>File Located at: gs://{outgoing_bucket}/sales_data_agg_{today}*.csv</p>"
                       "</body></html>",
          dag=dag,
)

At the bottom of the DAG, add the execution line:

GCStoBQ >> Transform >> BQtoGCS >> SendEmail

We now have a completed pipeline. The final DAG, in the Airflow console, should look like this:

Data Pipeline

For more Cloud Composer documentation and tutorials, check out the Google Cloud Composer Documentation.

Here is an example of a CSV to upload to the incoming bucket.

customer_name,product_id,item_count,item_cost
ABC,PRD123,5,1.25
DEF,PRD456,25,0.07
GHI,PRD765,13,9.99

If you upload it several times, you will be able to see how the aggregate values increase over time. 
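
For example, after uploading that sample file twice, the exported aggregate should contain roughly the following (10 units of PRD123 at 1.25 each is 12.50, and so on); the exact header names come from the column aliases in the aggregation query, and the row order may differ:

customer_name,product_id,prd_total_count,prd_total_cost
ABC,PRD123,10,12.5
DEF,PRD456,50,3.5
GHI,PRD765,26,259.74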

Bottom Line

That’s all it takes to create a fairly simple but fully featured data pipeline. We ingested data, transformed it, extracted the new data, and sent a notification. Note that there is no data validation or error handling, which is something you would want to incorporate in a production system. On GCP, logging is pretty much automatic.
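
As one small example of the kind of hardening you might add, Airflow lets you set retry and failure-notification behavior in the DAG’s default_args; a minimal, illustrative sketch:

from datetime import timedelta

# Illustrative additions to the DAG's default_args for basic error handling
default_args = {
    'retries': 2,                         # retry each failed task twice
    'retry_delay': timedelta(minutes=5),  # wait five minutes between retries
    'email_on_failure': True,             # requires the same email setup as the EmailOperator
}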

Data pipelines can become very complex. While this is a simple data pipeline, it is indicative of the major steps most batch pipelines include. 


Lewis Cunningham
Lewis Cunningham is a Certified Cloud Practitioner, an Oracle Ace Director Alumnus, an Oracle Certified Professional, a CIW Certified Database Designer, a published author, a frequent conference speaker, and a Database Architect. Lewis's specialties revolve around the cloud (particularly AWS), programming, databases, data warehousing, business intelligence and most anything having to do with data. Lewis has over three decades of architecture, database, and software development experience in a variety of industries and capacities. He has programmed in many languages and currently likes to use PL/SQL, Java, Groovy and Python. On the database side, he has expertise with Oracle, PostgreSQL, MySQL, and Redshift. Lewis also works with various Cloud Computing platforms.