Dave Martin created BEAM-10100:
----------------------------------

             Summary: FileIO writeDynamic with AvroIO.sink not writing all data
                 Key: BEAM-10100
                 URL: https://issues.apache.org/jira/browse/BEAM-10100
             Project: Beam
          Issue Type: Bug
          Components: beam-community
    Affects Versions: 2.20.0, 2.17.0
         Environment: Mac OSX Catalina, tested with SparkRunner - Spark 2.4.5.
            Reporter: Dave Martin
            Assignee: Aizhamal Nurmamat kyzy
With the pipeline below, FileIO.writeDynamic with AvroIO.sink does not write all of the records to the per-dataset output directories.

{code:java}
Pipeline p = Pipeline.create(options);

PCollection<KV<String, AvroRecord>> records =
    p.apply(TextIO.read().from("/tmp/input.csv"))
     .apply(ParDo.of(new StringToDatasetIDAvroRecordFcn()));

// write out into AVRO in each separate directory
records.apply("Write avro file per dataset",
    FileIO.<String, KV<String, AvroRecord>>writeDynamic()
        .by(KV::getKey)
        .via(Contextful.fn(KV::getValue),
             Contextful.fn(x -> AvroIO.sink(AvroRecord.class).withCodec(BASE_CODEC)))
        .to(options.getTargetPath())
        .withDestinationCoder(StringUtf8Coder.of())
        .withNaming(key -> defaultNaming(key + "/export", PipelinesVariables.Pipeline.AVRO_EXTENSION)));

p.run().waitUntilFinish();
{code}

If I replace AvroIO.sink() with TextIO.sink() (and replace the initial mapping function accordingly), then the correct number of records is written to the separate directories, e.g.

{code:java}
// Initialise pipeline
Pipeline p = Pipeline.create(options);

PCollection<KV<String, String>> records =
    p.apply(TextIO.read().from("/tmp/input.csv"))
     .apply(ParDo.of(new StringToDatasetIDKVFcn()));

// write out CSV into each separate directory
records.apply("Write CSV file per dataset",
    FileIO.<String, KV<String, String>>writeDynamic()
        .by(KV::getKey)
        .via(Contextful.fn(KV::getValue), TextIO.sink())
        .to(options.getTargetPath())
        .withDestinationCoder(StringUtf8Coder.of())
        .withNaming(datasetID -> defaultNaming(datasetID + "/export", ".csv")));

p.run().waitUntilFinish();
{code}

cc [~timrobertson100]
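For completeness, here is a minimal sketch of the kind of mapping DoFn referenced above (StringToDatasetIDAvroRecordFcn). It is not the actual implementation from the failing pipeline; it assumes the dataset ID is the first comma-separated column of each input line and that AvroRecord is a generated Avro class with a hypothetical datasetID field.

{code:java}
// Hypothetical sketch only, not the implementation used in the failing pipeline.
// Assumes the dataset ID is the first comma-separated column of each input line
// and that AvroRecord is a generated Avro class with a (hypothetical) datasetID field.
static class StringToDatasetIDAvroRecordFcn extends DoFn<String, KV<String, AvroRecord>> {
  @ProcessElement
  public void processElement(@Element String line, OutputReceiver<KV<String, AvroRecord>> out) {
    String[] fields = line.split(",");
    String datasetID = fields[0];
    AvroRecord record = AvroRecord.newBuilder()
        .setDatasetID(datasetID) // hypothetical field on the generated Avro class
        .build();
    out.output(KV.of(datasetID, record));
  }
}
{code}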