Apache Beam & OpenTelemetry

TBD Blog
Oct 3, 2021

After you’ve finalized an Apache Beam pipeline or made substantial changes to its code, it’s good practice to check how well it’s performing. If, like me, you run on Google Cloud Dataflow, you’ll often only notice inefficiencies in your bill: Dataflow’s autoscaling works so well for parallelizable tasks that you can usually get away with being inefficient. You can attach a profiler to your job (see this great post for a how-to), but I was curious whether I could get even more fine-grained insight into how my pipeline was performing.

Distributed Tracing

At a previous job we set up distributed tracing for our microservices to help us diagnose issues with our p99.9 latencies, since our customers cared a lot about the performance of our APIs. We integrated with Lightstep and found really interesting, rarely occurring issues that were hurting our long-tail latency. For example, one user’s uptime checks produced a very unusual, unrealistic workload that resulted in inefficient queries against our database.

I wondered whether I could find something similar in my Beam projects. Is one customer’s data causing issues, or is every customer’s data processed equally efficiently?

I attempted to repeat that past success by integrating a basic Beam pipeline with OpenTelemetry, which lets you instrument once and send the data to multiple backends.

Basic Setup

Carrying through the span context

Headers are used to carry the span context when working with distributed tracing and APIs, but in Beam there are no headers being passed around with our data. To get around this, I set up a basic wrapper around the data that acts like those headers and carries the necessary context.

import java.io.Serializable;
import java.util.*;
import lombok.*;

@EqualsAndHashCode
@ToString
@Getter
public class TraceableElement<T> implements Serializable {
    private final T element;
    private final Map<String, String> context = new HashMap<>();
    private final List<Map<String, String>> linkedContext = new ArrayList<>();

    // context maps start empty; the propagators below fill them in
    public TraceableElement(T element) {
        this.element = element;
    }
}

Since this is not the standard way to pass around the context (or the linked context), you need a class that tells OpenTelemetry how to get and set these context values on the wrapper.

import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapSetter;
import javax.annotation.Nullable;

public class TraceableElementPropagation<T>
    implements TextMapSetter<TraceableElement<T>>, TextMapGetter<TraceableElement<T>> {

  @Override
  public Iterable<String> keys(TraceableElement<T> carrier) {
    return carrier.getContext().keySet();
  }

  @Nullable
  @Override
  public String get(@Nullable TraceableElement<T> carrier, String key) {
    // guard against the null carrier the interface allows
    return carrier == null ? null : carrier.getContext().get(key);
  }

  @Override
  public void set(@Nullable TraceableElement<T> carrier, String key, String value) {
    if (carrier != null) {
      carrier.getContext().put(key, value);
    }
  }
}

Next, you need to create an OpenTelemetrySdk instance in the @Setup portion of your Beam DoFn, so you avoid creating one on every call to your @ProcessElement method.

To do this you first need a SpanExporter.

For a locally running Jaeger instance you can follow their documentation and use something like:

// Create a channel towards the Jaeger endpoint
ManagedChannel jaegerChannel =
    ManagedChannelBuilder.forAddress(jaegerHost, openTelemetryOptions.getJaegerPort())
        .usePlaintext()
        .build();
// Export traces to Jaeger
SpanExporter spanExporter =
    JaegerGrpcSpanExporter.builder()
        .setChannel(jaegerChannel)
        .setTimeout(30, TimeUnit.SECONDS)
        .build();

Or if you’re running in GCP you can use:

// Export traces to GCP Trace
SpanExporter spanExporter =
    TraceExporter.createWithConfiguration(
        TraceConfiguration.builder()
            .setProjectId(gcpProjectId)
            .build());

Once you have a SpanExporter, you can create an OpenTelemetrySdk instance.

Resource serviceNameResource =
    Resource.create(
        Attributes.of(ResourceAttributes.SERVICE_NAME, serviceName));

SdkTracerProvider tracerProvider =
    SdkTracerProvider.builder()
        .addSpanProcessor(BatchSpanProcessor.builder(spanExporter).build())
        .setResource(Resource.getDefault().merge(serviceNameResource))
        .setSampler(Sampler.alwaysOn()) // a sampling rate can also be set here
        .build();

OpenTelemetrySdk openTelemetry =
    OpenTelemetrySdk.builder()
        .setTracerProvider(tracerProvider)
        .setPropagators(
            ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
        .build();
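Putting this together, here is a minimal sketch of how the SDK can live inside a DoFn. TracedDoFn and buildOpenTelemetry() are hypothetical names I’m using for illustration, not part of the original pipeline:

// Sketch only: TracedDoFn and buildOpenTelemetry() are hypothetical names
public abstract class TracedDoFn<InputT, OutputT>
    extends DoFn<TraceableElement<InputT>, TraceableElement<OutputT>> {

  // transient: the SDK is not serializable, so each worker rebuilds it in @Setup
  private transient OpenTelemetrySdk openTelemetry;

  @Setup
  public void setup() {
    // runs the exporter + SDK construction shown above
    openTelemetry = buildOpenTelemetry();
  }

  @Teardown
  public void teardown() {
    if (openTelemetry != null) {
      // flush any spans still buffered in the BatchSpanProcessor
      openTelemetry.getSdkTracerProvider().shutdown();
    }
  }

  protected abstract OpenTelemetrySdk buildOpenTelemetry();
}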

Creating Spans

Inside the @ProcessElement method of our DoFn, we need to start a span and manually set its parent from the incoming element’s context so the spans link together.

// using the openTelemetry instance created in the @Setup phase
final Context parentContext =
    openTelemetry.getPropagators().getTextMapPropagator()
        .extract(Context.current(), traceableElement, new TraceableElementPropagation<>());
// start building the span
final Tracer tracer = openTelemetry.getTracer(instrumentationName);
final SpanBuilder spanBuilder = tracer.spanBuilder(getClass().getName());
// set the parent if it exists to show the path through your Beam DAG
if (parentContext != null) {
  spanBuilder.setParent(parentContext);
}
// start the span
final Span span = spanBuilder.startSpan();
// make the span current so you can create nested spans if you want
try (Scope scope = span.makeCurrent()) {
  // perform some action on the wrapped input
  final OutputT output = process(traceableElement.getElement());
  // wrap the output in the TraceableElement class defined before
  // so it carries the span context to the next step
  final TraceableElement<OutputT> traceableOutput = new TraceableElement<>(output);
  // inject the current span's context into the wrapper
  openTelemetry.getPropagators().getTextMapPropagator()
      .inject(Context.current(), traceableOutput, new TraceableElementPropagation<>());
  // output the result
  receiver.output(traceableOutput);
} finally {
  span.end();
}
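Because the span is made current, you can also time individual sub-steps with nested spans. Here’s a quick sketch (the “expensive-sub-step” name is made up); any span started while the scope is open automatically becomes a child:

// inside the try (Scope scope = span.makeCurrent()) block above
final Span childSpan = tracer.spanBuilder("expensive-sub-step").startSpan();
try (Scope childScope = childSpan.makeCurrent()) {
  // do the expensive part of the work here; it will show up
  // as a child of the DoFn's span in the trace view
} finally {
  childSpan.end();
}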

Adding additional metadata with attributes

OpenTelemetry allows you to attach additional context to spans as key/value attributes. If you wish to add some to your spans, simply take your spanBuilder and call:

spanBuilder.setAttribute(key, value);
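For example, you could tag each span with the customer an element belongs to, which makes it easy to filter traces down to a single customer later. The attribute keys and accessors here are hypothetical:

// hypothetical attribute keys and accessors, for illustration only
spanBuilder.setAttribute("customer.id", element.getCustomerId());
spanBuilder.setAttribute("element.size.bytes", elementSizeBytes);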

Adding linked context

The model for distributed tracing does not map well onto joining data. For an API, a single request may fan out into multiple calls to other services, but multiple requests never merge back into a single element. To get around this, I represent a join as linked context on the span. You can add the links with:

traceableElement.getLinkedContext().forEach(linked -> {
  final Context context =
      openTelemetry.getPropagators().getTextMapPropagator()
          .extract(Context.current(), linked, new MapPropagation<>());
  spanBuilder.addLink(Span.fromContext(context).getSpanContext());
});
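The MapPropagation class isn’t shown above; here’s a minimal sketch of what it could look like, assuming each linked context entry is a plain Map<String, String> carrier:

import io.opentelemetry.context.propagation.TextMapGetter;
import java.util.Map;
import javax.annotation.Nullable;

// reads span context straight out of a Map<String, String> carrier
public class MapPropagation<T extends Map<String, String>> implements TextMapGetter<T> {

  @Override
  public Iterable<String> keys(T carrier) {
    return carrier.keySet();
  }

  @Nullable
  @Override
  public String get(@Nullable T carrier, String key) {
    return carrier == null ? null : carrier.get(key);
  }
}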

Results

Jaeger

I first ran with the local Jaeger setup to prove that everything was working as expected. I used a very basic word-count-style example and could see the basic trace information in the Jaeger UI.

Jaeger also provides a dependency graph view that mirrors the Beam DAG.

GCP Trace

After validating that things were working as expected, I switched the SpanExporter over to the GCP option outlined above and ran the job inside Dataflow. The results were not particularly insightful with this basic example, but they do provide a view into the order in which your steps execute and how long the job waits between them.

Conclusions

It was fun trying out this new approach to debugging a pipeline’s performance, but it was incredibly verbose. I even tried writing some custom wrappers around DoFns and CombineFns to help, but it would still be a lot of work to add this to an existing pipeline. On top of that, the inability to properly model joins means you’ll likely miss some key insights, since joins and the shuffling of data tend to be the most expensive parts of the pipelines I have worked on.

Even though this would be painful to add to an entire pipeline, adding tracing to a single DoFn would be easier. This could be helpful for debugging the internal performance of a more complicated transform to see where you’re spending time. Adding attributes to the spans could also help narrow down the issue to a particular subset of the data, like a single large customer.

Adding the code for creating traces within Beam was verbose, but the code for switching between tracing backends was very simple. Being able to swap backends was also great for debugging locally against Jaeger and then running against a managed service like GCP’s Trace product. I am looking forward to trying this out on some APIs better suited to the tool.

If anyone else has done something similar, has a better way to profile Beam or other batch jobs, or would like to discuss potential solutions for the problem, feel free to reach out at timbrowndatablog@gmail.com.
