Accurately measuring lag in a Spark streaming data pipeline to Delta Lake

How to properly use the Spark StreamingQueryListener to capture component lag with Prometheus

At ZipRecruiter, we match millions of job seekers with relevant job recommendations every day, while providing data-rich recruitment services to thousands of businesses. Inherent to a seamless online user experience is the speed with which screens load and services are rendered. 

When job seekers fill out their details like location, education, or employment history, they see it in their profile almost immediately. Behind the scenes, however, this data must be made available for internal use so that we can match candidates to jobs using powerful machine learning models, offer intent-driven search tools, and more.

As we continue to scale, so does the pressure on our data processing pipelines. To ensure a high standard of service, we continuously monitor the freshness of our data at every stage in our pipeline.

The following article details how we create precise, realistic, and actionable metrics for monitoring our data processing lag. Reaching this point required changing our definition of lag multiple times and debunking several misconceptions. 

The Spark StreamingQueryListener and Prometheus documentation are a bit coy about both their limitations and their strengths. Resist the temptation to build your own custom app to solve this problem; the existing tools can do it. Here we go!

Defining Lag: The age of the oldest unprocessed record

At the outset, it is important to distinguish between ‘forward-looking lag’ and ‘backward-looking lag’.

  • Forward-looking lag refers to how long it will take us to process all unprocessed data. This depends primarily on how much compute power is available and is quite flexible.
  • Backward-looking lag indicates how stale or fresh our downstream data is.

Here we are concerned with backward-looking lag, and therefore define lag as ‘the age of the oldest unprocessed record.’ The older the oldest unprocessed record, the larger the lag.
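As a minimal sketch of this definition (not our production code), the lag can be computed from record creation times. The Record shape and the use of epoch seconds are assumptions made for illustration.

```scala
object BackwardLag {
  // Hypothetical record: creation time in epoch seconds, plus a processed flag.
  final case class Record(createdAtSec: Long, processed: Boolean)

  // Age of the oldest unprocessed record, or zero when everything is processed.
  def lagSeconds(records: Seq[Record], nowSec: Long): Long = {
    val pending = records.filterNot(_.processed).map(_.createdAtSec)
    if (pending.isEmpty) 0L else nowSec - pending.min
  }
}
```

Note that the result depends only on the oldest pending record, not on how many records are waiting.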

In batch processing pipelines, a certain amount of lag is acceptable since records accumulate before a batch is processed and we also want to allow the system to retry batches when temporary errors occur. The degree of acceptable lag is derived from the SLO targets set. When there is no lag, all input records have been processed. When lag starts to grow over time, however, there is likely a blockage in the pipeline. The question is: Where? 

Monitoring every pipeline component individually makes lag metrics actionable

At the highest level, one may calculate ‘System Lag’. System lag encompasses the end-to-end lag of the entire system, from the moment a record is created by a producer (the user, in this case) to the moment that record rests in the data lake after processing. This information is readily available, as there is usually a lot of metadata on pipeline performance, including the timestamp of when a job seeker clicked ‘Save’ and the timestamp of when the data arrives at the app that puts it into the data lake.

System lag is great for a manager level dashboard, but not for revealing or pinpointing a problem with a specific app or component in the pipeline (and identifying the right person to deal with it).

Therefore, we need to dive deeper and establish ‘Component Lag’. As the name suggests, this is the lag related to each specific component in the pipeline individually. In effect, it is the age of the oldest data that has not yet entered that component.

To collect the required metadata on pipeline performance at the granular component level we need to use a few special tools.

Monitoring Spark query progress with checkpoints and versions

To maintain a degree of fault-tolerance, Spark takes snapshots of its internal state, including metadata about data progressing along checkpoints in the pipeline. If a failure occurs during the processing of a streaming query, Spark can recover from the last successful checkpoint, and continue processing from that point, i.e. the last successfully processed batch or record. This ensures that no data is lost and no duplicate processing occurs.

To identify batches of data as they cross checkpoints in a Spark job using Delta Lake, each batch’s data is written as a separate version of a Delta table.

Checkpoints occur at the “entry to” and “exit from” processing components. Each checkpoint will log what versions have crossed it, and this is how we can understand what data has successfully completed each stage in the pipeline. 

Consider Figure 1. Four sets of data have been created, each receiving a version number: 1, 2, 3, and 4. Version 1 has passed component 1 and checkpoint C has logged that state. Thus, if the processing were to fail now, we could resume knowing that version 1 does not need to go through component 1 again. Versions 2, 3 and 4, however, are still waiting to be processed by component 1.

Figure 1. System lag vs. component lag in a Spark data processing pipeline, noting how checkpoints, versions and timestamps are related.
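The situation in Figure 1 can be sketched in a few lines of Scala. This is an illustration only: the function names are hypothetical, and a real checkpoint stores more than a single version number.

```scala
object CheckpointBacklog {
  // Versions that exist upstream but have not yet crossed the checkpoint.
  def pendingVersions(produced: Seq[Long], lastCheckpointed: Long): Seq[Long] =
    produced.filter(_ > lastCheckpointed).sorted

  // Backlog size counted in versions.
  def versionLag(produced: Seq[Long], lastCheckpointed: Long): Int =
    pendingVersions(produced, lastCheckpointed).size
}
```

With versions 1 to 4 produced and version 1 logged at the checkpoint, pendingVersions returns versions 2, 3 and 4, so the component is three versions behind.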

The Spark Streaming Query Listener collects checkpoint information on processed versions

The Spark StreamingQueryListener (SSQL) is the tool used to monitor and report on events in the processing pipeline. By monitoring at specific checkpoints in the pipeline, we can identify versions of data incoming and outgoing from specific components.

Specifically for Delta Lake, the SSQL makes available sources.endOffset.reservoirVersion which we turn into a spark_delta_last_read_end_version metric. Plotting that, along with the “last produced delta_version” (which we’ll show how to obtain later on), for a specific component in a Delta Lake batch processing job that runs every hour, the graph would look like this: 

Figure 2. Version counters on producer and consumer of a specific component. 

The “last produced delta_version” counter (green) increments continuously as new data arrives from the producer (i.e., the user or the previous component). Once every hour, the consumer component processes all data (versions) accumulated in its backlog. The next version arriving at the consumer will be the new “last produced delta_version” and marks the beginning of the lag.
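On the consumer side, the counter is derived from the offset that the listener reports as a JSON string. The sketch below pulls reservoirVersion out of such a string with a regular expression so that it runs without any JSON library. The sample JSON shape in the usage note is an assumption for illustration, and the DeltaStreamListenerMetrics class in the source code section uses json4s and DeltaSourceOffset instead.

```scala
object OffsetVersion {
  // Matches "reservoirVersion": <digits> anywhere in the offset JSON.
  private val VersionField = """"reservoirVersion"\s*:\s*(\d+)""".r

  // Returns None for null, empty, or unrecognized input instead of throwing.
  def reservoirVersion(offsetJson: String): Option[Long] =
    Option(offsetJson).flatMap { json =>
      VersionField.findFirstMatchIn(json).map(_.group(1).toLong)
    }
}
```

For an assumed offset such as {"sourceVersion":1,"reservoirVersion":42,"index":-1}, reservoirVersion returns Some(42).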

In the example from Figure 1, we have a 3 version lag. What if it were a 50 version lag or 500? Should we alert someone? 

Here’s an example where something unusual happened. Don’t be put off by the “last consumed delta_version” being higher than the “last produced delta_version”—it’s perfectly normal to have Delta Lake transactions whose housekeeping is done outside the data producer.

Just before 19:00, a new version has been produced—but the consumer doesn’t react until after 22:30. In this situation, the consumer is only behind by 1 version—but the version is huge, and the actual clock time of our processor’s lag is over 3 hours.

Setting up monitoring and alerts based on accumulated unprocessed versions is misleading because a 50 version delay might mean lots of other totally valid actions are being taken on the table, such as vacuuming. What’s more, 50 or 500 versions might be processed very quickly, on the order of seconds, because processing rates vary greatly.

Monitoring lag in terms of versions will create an unrealistic snapshot, resulting in overly sensitive alerts and unnecessary pages for on-call developers. And then you’re on a slippery slope. One may dial down the alerts too much so that when something really bad does trigger an alert, the problem is already way out of hand and cannot be solved fast enough.

And so, converting this information into time units reflecting the age of the last unprocessed version is the way ahead. 
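To make the contrast concrete, here is a small sketch that computes both kinds of lag from the same inputs. The Commit type and its fields are hypothetical stand-ins for whatever commit metadata is available.

```scala
object VersionVsTimeLag {
  // Hypothetical commit-log entry: a Delta version and its commit time (epoch seconds).
  final case class Commit(version: Long, committedAtSec: Long)

  // Lag counted in versions: how many commits the consumer has not read yet.
  def versionLag(commits: Seq[Commit], lastConsumed: Long): Int =
    commits.count(_.version > lastConsumed)

  // Lag measured in time: age of the oldest commit the consumer has not read yet.
  def timeLagSec(commits: Seq[Commit], lastConsumed: Long, nowSec: Long): Long = {
    val pending = commits.filter(_.version > lastConsumed).map(_.committedAtSec)
    if (pending.isEmpty) 0L else nowSec - pending.min
  }
}
```

With 500 small versions committed one minute ago, versionLag is 500 while timeLagSec is only 60. With a single version committed three and a half hours ago, versionLag is 1 while timeLagSec is 12600. Only the second case deserves a page.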

Converting version lag into time with Prometheus

To convert version lag into actionable time units we use Prometheus, a time series database.

Identifying the latest processed version

First, we must identify the latest processed version. We use the Spark Streaming Query Listener-derived metric spark_delta_last_read_end_version. This metric is updated at the end of a component’s successful run.

max(max_over_time(spark_delta_last_read_end_version{job="example_processor"}[1d]))

spark_delta_last_read_end_version comes from our DeltaStreamListenerMetrics class, which converts the Delta Lake structures that come through the Spark StreamingQueryListener into the last-processed table version number when the consumer has finished processing a batch. We use max_over_time() because the metric is 0 until the processing is done, and max() to filter out series from previous executions of the job within the last day. The source code is included in the next section.
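The two nested aggregations can be emulated outside Prometheus to see why both are needed. This is a rough sketch under simplifying assumptions: each job execution is modeled as its own series keyed by a hypothetical run label, and the exact window boundary handling of Prometheus is not reproduced.

```scala
object LastProcessedVersion {
  // One scraped sample of the gauge: scrape time (epoch seconds) and value.
  final case class Sample(timeSec: Long, value: Long)

  def lastProcessed(
      seriesByRun: Map[String, Seq[Sample]],
      nowSec: Long,
      windowSec: Long
  ): Option[Long] = {
    def inWindow(s: Sample): Boolean =
      s.timeSec > nowSec - windowSec && s.timeSec <= nowSec

    // max_over_time: per series, the highest value seen inside the window.
    // This skips the zeros reported before a run has finished.
    val perSeriesMax = seriesByRun.values.toSeq
      .map(_.filter(inWindow).map(_.value))
      .filter(_.nonEmpty)
      .map(_.max)

    // Outer max(): collapse the series from all executions into one value.
    if (perSeriesMax.isEmpty) None else Some(perSeriesMax.max)
  }
}
```

If an earlier execution finished at version 7 and a later one at version 9, with both reporting 0 while they were still running, the result is Some(9).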

Discovering WHEN the oldest unprocessed version was produced

Whenever a version exists that is higher than spark_delta_last_read_end_version, there may be unprocessed data. Note: Some versions don’t have data changes and could be excluded, but we can be conservative and ignore that for now. For the oldest unprocessed version, we must extract when it was made available to our consumer component: in effect, the time at which the producer created that version.

Prometheus is able to extract information regarding when a version was created, but can’t provide the version ID. 

The challenge, then, is to figure out the time the oldest record was produced after the “last processed version”. We can do this by using ‘>’ to select all delta versions that are newer than the last processed version, and then take the minimum timestamp of those delta versions.

min_over_time(
  (
      timestamp(delta_version{table="example_input_dataset"})
    and
      (
          delta_version{table="example_input_dataset"}
        >
          scalar(
            max(max_over_time(spark_delta_last_read_end_version{job="example_processor"} offset -1d[1d:]))
          )
      )
  )[1d:]
)

Working from the inside out, for the current PromQL evaluation period, we get the current version of the Delta table and compare it with the latest-processed version (spark_delta_last_read_end_version) over the whole subsequent day (looking forward from the current evaluation period: offset -1d[1d:]). 

The “>” filters out any values where the processor is all caught up, essentially producing an expression that answers “at each evaluation period, was there a higher current delta_version that needed processing?”. With that filter condition on the right-hand side of the and, timestamp(…) yields the timestamps of only those evaluation periods. Then the outermost min_over_time finds the earliest time that a yet-unprocessed version was available.
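The same inside-out logic can be emulated in Scala as a reading aid. This sketch simplifies one thing on purpose: the last processed version is passed in as a fixed number, whereas the real query recomputes it at every evaluation step with a forward-looking window. The Step type is hypothetical.

```scala
object EarliestUnprocessed {
  // The value of delta_version as seen at one evaluation step (epoch seconds).
  final case class Step(timeSec: Long, deltaVersion: Long)

  def earliestUnprocessedSec(steps: Seq[Step], lastProcessed: Long): Option[Long] = {
    // The ">" filter: keep only steps where a newer version was waiting.
    val ahead = steps.filter(_.deltaVersion > lastProcessed)
    // timestamp(...) and (...): keep the evaluation times of those steps.
    val stamps = ahead.map(_.timeSec)
    // min_over_time(...): the earliest such time, or nothing when caught up.
    if (stamps.isEmpty) None else Some(stamps.min)
  }
}
```

If delta_version reads 5, 5, 6, 7 at times 1000, 1060, 1120, 1180 and the last processed version is 5, the result is Some(1120), the first moment an unprocessed version was visible.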

Crucially, you must make absolutely sure to use the correct table and job labels. Otherwise you run the risk of getting metrics that seem plausible but are actually measuring something else, or even total nonsense, if, for example, the “example_processor” job does not actually read the “example_input_dataset” table. 

Calculating the lag

Lag is simply the difference between the current time and the value computed above.

min(
    time()
  -
    min_over_time(
      (
          timestamp(delta_version{table="example_input_dataset"})
        and
          (
              (delta_version{table="example_input_dataset"})
            >
              scalar(
                max(max_over_time(spark_delta_last_read_end_version{job="example_processor"} offset -1d[1d:]))
              )
          )
      )[1d:]
    )
)

Creating a lag time series

To create a time series dataset that reliably captures the lag for historical versions, we set up a Prometheus recording rule. The recording rule is used to produce a new time series from a complicated expression value at each evaluation period. This way, historical values are consistently available. Otherwise, Prometheus has a limited ability to look forward from previous evaluation periods, which means in practice, without the recording rule we are unable to see the lag for any other versions before the last-processed one.

groups:
- name: rules
  rules:
  - record: Example_delta_lag_time
    expr: |
      min(
          time()
        -
          min_over_time(
            (
                timestamp(delta_version{table="example_input_dataset"})
              and
                (
                    (delta_version{table="example_input_dataset"})
                  >
                    scalar(
                      max(
                        max_over_time(
                          spark_delta_last_read_end_version{job="example_processor"} offset -1d[1d:]
                        )
                      )
                    )
                )
            )[1d:]
          )
      )

Setting up an alert

Now we can use the recording in an alert expression.

  - alert: Example_Lag
    expr: Example_delta_lag_time > 10800
    for: 1h
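The threshold of 10800 seconds is three hours, and the for: 1h clause means the condition must hold for a full hour of consecutive evaluations before the alert fires. The sketch below emulates that behavior as we understand it, so treat it as an approximation rather than a reference implementation. The Eval type and function name are hypothetical.

```scala
object LagAlert {
  // One rule evaluation: when it ran and the recorded lag, both in seconds.
  final case class Eval(timeSec: Long, lagSec: Long)

  // Evaluation times at which the alert would be firing (not merely pending).
  def firingTimes(evals: Seq[Eval], thresholdSec: Long, forSec: Long): Seq[Long] = {
    val start = (Option.empty[Long], Vector.empty[Long])
    val (_, firing) = evals.sortBy(_.timeSec).foldLeft(start) {
      case ((activeSince, acc), e) =>
        if (e.lagSec > thresholdSec) {
          // Remember when the condition first became true.
          val since = activeSince.getOrElse(e.timeSec)
          val next = if (e.timeSec - since >= forSec) acc :+ e.timeSec else acc
          (Some(since), next)
        } else {
          // The condition cleared, so the pending timer resets.
          (None, acc)
        }
    }
    firing
  }
}
```

A lag that crosses three hours and is consumed 30 minutes later never pages anyone, which leaves room for a retry to succeed.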

A visualization of age of oldest unprocessed input to a component will look like this: 

The age of the oldest unprocessed record increases over time. When the batch process runs, all versions are consumed and the next batch starts accumulating again. 

Source code

Example producer:

import com.ziprecruiter.log.scala.Logger
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, SparkSession}

import java.util.UUID
import scala.util.Random

object ExampleProducer {

  val logger: Logger = Logger[this.type]
  val spark = SparkSession
    .builder()
    .master("local")
    .config(
      new SparkConf()
        .setAppName("ExampleProducer")
        .set(
          "spark.sql.catalog.spark_catalog",
          "org.apache.spark.sql.delta.catalog.DeltaCatalog"
        )
    )
    .getOrCreate()

  def randomUUID(): String = UUID.randomUUID().toString

  private def generateBatch(): Dataset[InputRecord] = {
    import spark.implicits._
    Seq.fill(Random.nextInt(10))(InputRecord(randomUUID())).toDS()
  }

  def main(args: Array[String]): Unit = {
    val newData = generateBatch()
    newData.write.format("delta").mode("append").save("/tmp/dataset")
    val producedVersion =
      spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
    logger.info(
      "would push this metric",
      """delta_version{table="example_input_dataset"}""" -> producedVersion
    )
  }
}

case class InputRecord(id: String)

Example consumer:

import com.ziprecruiter.common.spark.metrics.SparkListenerMetrics
import com.ziprecruiter.log.scala.Logger
import com.ziprecruiter.metrics.prometheus.{
  PrometheusMetricConfig,
  PrometheusMetricPushGatewayConfig
}
import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{Dataset, SparkSession}
import com.ziprecruiter.metrics.MetricRegistry

object ExampleConsumer {

  val logger: Logger = Logger[this.type]
  val spark = SparkSession
    .builder()
    .master("local")
    .config(
      new SparkConf()
        .setAppName("ExampleConsumer")
        .set(
          "spark.sql.catalog.spark_catalog",
          "org.apache.spark.sql.delta.catalog.DeltaCatalog"
        )
    )
    .getOrCreate()

  private def processInput(): Dataset[OutputRecord] = {
    import spark.implicits._
    spark.readStream
      .format("delta")
      .load("/tmp/dataset")
      .as[InputRecord]
      .map { x =>
        OutputRecord(id = x.id, status = "IS AWESOME")
      }
  }

  def main(args: Array[String]): Unit = {
    MetricRegistry.getRegistry(
      new PrometheusMetricConfig()
        .setIncludeJvmMetrics(true)
        .setPushGateway(new PrometheusMetricPushGatewayConfig())
    )
    spark.sparkContext.addSparkListener(new SparkListenerMetrics)
    val deltaMetrics = new DeltaStreamListenerMetrics()
    spark.streams.addListener(deltaMetrics)

    processInput().writeStream
      .option("checkpointLocation", "/tmp/example-consumer-checkpoint")
      .foreachBatch { (batch: Dataset[OutputRecord], batchID: Long) =>
        logger.info("writing batch", "record_count" -> batch.count())
        batch.write.format("delta").mode("append").save("/tmp/consumer")
      }
      .trigger(Trigger.Once)
      .start()
      .awaitTermination()

    logger.info(
      "would push this metric",
      """delta_last_read_end_version""" -> deltaMetrics.deltaLastReadEndVersion
        .get()
    )
  }
}

case class OutputRecord(id: String, status: String)

DeltaStreamListenerMetrics (For the consumer):

import com.ziprecruiter.common.spark.metrics._
import com.ziprecruiter.metrics.MetricRegistry
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.json4s._
import org.json4s.jackson.JsonMethods._
import com.ziprecruiter.log.scala.Logger
import org.apache.spark.sql.delta.sources.DeltaSourceOffset

class DeltaStreamListenerMetrics extends StreamingQueryListener {

  private val logger: Logger = Logger[this.type]
  private val registry = MetricRegistry.getRegistry()

  val deltaLastReadStartVersion = registry.createGauge(
    registry
      .createNameBuilder()
      .setName("spark_delta_last_read_start_version")
      .setHelp(
        "Last Delta Version that was read from a source during a stream process as the start offset"
      )
      .build()
  )

  val deltaLastReadEndVersion = registry.createGauge(
    registry
      .createNameBuilder()
      .setName("spark_delta_last_read_end_version")
      .setHelp(
        "Last Delta Version that was read from a source during a stream process as an end offset"
      )
      .build()
  )

  override def onQueryProgress(
      event: StreamingQueryListener.QueryProgressEvent
  ): Unit = {
    if (event.progress.sources.size > 0) {
      val startOffsetJson: String = event.progress.sources(0).startOffset
      val endOffsetJson: String = event.progress.sources(0).endOffset

      val startVersion = offsetInfoToDeltaVersion(startOffsetJson)
      val endVersion = offsetInfoToDeltaVersion(endOffsetJson)

      startVersion match {
        case Some(i) => {
          deltaLastReadStartVersion.set(i)
          logger.info(
            "onQueryProgress",
            "readStartDeltaVersion" -> startVersion
          )
        }
        case None =>
          logger.error("onQueryProgress failed to parse delta versions")
      }

      endVersion match {
        case Some(i) => {
          deltaLastReadEndVersion.set(i)
          logger.info(
            "onQueryProgress",
            "readEndDeltaVersion" -> endVersion
          )
        }
        case None =>
          logger.error("onQueryProgress failed to parse delta versions")
      }
    }
  }

  def offsetInfoToDeltaVersion(offsetInfo: String): Option[Long] = {

    implicit val formats = DefaultFormats
    if (offsetInfo == null || offsetInfo == "") {
      logger.warn(
        "offsetInfoToDeltaVersion: offsetInfo is empty. Cannot retrieve read version"
      )
      return None
    }
    try {
      // Used for both start and end offsets, so the name is kept neutral.
      val offsetJson: JValue = parse(offsetInfo)
      val sourceOffset = offsetJson.extractOrElse[DeltaSourceOffset](null)
      if (sourceOffset == null) {
        logger.warn("offsetInfoToDeltaVersion could not extract version info")
        None
      } else {
        Some(sourceOffset.reservoirVersion)
      }
    } catch {
      case e: Exception =>
        logger.error("offsetInfoToDeltaVersion failed", "error" -> e)
        e.printStackTrace()
        None
    }
  }

  override def onQueryStarted(
      event: StreamingQueryListener.QueryStartedEvent
  ): Unit = {
    logger.info(
      "onQueryStarted",
      "event" -> s"${event.id}, ${event.runId}, ${event.name}, ${event.timestamp}"
    )
  }

  override def onQueryTerminated(
      event: StreamingQueryListener.QueryTerminatedEvent
  ): Unit = {
    logger.info(
      "onQueryTerminated",
      "event" -> s"${event.id}, ${event.runId}, ${event.exception.getOrElse("")}"
    )
  }
}

Setting up alerts with realistic lag goals

When rolling out any new system, it is prudent to set highly sensitive alert thresholds and slowly relax them as the system stabilizes.

Of course, lag should be as low as possible, but we also want to avoid creating unnecessary pressure to achieve overly zealous latencies. The system must be allowed to fix itself (retry on failure), tolerate some outages, and not “wake people up” unless it’s truly necessary. 

In the end, we want to ensure that the lag (age of the oldest unprocessed version) for each component (and all components cumulatively) is well under the SLO time for that service.

In order to achieve the target SLO, we can make a job run more frequently. This will change the version rate, but because we measure lag in time rather than in versions, our monitoring is not skewed.
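A toy model illustrates the effect of run frequency on peak lag. It assumes a producer that commits at a fixed period and a consumer that runs at a fixed period and consumes instantly, neither of which holds exactly in a real pipeline. All names are hypothetical.

```scala
object RunFrequency {
  // Peak lag (seconds) observed just before any consumer run within the horizon.
  def peakLagSec(producePeriodSec: Long, runPeriodSec: Long, horizonSec: Long): Long = {
    require(producePeriodSec > 0 && runPeriodSec > 0 && horizonSec >= runPeriodSec)
    val commits = producePeriodSec to horizonSec by producePeriodSec
    val runs = runPeriodSec to horizonSec by runPeriodSec
    runs.map { run =>
      // Commits made after the previous run and still unread just before this one.
      val pending = commits.filter(t => t > run - runPeriodSec && t <= run)
      if (pending.isEmpty) 0L else run - pending.min
    }.max
  }
}
```

With a commit every 10 minutes, an hourly consumer peaks at 3000 seconds of lag, while a consumer that runs every 30 minutes peaks at 1200 seconds. The time-based metric reflects that improvement directly, whatever happens to the number of versions per run.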

As a closing note, remember – when a component in the pipeline has a conspicuously consistent zero lag, be sure to go back upstream and make sure there is not a blockage in previous components.

If you’re interested in working on solutions like these, visit our Careers page to see open roles.

* * *

About the Author:

Jeff Rhyason is a Senior Software Engineer at ZipRecruiter on the Jobseeker Experience team where he modernizes our datastores and APIs so teams can work more independently. Jeff’s team is fully remote, distributed across Canada and the US. They are a tight knit group and he loves wrangling curly braces with such a talented and diverse bunch. On off hours you’ll find Jeff cycling around Vancouver, Canada, fueled on a steady diet of kombucha.
