Combining data pipelines and enrichment services in different languages

How ZipRecruiter integrates Python utilities into the stream of Scala data processing pipelines

ZipRecruiter’s mission is to actively help job seekers find their next great career opportunity. But to do so, we need to pull off some pretty heavy lifting behind the scenes. Enormous amounts of unstructured data, including very ‘creative’ job listings and resumes in all shapes and sizes, flow through our data pipelines. These pipelines are built on Delta Lake, with Spark processes written in Scala.

As you can imagine, we have multiple services, from small transformation functions to intricate ML models, all of which we would like to use to enrich the streams of data in our live processing pipelines. Most of these enrichers, however, are written in Python, while our data pipelines are written in Scala and run on the JVM. We needed a way to call these Python enrichers from inside Spark.

In this article I’ll go over some common (yet ill-advised) approaches, and share the more elegant solution we developed. We will see why, and how, ZipRecruiter built an interoperable system that lets Python code run as a UDF on Scala DataFrames, much like calling a library, while Spark manages compute and scale.

The Challenge: Converting strings to complex location formats mid-stream 

As our data pipeline speeds along, we ingest millions of records. Many of them contain data in string format that needs to be transformed into a more useful data type for downstream use. For example, we have a service called LocRes (‘Location Resolver’) that receives addresses as strings and returns them in a standardized, structured location format. LocRes is not just a simple lookup; it loads an in-memory datastore and applies non-trivial logic, using various weights and calculations, before returning results.

Since this service is written in Python and our data pipelines use Spark processes written in Scala, we cannot simply import LocRes as a library.

Calling API services from inside a Spark context is a bad practice

One way to address this would be to have a Spark process shuffle and partition the data itself and, as part of the enrichment step, call the Python service over HTTP/gRPC. Calling an API from inside Spark jobs, however, has many downsides and is widely considered bad practice.

The main reason we avoid this is that it undermines Spark’s inherent ability to perform at scale.

You want your system to scale with the job, and the best way to do that is to have Spark manage your compute, rather than calling an external API that has to be repeatedly and manually scaled to absorb batch traffic.

Spark is good at working at scale, dividing the data among executors in the most efficient way it sees fit. Calling an API from within the Spark context instead ties the process to an external service that Spark has to wait for. If that service is slow or delayed for whatever reason, it delays Spark, and we lose the benefits Spark is supposed to provide.

In addition, having Spark workers blocked and waiting on external calls is a wasteful use of resources. For any company, but especially at the scale of data dealt with at ZipRecruiter, this quickly translates into a lot of wasted money.

Calling external services as sidecars still limits scale

Another approach to the challenge is to run the Python service as a sidecar. Simply put, the service is deployed alongside the app on every Spark worker and called over HTTP. This bypasses the external nature of the service and makes it available locally.

Indeed, Spark would be able to “flex its muscles” and perform slightly better, but every call still goes through the network stack, and even though it’s local, scale is still limited. What’s more, running the service as a sidecar adds memory and other resource overhead to each process pod.

A chain of independent processes exposes too many failure points

Alternatively, one may consider calling the enrichment services serially, cutting the pipeline into small pieces and stitching them together one after the other. With Kafka between the stages, for example, each worker could call the service and write its output to a topic, and another Spark process could pick it up from there and continue.

The problem with this approach is that it creates too many moving parts that must advance in concert. Although each process runs independently, with no single driver coordinating the whole, the added complexity multiplies the potential failure points and drags down overall performance.

The solution: Calling external services as UDFs in a PySpark session

Although Python code does not itself run on the JVM (Java Virtual Machine), a PySpark application drives the same JVM-based Spark session that Scala code uses, so the two languages can share it.

The solution we came up with is to start the job as a PySpark application, which creates the Spark session and then passes that active session to the Scala app. In Scala we continue all our transformations, including reading inputs and writing outputs, with ZipRecruiter’s Delta Lake common code. There, we can enrich the data using a UDF (user-defined function) written in Python, inside the same Spark context.

The result is the ability to work with two languages in the same session in parallel. It eliminates HTTP network calls and everything they imply. From the Scala side, we simply call a Python operation like any other function, while Spark moves the data between the JVM and its Python worker processes (via Apache Arrow for pandas UDFs) and scales them along with the executors. Scala common code handles all the Delta Lake reads and writes, and the Python code does not rely on anything external.

6 Steps to execute the solution

The general steps are as follows:

  1. Creating a PySpark session.
  2. Creating a Python UDF and registering it on the Spark session.
  3. Calling the Scala code from PySpark.
  4. The Scala code running its own logic (read, process, transform, and write).
  5. The Scala code calling the PySpark UDF via expr().
  6. PySpark stopping the Spark session.
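
The six steps above can be sketched as a single PySpark driver function. This is a minimal sketch under stated assumptions, not ZipRecruiter’s code: it assumes pyspark, pandas, and pyarrow are installed and that the Scala uber jar is on the classpath; run_pipeline, jvm_object, and resolve_location_udf are hypothetical names, and the UDF body is a placeholder for a real enricher such as LocRes.

```python
import functools
from typing import Any, Iterator


def jvm_object(jvm: Any, dotted_path: str) -> Any:
    """Walk a py4j JVM view one attribute at a time, e.g. "com.example.App"."""
    return functools.reduce(getattr, dotted_path.split("."), jvm)


def run_pipeline(scala_object_path: str) -> bool:
    """Hypothetical driver covering the six steps; needs pyspark, pandas and pyarrow."""
    # Imported lazily so this module can be loaded without a Spark installation.
    import pandas as pd
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import pandas_udf

    # Step 1: PySpark creates (and therefore owns) the session.
    spark = SparkSession.builder.appName("python-enrichment-bridge").getOrCreate()

    # Step 2: define an Iterator-of-Series pandas UDF and register it by name.
    @pandas_udf("string")
    def resolve_location_udf(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
        for batch in batches:
            # Placeholder enrichment (hypothetical); a real job would call LocRes here.
            yield batch.str.strip().str.title()

    spark.udf.register("resolve_location_udf", resolve_location_udf)

    try:
        # Step 3: call into the Scala object through the Py4J gateway.
        # Steps 4-5 happen inside Scala: it reads, transforms, calls
        # expr("resolve_location_udf(...)") and writes, on this same session.
        entry_point = jvm_object(spark.sparkContext._jvm, scala_object_path)
        # Assumes processData() returns a Boolean, as `success = ...` in the article suggests.
        return bool(entry_point.processData())
    finally:
        # Step 6: PySpark stops the session it created, even if the Scala side throws.
        spark.stop()
```

Calling run_pipeline("com.ziprecruiter.company.store.fact_companies.FactCompanies") would target the same Scala object that the article calls through sc._jvm. The try/finally keeps step 6 tied to PySpark, which created the session, even when the Scala code fails.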

AppBase and Dockerfile

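Make sure that both Scala and Python are installed on the main (driver) container and on the worker containers. The driver and the workers must also run the same Python minor version, or PySpark will refuse to run the UDF.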

Run gradle to create the Scala app uber jar:

RUN gradle clean build --no-daemon --stacktrace --init-script /app/src/common/gradle/init-repos.gradle

Make sure that both the Python scripts and the Scala uber jar are copied into the relevant workdir (e.g. /app/bin). The jar must also be on the Spark classpath so the driver JVM can load the Scala entry point.

PySpark session creation

The Spark session is created by PySpark with all relevant overrides and per-tier configuration (memory, CPU, paths, …).

For a Delta app, it is important to pass enable_delta_support=True to SparkConfigurator so that it applies our common Delta settings. These can be overridden if needed.
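
SparkConfigurator’s API is not shown in this article, so the sketch below does not reproduce it. It only illustrates what per-tier configuration with an enable_delta_support switch could look like: build_spark_conf, apply_conf, and the tier values are hypothetical, while the two Delta entries are the standard session settings from the Delta Lake documentation.

```python
from typing import Any, Dict, Optional

# Hypothetical per-tier resource settings; real values depend on your cluster.
TIER_DEFAULTS: Dict[str, Dict[str, str]] = {
    "dev": {"spark.executor.memory": "2g", "spark.executor.cores": "1"},
    "prod": {"spark.executor.memory": "8g", "spark.executor.cores": "4"},
}

# Standard Delta Lake session settings (the delta-spark package must also be on the classpath).
DELTA_SETTINGS: Dict[str, str] = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}


def build_spark_conf(
    tier: str,
    enable_delta_support: bool = True,
    overrides: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
    """Merge tier defaults, then Delta settings, then caller overrides (last one wins)."""
    conf = dict(TIER_DEFAULTS[tier])  # copy so the defaults are never mutated
    if enable_delta_support:
        conf.update(DELTA_SETTINGS)
    conf.update(overrides or {})
    return conf


def apply_conf(builder: Any, conf: Dict[str, str]) -> Any:
    """Apply each setting to a SparkSession.builder via its .config(key, value) method."""
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder
```

For example, apply_conf(SparkSession.builder.appName("locres-enrichment"), build_spark_conf("prod")).getOrCreate() would create a Delta-enabled session. Applying overrides last mirrors the point above that the common Delta settings can be overridden.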

PySpark UDF

Create an Iterator of Series to Iterator of Series UDF with pandas_udf (see the PySpark pandas_udf documentation).

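Initializing the LocRes object (and any other resource needed for general use) inside the UDF happens each time Spark invokes the UDF on a partition, not once per executor or once per Python worker process (one per core). To avoid repeating this expensive setup, we suggest the Borg pattern, in which all instances of a class share a single state.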
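
A minimal sketch of the Borg pattern for this purpose, assuming a hypothetical loader function that builds the expensive in-memory datastore. Every instance shares one __dict__, so within a Python worker process the loader runs only on first use; since Spark reuses Python worker processes by default (spark.python.worker.reuse), that state can survive across tasks. LocResHolder and the loader are illustrative names, not LocRes’s actual API, and the lock is only a cheap safeguard against concurrent first use.

```python
import threading
from typing import Any, Callable, Dict


class LocResHolder:
    """Borg pattern: all instances share one __dict__, hence one state per process."""

    _shared_state: Dict[str, Any] = {}
    _init_lock = threading.Lock()

    def __init__(self, loader: Callable[[], Any]) -> None:
        # Point this instance's attribute storage at the shared dictionary.
        self.__dict__ = self._shared_state
        # Load the expensive resource only on first use in this process.
        with self._init_lock:
            if "resource" not in self._shared_state:
                self.resource = loader()
```

Inside the UDF, calling LocResHolder(load_locres).resource at the top of the function (where load_locres is a hypothetical loader) returns the already-loaded object on every call after the first in that worker process.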

Run the initialization code at the start of extract_location_service_info_udf, before consuming the iterator, and then iterate over the batches, applying your service (in our case LocRes) to each element.
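
The shape of that function can be sketched without Spark by using plain lists in place of pandas Series. In a real job it would be decorated with @pandas_udf (with a string or struct return type) and would receive and yield pd.Series. The resolver data and the lookup below are hypothetical stand-ins for LocRes, which applies weighted scoring rather than a dictionary lookup, and functools.lru_cache stands in for a Borg-style per-process cache.

```python
import functools
from typing import Dict, Iterator, List, Optional


@functools.lru_cache(maxsize=None)
def get_resolver() -> Dict[str, str]:
    """Hypothetical stand-in for loading LocRes's datastore (cached once per process)."""
    return {"san francisco": "San Francisco, CA, US", "nyc": "New York, NY, US"}


def resolve(resolver: Dict[str, str], raw: Optional[str]) -> Optional[str]:
    # Toy normalization plus lookup; null inputs stay null.
    if raw is None:
        return None
    return resolver.get(raw.strip().lower())


def extract_location_service_info(
    batches: Iterator[List[Optional[str]]],
) -> Iterator[List[Optional[str]]]:
    # Initialization first, before any batch is consumed.
    resolver = get_resolver()
    for batch in batches:
        # One output element per input element, same order and length.
        yield [resolve(resolver, value) for value in batch]
```

Keeping the output batch the same length and order as the input batch matters: Spark pairs each output row with its input row, so a pandas UDF that drops or reorders elements fails or misaligns results.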

The important part is to register the UDF on the Spark session, under the name the Scala code will use to call it:

spark.udf.register("extract_location_service_info_udf", extract_location_service_info_udf)

Passing the session to Scala

PySpark then calls the Scala app through the Py4J gateway of the Spark context it created:

sc = spark.sparkContext
success = sc._jvm.com.ziprecruiter.company.store.fact_companies.FactCompanies.processData()

The Scala object being called has a lazily evaluated SparkSession member. Because the Scala code runs in the same JVM as the session PySpark created, SparkSession.active picks up that session automatically.

private lazy val spark = SparkSession.active

Scala usage of Python UDF

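Because the Python UDF exists only as a name in the Spark session’s function registry, there is no Scala function the compiler could reference directly. Instead, we call it by name inside a SQL expression with expr().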

hq_unresolved_str is the column name we pass from the DataFrame. Change this in the code to suit your project’s needs. 

import org.apache.spark.sql.functions.expr

val enrichedDistinctLocations = distinctLocationsStr
      // Use expr() to invoke, by name, the pandas_udf registered on the Spark session
      .withColumn("locres_response", expr("extract_location_service_info_udf(hq_unresolved_str)"))

Expanding the application to other use cases

We can now go beyond this use case to achieve remarkable flexibility. Many enrichers at ZipRecruiter were not built to be called from Spark over HTTP and were sidelined until now. Most of our ML code is also written in Python, given the maturity of that ecosystem. Running external enrichers in-process lets us share them with the Scala framework and even allows the models themselves to call enrichers and share data.

For example, every job listing record has a field containing the job title, like ‘Senior Logistics Manager’. A Python model converts this job title to fit our internal taxonomy of standardized job titles. We can now expose this model to any process in Scala.

ZipRecruiter has built its stack such that the ‘data world’ is in Scala and Delta Lake, while the ‘service world’ contains models and functions written in Python or similar. The solution outlined above provides a bridge between these two worlds, within active data pipelines. While other companies may have constructed their stack completely in Python, or perhaps decided not to connect Spark and models at all, we believe our solution enables us to take advantage of the best of both worlds.

If you’re interested in working on solutions like these, visit our Careers page to see open roles.

* * *

About the Author

Roi Yarden is a Senior Software Engineer at ZipRecruiter on the Company Store team. In his role, he creates tools that collect, decipher, and organize data from millions of companies, all aimed at assisting job seekers in finding employers that align with their needs. Over the past 6 years at ZipRecruiter, Roi has had the opportunity to support dataset building used to construct ML matching models and delve into job seeker engagement data. As a veteran of the IDF’s elite central computing unit, Mamram, and holder of a 4th-dan black belt in karate, Roi is more than just your typical engineer.
