Cloud Dataflow + Cloud Tasks = A Ravenous Beast

Akshay Apte
6 min read · Feb 17, 2021

At trivago, we often deal with millions of accommodations and their images. To give you an estimate, trivago’s hotel search allows users to compare hotel prices in just a few clicks across more than 300 booking sites for more than 5 million hotels and other types of accommodation in over 190 countries. The images for these accommodations amount to over one hundred million in total. As one of the teams in the hotel search domain, we are tasked with “tagging” the entire image inventory on Google Cloud Platform.

Accommodation image tags are labels assigned to a given image denoting the type of room, the surroundings, the contents of a room, or other attributes. These tags are useful for sorting images and assembling an attractive image gallery for an accommodation, which leads to more potential click-outs. 💰

Examples of tags with confidence scores for Red Roof Inn

We utilise an external API which accepts image URLs as input and returns image tags with their confidence scores. Since the API is compute-heavy, it has a few caveats:

  1. It can process a batch of image URLs, with a maximum batch size of 16.
  2. Concurrent API calls are limited to a maximum of 50.

These limitations meant that we couldn’t simply throw traffic at the API. We needed an intermediate store to facilitate uninterrupted processing.

Here is an excerpt from one of our technical discussions:

Engineer 1: “How do we scale our pipeline to tag all these images without interruption? We need a queue of sorts.”
Engineer 2: “Let’s dump the image URLs in a Pub/Sub topic and have a cloud function pull them.”
Engineer 1: “Meh, we’d have to implement retry logic and handle API throttling in the function.
Have you heard about Cloud Tasks? It’s a fully managed, highly scalable queue which comes with built-in rate limiting, retry policies, de-duplication and task scheduling, all in a distributed fashion!”
Engineer 2: “Interesting. How do we ingest all these images into the queue?”
Engineer 1: “It’s simple, we kill the Batman. I mean, we use Dataflow.”
Engineer 2: “Geez, what a nerd.”

Well, we ended up using Cloud Tasks, and here’s how the pipeline looked:

We will. We will. Rock Queue

To pump all image URLs into the Tasks queue, we needed a mechanism that could read Cloud Storage files, batch records, and create a task for each batch. We initially thought of writing an application to read each file and create tasks. That didn’t fly because the app couldn’t read files in a distributed manner; pushing 100 million+ URLs would have taken forever.

Cloud Dataflow is a fully managed, serverless and distributed big data processing service. The first job in the diagram reads loads of files from a bucket, creates batches of the desired size and pushes them to the Tasks queue. Have a look at the DAG below.

The pipeline reads the IDs of the images that are already tagged and only processes images that are untagged. If there are issues while creating tasks, the records are pushed to a dead letter queue to be processed later.
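To make this concrete, here is a rough sketch of what such a job can look like. The bucket paths and the parse_line/is_untagged helpers are placeholders, PushAccommodationImagesFn is the DoFn shown further below, and the dead letter branch is omitted for brevity:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

with beam.Pipeline(options=PipelineOptions()) as p:
    # IDs of images that already have tags, used as a side input for filtering.
    tagged_ids = p | "ReadTaggedIds" >> beam.io.ReadFromText("gs://<bucket>/tagged_ids/*")

    (
        p
        | "ReadImageRecords" >> beam.io.ReadFromText("gs://<bucket>/images/*")
        | "ParseRecords" >> beam.Map(parse_line)
        | "KeepUntagged" >> beam.Filter(is_untagged, beam.pvalue.AsList(tagged_ids))
        | "PushToCloudTasks" >> beam.ParDo(PushAccommodationImagesFn())
    )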

A noteworthy thing is that the Apache Beam SDK does not have a sink for Cloud Tasks yet, so we called the Cloud Tasks API from a ParDo function inside the job. This got tricky, as the client was being initialised for every task. To avoid that, use the start_bundle and finish_bundle methods, which ensure that the Tasks client is initialised once per bundle rather than once per element, like so (the actual create_task call behind _enqueue_batch is shown in the next snippet):

import apache_beam as beam
from google.cloud import tasks_v2

class PushAccommodationImagesFn(beam.DoFn):
    BATCH_SIZE = 16

    def __init__(self):
        super().__init__()
        self.element_batch = []
        self.tasks = None

    def start_bundle(self):
        # The Cloud Tasks client is created once per bundle, not once per element.
        self.tasks = tasks_v2.CloudTasksClient()

    def process(self, elem):
        if elem is not None and isinstance(elem, dict):
            self.element_batch.append(elem)
        if len(self.element_batch) >= self.BATCH_SIZE:
            self._enqueue_batch()  # wraps create_task, see the next snippet
            self.element_batch = []

    def finish_bundle(self):
        # Flush the last, possibly partial, batch of the bundle.
        if self.element_batch:
            self._enqueue_batch()
            self.element_batch = []

You can define and create an HTTP task in the queue using the Python SDK for Cloud Tasks. The URL should be valid and must return a successful response. Quickstart here

client = tasks_v2.CloudTasksClient()
parent = client.queue_path(project, location, queue)  # fully qualified queue name

task = {
    "http_request": {  # Specify the type of request.
        "http_method": tasks_v2.HttpMethod.POST,
        "url": url,  # The full url path that the task will be sent to.
    }
}
client.create_task(request={"parent": parent, "task": task})
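In our case, each task also has to carry its batch of image URLs to the cloud function. A minimal sketch of how the payload might be attached, assuming the batch is a list of dicts produced by the DoFn above (headers and body are standard Cloud Tasks HttpRequest fields; the rest are placeholders):

import json

task = {
    "http_request": {
        "http_method": tasks_v2.HttpMethod.POST,
        "url": url,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps(batch).encode(),  # HttpRequest.body expects bytes
    }
}
client.create_task(request={"parent": parent, "task": task})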

We were quite impressed with how Dataflow performed. It took only 45 minutes with 50 workers to push ~90 million images (after removing invalid ones) to the Tasks queue, and the dead letter queue was empty.

Processing in the queue

After the first job, the queue had ~5.6 million tasks (each task containing a batch of 16 image URLs) ready to be dispatched.

Well, hello there!

The tasks are configured to call a cloud function (you could also use Cloud Run or Kubernetes), which in turn calls the tagging API and pushes the tags to a Pub/Sub topic. A streaming Dataflow job then reads the topic and writes the tagged images back to a storage bucket.
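As a rough sketch, the cloud function could look like the following. The tagging endpoint, the topic name and the use of the requests library are assumptions, and the dead letter handling is added in a later snippet:

import json
import requests
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
TOPIC = publisher.topic_path("<project>", "image-tags")  # placeholder topic
TAGGING_API = "https://<tagging-api>/tag"                # placeholder endpoint

def tag_images(request):
    """HTTP cloud function invoked by a Cloud Tasks task."""
    batch = request.get_json()  # the batch of image URLs from the task body
    response = requests.post(TAGGING_API, json=batch, timeout=120)
    response.raise_for_status()  # non-2xx raises -> Cloud Tasks will retry
    publisher.publish(TOPIC, json.dumps(response.json()).encode("utf-8"))
    return "ok", 200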

If the cloud function returns a successful response (status code 200), the task is deleted from the queue, so make sure you handle exceptions in the cloud function. If the URL returns anything other than a successful response, the queue schedules a retry as defined by the retry policy:

{
  "maxAttempts": integer,
  "maxRetryDuration": string,
  "minBackoff": string,
  "maxBackoff": string,
  "maxDoublings": integer
}

The task will be retried up to maxAttempts times until it succeeds. Learn more about the other parameters here.

There might be an edge case where the image URL is invalid or malformed. So what happens if a task still fails after maxAttempts attempts? It is simply dropped from the queue. To avoid losing such tasks, we leveraged a request header that Cloud Tasks adds to every dispatch: X-CloudTasks-TaskRetryCount

This header contains the number of times the task has already been retried. In the cloud function, if the header value equals maxAttempts - 1, we publish the message to a dead letter queue. This is also one of the reasons why we didn’t call the tagging API directly from Cloud Tasks.
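A hedged sketch of that guard, extending the cloud function from the earlier snippet (MAX_ATTEMPTS and the dead letter topic name are placeholders that must match the queue’s retry policy):

MAX_ATTEMPTS = 5                                                      # must match maxAttempts
DLQ_TOPIC = publisher.topic_path("<project>", "tagging-dead-letter")  # placeholder topic

def tag_images(request):
    batch = request.get_json()
    retry_count = int(request.headers.get("X-CloudTasks-TaskRetryCount", 0))
    try:
        response = requests.post(TAGGING_API, json=batch, timeout=120)
        response.raise_for_status()
    except Exception:
        if retry_count >= MAX_ATTEMPTS - 1:
            # Final allowed attempt: park the batch instead of losing it.
            publisher.publish(DLQ_TOPIC, json.dumps(batch).encode("utf-8"))
            return "sent to dead letter queue", 200  # 200 acks the task, no more retries
        raise  # let Cloud Tasks schedule the next retry
    publisher.publish(TOPIC, json.dumps(response.json()).encode("utf-8"))
    return "ok", 200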

Cloud Tasks also provides a rich dashboard where you can track the current queue depth and the retries for each task.

We limited the maximum concurrent executions of the cloud function to 100 so as not to overburden the tagging API. The maximum dispatch rate of the queue was also limited to 50. This was done to avoid saturating the Cloud Functions API; otherwise the function wouldn’t be triggered and no messages would be pushed to the dead letter queue. With these configurations, we were able to tag the entire inventory in a span of 6 days. Pretty good, right?
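For reference, the queue-side limit can be set through the Cloud Tasks admin client. A minimal sketch, assuming the 50 refers to dispatches per second and using a placeholder queue path (the 100-instance cap lives in the cloud function’s own configuration):

from google.cloud import tasks_v2

client = tasks_v2.CloudTasksClient()

# Placeholder queue path; fetch the queue, adjust its rate limits, write it back.
queue = client.get_queue(name="projects/<project>/locations/<location>/queues/<queue>")
queue.rate_limits.max_dispatches_per_second = 50
client.update_queue(request={"queue": queue})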

Summary

Cloud Dataflow in conjunction with Cloud Tasks lets us enrich or transform huge amounts of data in a completely serverless manner. It is also blazingly fast, as it processes data in a distributed fashion. Cloud Tasks also handles the overhead of retrying failures so that you can focus on more important parts of the pipeline.

Thanks for reading. Feel free to reach out to me through the comments or on LinkedIn. I am open to any discussion, and constructive criticism is always welcome. 😁

