[ https://issues.apache.org/jira/browse/BEAM-5408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chamikara Jayalath resolved BEAM-5408.
--------------------------------------
       Resolution: Fixed
    Fix Version/s: 2.8.0

(Java) Using Compression.GZIP with TFRecordIO
---------------------------------------------

                 Key: BEAM-5408
                 URL: https://issues.apache.org/jira/browse/BEAM-5408
             Project: Beam
          Issue Type: Bug
          Components: io-java-gcp
    Affects Versions: 2.4.0
            Reporter: haden lee
            Assignee: Chamikara Jayalath
            Priority: Major
             Fix For: 2.8.0

In short, `TFRecordIO.read()` does not seem to work if the entry being read is longer than 8,192 bytes (in terms of byte[] length). `TFRecordIO.write()` seems to handle such entries fine, based on some experiments. Perhaps there is a hard-coded value for this specific length somewhere in the SDK, and I'm wondering if it can be increased or parameterized.

[I've posted this on StackOverflow|https://stackoverflow.com/questions/52284639/beam-java-sdk-with-tfrecord-and-compression-gzip], but I was advised to report it here.

Here are the details:

We use the Beam Java SDK (and Google Cloud Dataflow to run batch jobs) a lot, and we noticed something weird (possibly a bug?) when we tried to use `TFRecordIO` with `Compression.GZIP`. We were able to come up with sample code that reproduces the errors we face.

To be clear, we are using Beam Java SDK 2.4.

Suppose we have a `PCollection<byte[]>`, which can be, for instance, a PCollection of proto messages in byte[] form.

We usually write this to GCS (Google Cloud Storage) using Base64 encoding (newline-delimited Strings) or using TFRecordIO (without compression). We have had no issue reading data from GCS in this manner for a very long time (2.5+ years for the former and ~1.5 years for the latter).

Recently, we tried `TFRecordIO` with the `Compression.GZIP` option, and *sometimes* we get an exception because the data is seen as invalid while being read. The data itself (the gzip files) is not corrupted; we've tested various things and reached the following conclusion:

When a `byte[]` that is being compressed under `TFRecordIO` is above a certain threshold (at or above 8,192 bytes, I'd say), then `TFRecordIO.read().withCompression(Compression.GZIP)` does not work.
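Notably, 8,192 is a common default buffer size, which suggests one plausible mechanism (speculation on my part, not confirmed against the SDK source): a single `ReadableByteChannel.read()` over a decompressing channel may return at most one internal buffer's worth of bytes even when more data is available, so a codec that issues one read() per record and treats a short read as corruption would reject exactly those records longer than 8,192 bytes. A correct reader has to loop until the record buffer is full. A minimal sketch of such a draining read (the `readFully` helper below is hypothetical, not a Beam API):

{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

public class ReadFullyExample {
  // Drains 'in' until 'buf' is full or EOF is reached. A single read() may
  // legally return fewer bytes than requested (e.g., at most one internal
  // buffer's worth, commonly 8192 bytes, on a decompressing channel), so the
  // caller must keep reading instead of treating a short read as corruption.
  static int readFully(ReadableByteChannel in, ByteBuffer buf) throws IOException {
    int total = 0;
    while (buf.hasRemaining()) {
      int n = in.read(buf);
      if (n < 0) {
        break; // EOF before the buffer was filled.
      }
      total += n;
    }
    return total;
  }
}
{code}

If that is what is happening, the threshold would come from the channel's buffer size rather than from the record format itself, which would match the "magic number" behavior described above.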
When it fails, it throws the following exception:

{code:java}
Exception in thread "main" java.lang.IllegalStateException: Invalid data
	at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
	at org.apache.beam.sdk.io.TFRecordIO$TFRecordCodec.read(TFRecordIO.java:642)
	at org.apache.beam.sdk.io.TFRecordIO$TFRecordSource$TFRecordReader.readNextRecord(TFRecordIO.java:526)
	at org.apache.beam.sdk.io.CompressedSource$CompressedReader.readNextRecord(CompressedSource.java:426)
	at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:473)
	at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:468)
	at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:261)
	at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$BoundedReadEvaluator.processElement(BoundedReadEvaluatorFactory.java:141)
	at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:161)
	at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:125)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
{code}

This can be reproduced easily, so you can refer to the code at the end. You will also see comments about the byte array length (having tested various sizes, I concluded that 8192 is the magic number).

So I'm wondering if this is a bug or a known issue. I couldn't find anything close to it on Apache Beam's issue tracker [here][1], but if there is another forum/site I need to check, please let me know!

If this is indeed a bug, what would be the right channel to report it?

---

The following code reproduces the error we see.

A successful run (with parameters 1, 39, 100) prints the following message at the end:

{code:java}
------------ counter metrics from CountDoFn
[counter] plain_base64_proto_array_len: 8126
[counter] plain_base64_proto_in: 1
[counter] plain_base64_proto_val_cnt: 39
[counter] tfrecord_gz_proto_array_len: 8126
[counter] tfrecord_gz_proto_in: 1
[counter] tfrecord_gz_proto_val_cnt: 39
[counter] tfrecord_uncomp_proto_array_len: 8126
[counter] tfrecord_uncomp_proto_in: 1
[counter] tfrecord_uncomp_proto_val_cnt: 39
{code}

With parameters (1, 40, 100), which push the byte array length over 8192, it throws the exception above.

You can tweak the parameters (inside the `CreateRandomProtoData` DoFn) to see why the length of the `byte[]` being gzipped matters.

It may also help to use the protoc-generated Java class for the `TestProto` used in the main code below; here it is: [gist link][2]

References:
[1]: https://issues.apache.org/jira/projects/BEAM/issues/
[2]: https://gist.github.com/hadenlee/ae127715837bd56f3bc6ba4fe2ccb176

Main Code:
(Note that the sample code writes to and reads from GCS, Google Cloud Storage, but as far as I tested, the problem has nothing to do with the storage backend.)

{code:java}
package exp.moloco.dataflow2.compression; // NOTE: Change appropriately.
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.TreeMap;

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TFRecordIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.codec.binary.Base64;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.protobuf.InvalidProtocolBufferException;
import com.moloco.dataflow.test.StackOverflow.TestProto;
import com.moloco.dataflow2.Main;

// @formatter:off
// This code uses TestProto (a Java class generated by protoc).
// The message definition is as follows (in proto3, but it shouldn't matter):
// message TestProto {
//   int64 count = 1;
//   string name = 2;
//   repeated string values = 3;
// }
// Note that this code does not depend on this particular proto being used;
// any other byte[] works too (see the CreateRandomProtoData DoFn below, which
// generates the data used in the code).
// We tested both, but present this as a concrete example of how (our)
// code in production can be affected.
// @formatter:on
public class CompressionTester {
  private static final Logger LOG = LoggerFactory.getLogger(CompressionTester.class);

  static final List<String> lines = Arrays.asList("some dummy string that will not be used in this job.");

  // Some GCS buckets where data will be written.
  // %s will be replaced by a timestamped String for easy debugging.
  static final String PATH_TO_GCS_PLAIN_BASE64 = Main.SOME_BUCKET + "/comp-test/%s/output-plain-base64";
  static final String PATH_TO_GCS_TFRECORD_UNCOMP = Main.SOME_BUCKET + "/comp-test/%s/output-tfrecord-uncompressed";
  static final String PATH_TO_GCS_TFRECORD_GZ = Main.SOME_BUCKET + "/comp-test/%s/output-tfrecord-gzip";

  // This DoFn reads a byte[] that represents a proto message (TestProto).
  // It simply counts the number of proto objects it processes
  // as well as the number of Strings each proto object contains.
  // When the pipeline terminates, the values of the Counters are printed out.
  static class CountDoFn extends DoFn<byte[], TestProto> {
    private final Counter protoIn;
    private final Counter protoValuesCnt;
    private final Counter protoByteArrayLength;

    public CountDoFn(String name) {
      protoIn = Metrics.counter(this.getClass(), name + "_proto_in");
      protoValuesCnt = Metrics.counter(this.getClass(), name + "_proto_val_cnt");
      protoByteArrayLength = Metrics.counter(this.getClass(), name + "_proto_array_len");
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws InvalidProtocolBufferException {
      protoIn.inc();
      TestProto tp = TestProto.parseFrom(c.element());
      protoValuesCnt.inc(tp.getValuesCount());
      protoByteArrayLength.inc(c.element().length);
    }
  }

  // This DoFn emits a number of TestProto objects as byte[].
  // Input to this DoFn is ignored (not used).
  // Each TestProto object contains three fields: count (int64), name (string), and values (repeated string).
  // The three parameters in this DoFn determine
  // (1) the number of proto objects to be generated,
  // (2) the number of (repeated) strings to be added to each proto object, and
  // (3) the length of (each) string.
  // TFRecord with compression (when reading) fails when the parameters are 1, 40, 100, for instance.
  // TFRecord with compression (when reading) succeeds when the parameters are 1, 39, 100, for instance.
  static class CreateRandomProtoData extends DoFn<String, byte[]> {
    static final int NUM_PROTOS = 1; // Total number of TestProto objects to be emitted by this DoFn.
    static final int NUM_STRINGS = 40; // Total number of strings in each TestProto object ('repeated string').
    static final int STRING_LEN = 100; // Length of each string object.

    // Returns a random string built from len append() calls.
    // NOTE: 'A' + rd.nextInt(26) is an int, so StringBuffer.append(int) appends
    // its decimal digits (two characters per call, values 65..90) rather than a
    // letter. The result is therefore 2*len digit characters. This quirk is
    // harmless here (any bytes will do) and is what produces the reported
    // array lengths of 8126 / 8329 below.
    static String getRandomString(Random rd, int len) {
      StringBuffer sb = new StringBuffer();
      for (int i = 0; i < len; i++) {
        sb.append('A' + (rd.nextInt(26)));
      }
      return sb.toString();
    }

    // Returns a randomly generated TestProto object.
    // Each string is generated randomly using getRandomString().
    static TestProto getRandomProto(Random rd) {
      TestProto.Builder tpBuilder = TestProto.newBuilder();
      tpBuilder.setCount(rd.nextInt());
      tpBuilder.setName(getRandomString(rd, STRING_LEN));
      for (int i = 0; i < NUM_STRINGS; i++) {
        tpBuilder.addValues(getRandomString(rd, STRING_LEN));
      }
      return tpBuilder.build();
    }

    // Emits TestProto objects as byte[].
    @ProcessElement
    public void processElement(ProcessContext c) {
      // For debugging purposes, we set the seed here.
      Random rd = new Random();
      rd.setSeed(132475);
      for (int n = 0; n < NUM_PROTOS; n++) {
        byte[] data = getRandomProto(rd).toByteArray();
        c.output(data);
        // With parameters (1, 39, 100), the array length is 8126. It works fine.
        // With parameters (1, 40, 100), the array length is 8329. It breaks TFRecord with GZIP.
        System.out.println("\n--------------------------\n");
        System.out.println("byte array length = " + data.length);
        System.out.println("\n--------------------------\n");
      }
    }
  }

  public static void execute() {
    PipelineOptions options = PipelineOptionsFactory.create();
    options.setJobName("compression-tester");
    options.setRunner(DirectRunner.class);

    // For debugging purposes, write files under 'gcsSubDir' so we can easily distinguish runs.
    final String gcsSubDir =
        String.format("%s-%d", DateTime.now(DateTimeZone.UTC), DateTime.now(DateTimeZone.UTC).getMillis());

    // Write PCollection<TestProto> in 3 different ways to GCS.
    {
      Pipeline pipeline = Pipeline.create(options);

      // Create dummy data: a PCollection of byte arrays (each array representing a proto message).
      PCollection<byte[]> data = pipeline.apply(Create.of(lines)).apply(ParDo.of(new CreateRandomProtoData()));

      // 1. Write as plain-text with base64 encoding.
      data.apply(ParDo.of(new DoFn<byte[], String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          c.output(new String(Base64.encodeBase64(c.element())));
        }
      })).apply(TextIO.write().to(String.format(PATH_TO_GCS_PLAIN_BASE64, gcsSubDir)).withNumShards(1));

      // 2. Write as TFRecord.
      data.apply(TFRecordIO.write().to(String.format(PATH_TO_GCS_TFRECORD_UNCOMP, gcsSubDir)).withNumShards(1));

      // 3. Write as TFRecord-gzip.
      data.apply(TFRecordIO.write().withCompression(Compression.GZIP)
          .to(String.format(PATH_TO_GCS_TFRECORD_GZ, gcsSubDir)).withNumShards(1));

      pipeline.run().waitUntilFinish();
    }

    LOG.info("-------------------------------------------");
    LOG.info(" READ TEST BEGINS ");
    LOG.info("-------------------------------------------");

    // Read PCollection<TestProto> in 3 different ways from GCS.
    {
      Pipeline pipeline = Pipeline.create(options);

      // 1. Read as plain-text.
      pipeline.apply(TextIO.read().from(String.format(PATH_TO_GCS_PLAIN_BASE64, gcsSubDir) + "*"))
          .apply(ParDo.of(new DoFn<String, byte[]>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
              c.output(Base64.decodeBase64(c.element()));
            }
          })).apply("plain-base64", ParDo.of(new CountDoFn("plain_base64")));

      // 2. Read as TFRecord -> byte array.
      pipeline.apply(TFRecordIO.read().from(String.format(PATH_TO_GCS_TFRECORD_UNCOMP, gcsSubDir) + "*"))
          .apply("tfrecord-uncomp", ParDo.of(new CountDoFn("tfrecord_uncomp")));

      // 3. Read as TFRecord-gz -> byte array.
      // This fails when the record size becomes large (over 8192 bytes).
      pipeline
          .apply(TFRecordIO.read().withCompression(Compression.GZIP)
              .from(String.format(PATH_TO_GCS_TFRECORD_GZ, gcsSubDir) + "*"))
          .apply("tfrecord_gz", ParDo.of(new CountDoFn("tfrecord_gz")));

      // 4. Run the pipeline.
      PipelineResult res = pipeline.run();
      res.waitUntilFinish();

      // Check CountDoFn's metrics. The numbers should match across the three paths.
      Map<String, Long> counterValues = new TreeMap<String, Long>();
      for (MetricResult<Long> counter : res.metrics().queryMetrics(MetricsFilter.builder().build()).counters()) {
        counterValues.put(counter.name().name(), counter.committed());
      }
      StringBuffer sb = new StringBuffer();
      sb.append("\n------------ counter metrics from CountDoFn\n");
      for (Entry<String, Long> entry : counterValues.entrySet()) {
        sb.append(String.format("[counter] %40s: %5d\n", entry.getKey(), entry.getValue()));
      }
      LOG.info(sb.toString());
    }
  }
}
{code}
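For completeness: to convince ourselves that the .gz files themselves are valid TFRecords (i.e., that only the read path mishandles large records), they can be parsed outside Beam. The standalone sketch below (not part of the pipeline above; it assumes one output shard has been downloaded locally) walks the TFRecord framing, where each record is an 8-byte little-endian length, a 4-byte masked CRC of the length, the payload, and a 4-byte masked CRC of the payload. `DataInputStream.readFully` loops internally, so record size doesn't matter; CRC verification is skipped for brevity.

{code:java}
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.zip.GZIPInputStream;

public class TFRecordGzCheck {
  public static void main(String[] args) throws IOException {
    // args[0]: path to a locally downloaded shard of the output-tfrecord-gzip files.
    try (DataInputStream in =
        new DataInputStream(new GZIPInputStream(new FileInputStream(args[0])))) {
      byte[] header = new byte[8];
      byte[] crc = new byte[4];
      int record = 0;
      while (true) {
        try {
          in.readFully(header); // 8-byte little-endian record length.
        } catch (EOFException e) {
          break; // Clean end of file: no more records.
        }
        long length = ByteBuffer.wrap(header).order(ByteOrder.LITTLE_ENDIAN).getLong();
        in.readFully(crc); // Masked CRC32C of the length (not verified here).
        byte[] payload = new byte[(int) length];
        in.readFully(payload); // readFully loops internally, so records > 8192 bytes are fine.
        in.readFully(crc); // Masked CRC32C of the payload (not verified here).
        System.out.println("record " + (record++) + ": " + length + " bytes");
      }
    }
  }
}
{code}

On the files our job produced, this prints the expected record length (e.g., 8329 bytes for parameters 1, 40, 100), which supports the conclusion that the data on GCS is intact and the failure is confined to the compressed read path.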