
What is a serverless architecture?

Serverless architecture refers to an infrastructure setup where applications depend either on third-party services (Backend as a Service, BaaS) or on third-party stateless compute containers (Function as a Service, FaaS) to handle server-side logic. (Refer to this guide for a more comprehensive look at serverless architecture.)

Our use case

At Harness, we use Dataflow pipelines to run some of the preprocessing steps for our machine learning models. We needed to run some Beam pipelines periodically on production data.

Why serverless?

Since Beam pipelines are standalone, FaaS seemed to be the right architecture. The application itself is simple: all the complex processing is done in the Beam pipeline, and all the cloud function has to do is trigger it. Additionally, these pipelines only need to be triggered once a week or once a month, so keeping a whole server running just to trigger a pipeline seemed a waste of resources. With cloud functions, you pay per invocation.

In this post, I am going to lay out the architecture that worked for us, which takes just 4 steps. We mostly use Python, so this post contains Python-specific code. This writeup also assumes basic knowledge of Beam pipelines and their building blocks. If you are not familiar with them, you can follow this post.

Step 1: Write a beam pipeline

Below is the code for a simple beam pipeline that searches through a list and returns a list of words that contain the given substring.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

# Substring to search for (example value, hard-coded for now).
substring = 'o'

class getWordsContaining(beam.DoFn):
    def process(self, element):
        # Emit only the words that contain the substring.
        if substring in element:
            yield element

options = PipelineOptions()
# Pickle the main session so module-level names such as `substring` reach the workers.
options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=options)
cities = p | beam.Create(['hello', 'world', 'ok', 'hi'])
cities | beam.ParDo(getWordsContaining())
result = p.run()

Now, suppose we want to parameterize our beam pipeline. We can run that python code with an input parameter: “substring”.
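
One way to accept substring on the command line is a small argparse wrapper. The sketch below is an assumption about how that might look, not the code we run: the helper names parse_args and words_containing and the default value are hypothetical, and a plain generator stands in for the DoFn so the snippet runs without Beam installed. Arguments that argparse does not recognize are returned separately so they could be handed to PipelineOptions.

```python
import argparse


def parse_args(argv):
    """Split out --substring and leave everything else for the pipeline options."""
    parser = argparse.ArgumentParser()
    # Hypothetical default, used only when --substring is not given.
    parser.add_argument('--substring', default='o')
    known, pipeline_args = parser.parse_known_args(argv)
    return known.substring, pipeline_args


def words_containing(words, substring):
    """Plain-Python stand-in for the DoFn: yield the words containing substring."""
    for word in words:
        if substring in word:
            yield word


if __name__ == '__main__':
    substring, pipeline_args = parse_args(
        ['--substring', 'l', '--runner', 'DirectRunner'])
    print(substring, pipeline_args)
    print(list(words_containing(['hello', 'world', 'ok', 'hi'], substring)))
```

A value parsed this way is fixed when the script runs, which is why the next step switches to a run-time parameter.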

Step 2: Create Custom Template

Custom templates are useful when we need to run the same pipeline frequently, with the same or different parameters, without any other code change.

In this case, we assume that we need a different substring value on every run. The first step is to add substring as a run-time parameter.

class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Register --substring as a run-time (ValueProvider) parameter.
        parser.add_value_provider_argument('--substring', type=str)

“add_value_provider_argument” registers substring as a run-time parameter: its value is supplied when a job is launched from the template, not when the template is created.

To use the run-time parameter in the DoFn, add an init method that takes substring as a new parameter and stores it. To read the value inside process, call substring.get().

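class getWordsContaining(beam.DoFn):
    def __init__(self, substring):
        # Store the ValueProvider itself; its value is not known yet.
        self.substring = substring

    def process(self, element):
        # Resolve the value only at run time.
        if self.substring.get() in element:
            yield element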

Note that, since this is a run-time parameter, its value is not accessible in the init function. If you call .get() there, it raises an exception when the template is created.
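
To make the timing concrete, here is a toy illustration that runs without Beam. DeferredValue and WordsContaining are hypothetical stand-ins written for this post, not Beam classes; they only mimic the idea that the value does not exist while the template is being created and is supplied once the job starts.

```python
class DeferredValue:
    """Toy stand-in for a run-time parameter (hypothetical, not a Beam class)."""

    def __init__(self, name):
        self.name = name
        self._value = None
        self._is_set = False

    def set(self, value):
        # Simulates the runner supplying the value when the job starts.
        self._value = value
        self._is_set = True

    def get(self):
        if not self._is_set:
            raise RuntimeError('%s is not available before the job runs' % self.name)
        return self._value


class WordsContaining:
    """Mirrors the DoFn pattern: keep the wrapper, resolve it only in process()."""

    def __init__(self, substring):
        self.substring = substring  # store the wrapper; do not call .get() here

    def process(self, element):
        if self.substring.get() in element:
            yield element


if __name__ == '__main__':
    param = DeferredValue('substring')
    fn = WordsContaining(param)   # "template creation": value still unknown
    try:
        param.get()
    except RuntimeError as err:
        print('too early:', err)
    param.set('o')                # "job start": value supplied
    print([w for word in ['hello', 'hi'] for w in fn.process(word)])
```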

Now, we need to update the pipeline block where this function is being called.

user_options = options.view_as(UserOptions)
cities | beam.ParDo(getWordsContaining(user_options.substring))

Save Custom Template

Now, your custom template is ready to be saved. To save your custom template in cloud storage, update PROJECT and BUCKET with your actual project and bucket names. Also, you can add your own TEMPLATE_NAME.

python GenerateTSThresholds.py --runner DataflowRunner --project
PROJECT --staging_location gs://BUCKET/staging --temp_location
gs://BUCKET/temp --template_location

Once it has run, check the path you passed as “template_location” to confirm that the template was saved.

Create metadata File

For the template to work, you need to create a metadata file for your pipeline that specifies the parameters it requires. Name the file after your template with the suffix “_metadata” (for example, TEMPLATE_NAME_metadata).

{
  "description": "Test pipeline that returns list of words containing substring",
  "name": "Substring test",
  "parameters": [
    {
      "regexes": [
        ...
      ],
      "name": "substring",
      "helpText": "Only alphabetic characters are allowed",
      "label": "Input substring to match"
    }
  ]
}

Copy the metadata file to the same GCS folder as the template:

gsutil cp METADATA_FILE gs://BUCKET/templates/
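
If you prefer to generate the metadata file rather than write it by hand, a sketch along these lines could work. The helper names build_metadata and is_valid are hypothetical, and the pattern in ALPHABETIC is an assumption chosen only to agree with the help text; adjust it to whatever your parameter should accept.

```python
import json
import re

TEMPLATE_NAME = 'TEMPLATE_NAME'  # placeholder, replace with your template name
# Assumed pattern: picked only to agree with the help text below.
ALPHABETIC = '^[a-zA-Z]+$'


def build_metadata():
    """Return the metadata for the substring pipeline as a dictionary."""
    return {
        'description': 'Test pipeline that returns list of words containing substring',
        'name': 'Substring test',
        'parameters': [
            {
                'regexes': [ALPHABETIC],
                'name': 'substring',
                'helpText': 'Only alphabetic characters are allowed',
                'label': 'Input substring to match',
            }
        ],
    }


def is_valid(metadata, name, value):
    """Check a value against every regex declared for the named parameter."""
    for param in metadata['parameters']:
        if param['name'] == name:
            return all(re.match(rx, value) for rx in param.get('regexes', []))
    return False


if __name__ == '__main__':
    metadata = build_metadata()
    # Save this output as TEMPLATE_NAME + '_metadata' before copying it to GCS.
    print(json.dumps(metadata, indent=2))
    print(is_valid(metadata, 'substring', 'o'), is_valid(metadata, 'substring', 'o1'))
```

Checking a value locally with is_valid before launching is optional; it simply catches a bad parameter before a job is submitted.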

You can test the custom template using the directions given here.

Step 3: Create Cloud Function

Once you’ve tested the custom template and it’s working correctly, you can move on to creating the cloud function. Since we rely heavily on Python, we used it for the cloud function as well.

You can follow this for the basic setup:

In the cloud function, we need an HTTP endpoint that launches the pipeline. Use the code below in main.py.

from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials

def launch_pipeline(request):
    # Use the default credentials of the project the function runs in.
    credentials = GoogleCredentials.get_application_default()
    service = build('dataflow', 'v1b3', credentials=credentials)
    # Update the below four variables to reflect your requirements
    JOBNAME = "jobname"
    PROJECT = "project"
    BUCKET = "bucket"
    TEMPLATE = "template"
    GCSPATH = "gs://{bucket}/templates/{template}".format(bucket=BUCKET, template=TEMPLATE)
    BODY = {
        "jobName": "{jobname}".format(jobname=JOBNAME),
        "parameters": {
            "substring": "o"
        },
        "environment": {
            "tempLocation": "gs://{bucket}/temp".format(bucket=BUCKET),
            "zone": "us-central1-f"
        }
    }
    launch_request = service.projects().templates().launch(projectId=PROJECT, gcsPath=GCSPATH, body=BODY)
    response = launch_request.execute()
    # Return a response body so the HTTP call completes successfully.
    return str(response)

This gets the credentials of the current project and launches the template. Update the job, project, bucket and template name variables. You can pass the substring variable in the parameters section.
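
If the substring should come from the caller instead of being hard-coded, the request body can be built by a small helper. This is only a sketch under assumptions: the helper names substring_from_args and build_launch_body are hypothetical, and so is the choice of a query parameter called substring. Inside an HTTP cloud function the mapping would be request.args; a plain dict stands in for it here so the snippet runs on its own.

```python
def substring_from_args(args, default='o'):
    """Read the substring from a mapping of query parameters and validate it."""
    value = args.get('substring', default)
    if not value.isalpha():
        raise ValueError('substring must contain only alphabetic characters')
    return value


def build_launch_body(job_name, bucket, substring):
    """Build the body passed to templates().launch() for the given substring."""
    return {
        'jobName': job_name,
        'parameters': {'substring': substring},
        'environment': {
            'tempLocation': 'gs://{bucket}/temp'.format(bucket=bucket),
            'zone': 'us-central1-f',
        },
    }


if __name__ == '__main__':
    fake_args = {'substring': 'wor'}  # stands in for request.args
    print(build_launch_body('jobname', 'bucket', substring_from_args(fake_args)))
```

Validating the value in the function keeps an obviously bad request from starting a Dataflow job at all.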

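Add your dependencies to the requirements.txt file. The function above imports googleapiclient and oauth2client, so the google-api-python-client and oauth2client packages need to be listed there; the function itself does not import apache-beam.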


Update the “function to execute” field and set it to “launch_pipeline”.

Once you create the function, it takes a minute or two to be ready. You can find its URL under the trigger section and test it out.

Step 4: Create Scheduling Job

This is the easiest step by far. You can use Cloud Scheduler to run your pipeline periodically. Fill in the URL of the cloud function you created and set the frequency.
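
Cloud Scheduler takes the frequency in unix-cron format: five fields for minute, hour, day of month, month and day of week. The expressions below are illustrative assumptions for the weekly and monthly cadences mentioned earlier, not the schedules we actually use, and split_cron is a hypothetical helper that only labels the fields.

```python
CRON_FIELDS = ('minute', 'hour', 'day_of_month', 'month', 'day_of_week')

# Example schedules (assumed values, adjust to your needs).
SCHEDULES = {
    'weekly': '0 9 * * 1',   # 09:00 every Monday
    'monthly': '0 9 1 * *',  # 09:00 on the first day of each month
}


def split_cron(expression):
    """Label the five fields of a unix-cron expression."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError('expected 5 fields, got %d' % len(parts))
    return dict(zip(CRON_FIELDS, parts))


if __name__ == '__main__':
    for name, expression in SCHEDULES.items():
        print(name, split_cron(expression))
```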

Voila! Your pipelines are now up and running. Let me know if this was helpful and feel free to suggest any improvements. Cheers!

Sowmya K
