Apache Beam provides an SDK for your batch and streaming data pipelines. If you’re using Beam with Java, you’re likely familiar with Coders and the default SerializableCoder that relies on Java Serialization. In this post, I take a look at the performance of various coders in very basic pipelines to help you choose the coder that’s right for your use case.
TL;DR: The SerializableCoder is fine for initial prototyping but hinders pipeline performance. On top of that, Beam Schemas may make initial prototyping even easier while providing a more performant coder.
Test Datasets
All of the test datasets were either instances of TestObj or BasicTestObj:
@Data
@AllArgsConstructor
@NoArgsConstructor
public final class TestObj implements Serializable {
  private static final Random RANDOM = new Random();
  private int testInt;
  private Integer testWrappedInt;
  private long testLong;
  private Long testWrappedLong;
  private double testDouble;
  private Double testWrappedDouble;
  private String testString;
  private List<String> testSimpleList;
  private List<NestedObj> testComplexList;
  private Map<String, Integer> testSimpleMap;
  private Map<String, NestedObj> testComplexMap;

  @Data
  @NoArgsConstructor
  @AllArgsConstructor
  public static final class NestedObj implements Serializable {
    private int nestedInt;
    private String nestedString;
  }
}

@Data
@AllArgsConstructor
@NoArgsConstructor
public final class BasicTestObj implements Serializable {
  private static final Random RANDOM = new Random();
  private int testInt;
  @Nullable private Integer testWrappedInt;
  private long testLong;
  @Nullable private Long testWrappedLong;
  private double testDouble;
  @Nullable private Double testWrappedDouble;
  @Nullable private String testString;
}
Three sets of JSON data were created and saved into GCS buckets:
- Large Objects: Uses TestObj defined above. This dataset has larger objects than the next but fewer of them. It was made by letting lists and maps have up to 100 elements and by using random strings of up to 100 characters for all string fields.
  Total Elements: 3,665,500. Total Size (as JSON strings): 101.34 GB
- Small Objects: Uses TestObj defined above. This dataset has smaller objects but more of them. Lists and maps are limited to 10 elements and strings were capped at 10 characters.
  Total Elements: 10,000,000. Total Size (as JSON strings): 16.34 GB
- Basic Objects: Uses BasicTestObj defined above. This dataset does not have any lists, maps, or nested complex objects and is meant to represent a much more straightforward, "basic" set of data. The integers and longs are all positive and between 0 and 100. The strings are up to 10 characters.
  Total Elements: 50,000,000. Total Size (as JSON strings): 9.5 GB
Coders Evaluated
- SerializableCoder: relies on Java serialization; the default in Beam for Java pipelines
- HandwrittenCoder*: a coder built from existing coders provided by Beam (NullableCoder, StringUtf8Coder, VarIntCoder, etc.). Uses the AllArgsConstructor on deserialization.
- JsonCoder*: uses Jackson to write/read data as JSON
- JsonDelegateCoder*: uses the delegate coder concept with a StringUtf8Coder and Jackson to write/read data as a JSON string
- ProtoCoder: a proto schema was defined to match the POJO, and the data was converted into the proto schema's generated Java objects in the pipeline before shuffling
- AvroCoder: same as above but with an Avro schema instead of a proto schema
- KryoCoder: a coder available through Beam's Kryo extension that provides an alternative to Java serialization (see Kryo's GitHub page for more info)
- FstCoder*: a coder implemented with fast-serialization, another alternative to Java serialization
- SchemaCoder: see the documentation and this Beam Summit presentation to learn more about Beam Schemas. @DefaultSchema, @SchemaCreate, and @Nullable annotations were added to the classes above to make this work (it also required getters for all serialized fields; JavaFieldSchema would instead require public fields). A sketch of the annotated class is shown after this list.
*Code samples below
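For the SchemaCoder, here is a minimal sketch of how the annotations might be applied to BasicTestObj. This is my reconstruction, assuming the JavaBeanSchema provider with Lombok-style getters and a @Nullable annotation that Beam's schema inference recognizes; only two of the fields are shown:

import java.io.Serializable;
import javax.annotation.Nullable;
import org.apache.beam.sdk.schemas.JavaBeanSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;

// Sketch only: the remaining BasicTestObj fields are omitted for brevity.
@DefaultSchema(JavaBeanSchema.class)
public class BasicTestObj implements Serializable {
  private final int testInt;
  @Nullable private final String testString;

  // @SchemaCreate tells Beam which constructor to use when rebuilding objects,
  // which is what allows the fields to stay final.
  @SchemaCreate
  public BasicTestObj(int testInt, @Nullable String testString) {
    this.testInt = testInt;
    this.testString = testString;
  }

  public int getTestInt() {
    return testInt;
  }

  @Nullable
  public String getTestString() {
    return testString;
  }
}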
With the exception of the ProtoCoder, AvroCoder, and SchemaCoder, the rest of the coders were able to function as drop-in replacements for the SerializableCoder, making it very painless to update existing code.
Baseline Pipeline
To start off, we'll run a basic pipeline (in Google Cloud's Dataflow on n2-standard-2 machines) with no data being shuffled between machines. In all cases, I measured the total vCPU hours and the size of the dataset after the To Java Objects step. For Google Cloud's Dataflow, the vCPU hours, disk usage, and the amount of data shuffled are what drive most of your pipeline's cost. Code snippet for the pipeline:
PCollection<KV<String, TestObj>> tmp = p
    .apply("ReadLines", TextIO.read().from(options.getInput()))
    .apply("To Java Objects", ParDo.of(new ReadFromJson()))
    .setCoder(coder)
    .apply("Add Random Keys", WithKeys.of(input -> Integer.toString(input.hashCode())))
    .setCoder(KvCoder.of(StringUtf8Coder.of(), coder));

if (options.getShuffle()) {
  tmp = tmp.apply("Reshuffle", Reshuffle.of());
}

tmp.apply("Drop keys", Values.create());
Large Object Dataset Results
The main difference between these pipeline runs was the size of the coded dataset. There was slight variation in vCPU time, mainly for the AvroCoder, which required an extra conversion step to the Avro object, so it's expected to take a little more effort.
Small Object Dataset Results
On the small object dataset we see similar results, but the size differences between the coded datasets are more significant.
Basic Object Dataset Results
For the basic object dataset there is a large difference between the SerializableCoder's dataset size and everything else. Another interesting finding: the SchemaCoder's output size is almost exactly equal to the HandwrittenCoder's. If you dig into the SchemaCoder implementation you'll see that it's essentially generating the same thing for you behind the scenes.
Shuffle Pipeline
Using the same code shown above but now with the shuffle option enabled, we force data to move between workers and therefore actually see the impact of a coder's performance. I used the Wall Time metric from Dataflow as an analog for the performance of the coders in the Reshuffle step.
Large Object Dataset Results
The FstCoder is the winner here for reshuffle performance; the JSON coders were the clear losers.
Small Object Dataset Results
The FstCoder is no longer a clear winner with this dataset. More importantly, the SchemaCoder starts to break away from the SerializableCoder in terms of shuffle performance, likely because the gap in coded dataset size between the two coders is larger here than it was on the large object dataset.
Basic Object Dataset Results
This dataset really highlights how poorly the SerializableCoder does on basic and relatively small objects. The JSON coders were the second-worst option, with everything else having fairly similar performance.
Conclusions
Remember that these results are from a single run of a very basic pipeline, and it's worth analyzing the performance of a couple of these coders in your own pipeline with your own unique dataset.
Proto, Avro, and Handwritten Coders
All three of these are improvements over the baseline but require more developer overhead. For the proto and Avro cases, you would need to define schemas for your intermediate objects within your pipeline, which is a lot of overhead; most of the time Avro and proto schemas are only defined for input and output classes. If you already have these classes defined, you shouldn't be using the SerializableCoder to begin with. A sketch of the extra conversion step is shown below.
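To make that overhead concrete, here is a rough sketch of the wiring: convert to the generated class right before shuffling and set the matching coder. TestObjProto, TestObjAvro, and the toProto/toAvro converters are hypothetical names standing in for code the pipelines above don't show:

// Hypothetical generated classes and converters; only the wiring is the point here.
PCollection<TestObjProto> protos = objects
    .apply("To Proto",
        MapElements.into(TypeDescriptor.of(TestObjProto.class)).via(obj -> toProto(obj)))
    .setCoder(ProtoCoder.of(TestObjProto.class));

PCollection<TestObjAvro> avros = objects
    .apply("To Avro",
        MapElements.into(TypeDescriptor.of(TestObjAvro.class)).via(obj -> toAvro(obj)))
    .setCoder(AvroCoder.of(TestObjAvro.class));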
Similarly, writing a coder by hand for every object in your pipeline comes with a higher upfront cost along with a maintenance cost as fields are added or updated.
Best Drop-in Replacement for SerializableCoder
The FstCoder and KryoCoder performed similarly across all three datasets and showed a significant improvement over the SerializableCoder. If you have a shuffle-heavy pipeline, you should consider trying both of them out. I've included sample code for the FstCoder I wrote below, but if you're looking for a maintained Beam extension then try out the KryoCoder.
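To illustrate what "drop-in" means in practice, swapping the coder is a one-line change on the PCollection. For example, using the FstCoder sample from the end of this post (ReadFromJson and options are the same pieces used in the baseline pipeline above):

// Same pipeline as before; only the coder changes.
PCollection<TestObj> objects = p
    .apply("ReadLines", TextIO.read().from(options.getInput()))
    .apply("To Java Objects", ParDo.of(new ReadFromJson()))
    .setCoder(FstCoder.of(TestObj.class)); // instead of SerializableCoder.of(TestObj.class)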
Best All Around Option (In My Opinion)
The SchemaCoder was never the fastest, but it still performed really well on the smaller objects and produced one of the smallest coded dataset sizes for them.
Taking a step back from just the encoding and decoding of objects, there are a handful of other benefits you'll get if you adopt the SchemaCoder in your pipelines. For example, Gleb Kanerov's presentation on schemas in Beam shows how you can greatly reduce the boilerplate of your pipeline code (it also covers coders and performance). There are ways to go from a proto or Avro object to a Beam Schema, providing lots of flexibility out of the box. On top of that, you can always implement your own SchemaProvider if the existing options don't work for your needs.
The SchemaCoder will also be deterministic if the coders for its fields are deterministic. This allows you to use the coder with the Distinct transform and for the key of a KV that you feed into a GroupByKey transform.
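Here's a small sketch of what that enables, assuming a schema-inferred PCollection<BasicTestObj> called objects:

// Deterministic key coder means schema objects can be used directly as keys.
PCollection<KV<BasicTestObj, Long>> counts =
    objects.apply("Count identical objects", Count.perElement());

// ...and Distinct works on the schema objects themselves.
PCollection<BasicTestObj> distinct = objects.apply("Distinct", Distinct.create());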
As an added benefit, you can also declare your fields as final within your Java class and still use the SchemaCoder. This is a big win in my book over Java serialization or one of the drop-in replacements above, where your fields cannot be final and you need a no-args constructor.
Between the ability to make fields immutable in your objects, the ease of developing new pipelines with Schemas and Beam SQL, and the improvement over the baseline SerializableCoder, I would recommend trying out the SchemaCoder and the features that come with it. Since it’s an active part of the Beam project, it’s likely that even more features and possibly performance improvements will come out in the future.
Source Code Samples for Coders
JsonCoder
@AllArgsConstructor(staticName = "of")
private static class JsonCoder<T> extends CustomCoder<T> {
  private static final ObjectMapper OBJECT_MAPPER =
      new ObjectMapper()
          .registerModule(new AfterburnerModule())
          .configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);

  private final Class<T> clazz;

  @Override
  public void encode(T value, OutputStream outStream) throws CoderException, IOException {
    OBJECT_MAPPER.writeValue(outStream, value);
  }

  @Override
  public T decode(InputStream inStream) throws CoderException, IOException {
    return OBJECT_MAPPER.readValue(inStream, clazz);
  }
}
JsonDelegateCoder
private static class JsonDelegateCoder {
  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

  public static <T> CustomCoder<T> of(Class<T> clazz) {
    return DelegateCoder.of(
        StringUtf8Coder.of(),
        new DelegateCoder.CodingFunction<T, String>() {
          @Override
          public String apply(T input) throws Exception {
            return OBJECT_MAPPER.writeValueAsString(input);
          }
        },
        new DelegateCoder.CodingFunction<String, T>() {
          @Override
          public T apply(String input) throws Exception {
            return OBJECT_MAPPER.readValue(input, clazz);
          }
        });
  }
}
FstCoder
public static class FstCoder<T> extends CustomCoder<T> {
  private static final FSTConfiguration CONF = FSTConfiguration.createDefaultConfiguration();

  private final Class<T> clazz;

  private FstCoder(Class<T> clazz) {
    this.clazz = clazz;
    CONF.registerClass(clazz);
  }

  public static <T> FstCoder<T> of(Class<T> clazz) {
    return new FstCoder<T>(clazz);
  }

  @Override
  public void encode(T value, OutputStream outStream) throws CoderException, IOException {
    final FSTObjectOutput out = CONF.getObjectOutput(outStream);
    out.writeObject(value, clazz);
    out.flush();
  }

  @Override
  public T decode(InputStream inStream) throws CoderException, IOException {
    try {
      final FSTObjectInput in = CONF.getObjectInput(inStream);
      return (T) in.readObject(clazz);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}
HandwrittenCoder
public static class HandwrittenCoder extends CustomCoder<TestObj> {
  @Override
  public void encode(TestObj value, OutputStream outStream) throws CoderException, IOException {
    VarIntCoder.of().encode(value.getTestInt(), outStream);
    NullableCoder.of(VarIntCoder.of()).encode(value.getTestWrappedInt(), outStream);
    VarLongCoder.of().encode(value.getTestLong(), outStream);
    NullableCoder.of(VarLongCoder.of()).encode(value.getTestWrappedLong(), outStream);
    DoubleCoder.of().encode(value.getTestDouble(), outStream);
    NullableCoder.of(DoubleCoder.of()).encode(value.getTestWrappedDouble(), outStream);
    StringUtf8Coder.of().encode(value.getTestString(), outStream);
    ListCoder.of(StringUtf8Coder.of()).encode(value.getTestSimpleList(), outStream);
    // NestedCoder (not shown) is a similar handwritten coder for TestObj.NestedObj.
    ListCoder.of(NestedCoder.of()).encode(value.getTestComplexList(), outStream);
    MapCoder.of(StringUtf8Coder.of(), VarIntCoder.of()).encode(value.getTestSimpleMap(), outStream);
    MapCoder.of(StringUtf8Coder.of(), NestedCoder.of()).encode(value.getTestComplexMap(), outStream);
  }

  @Override
  public TestObj decode(InputStream inStream) throws CoderException, IOException {
    // Field order must match encode() exactly; the values feed the Lombok AllArgsConstructor.
    return new TestObj(
        VarIntCoder.of().decode(inStream),
        NullableCoder.of(VarIntCoder.of()).decode(inStream),
        VarLongCoder.of().decode(inStream),
        NullableCoder.of(VarLongCoder.of()).decode(inStream),
        DoubleCoder.of().decode(inStream),
        NullableCoder.of(DoubleCoder.of()).decode(inStream),
        StringUtf8Coder.of().decode(inStream),
        ListCoder.of(StringUtf8Coder.of()).decode(inStream),
        ListCoder.of(NestedCoder.of()).decode(inStream),
        MapCoder.of(StringUtf8Coder.of(), VarIntCoder.of()).decode(inStream),
        MapCoder.of(StringUtf8Coder.of(), NestedCoder.of()).decode(inStream));
  }
}