[ https://issues.apache.org/jira/browse/BEAM-10100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dave Martin updated BEAM-10100:
-------------------------------

Description:

FileIO writeDynamic with AvroIO.sink is not writing all data in the following pipeline. The amount of data written varies between runs, but records are consistently dropped. This is with a very small test dataset of 6 records, which should produce 3 directories.

{code:java}
Pipeline p = Pipeline.create(options);

PCollection<KV<String, AvroRecord>> records =
    p.apply(TextIO.read().from("/tmp/input.csv"))
     .apply(ParDo.of(new StringToDatasetIDAvroRecordFcn()));

// write out AVRO into a separate directory per dataset
records.apply("Write avro file per dataset",
    FileIO.<String, KV<String, AvroRecord>>writeDynamic()
        .by(KV::getKey)
        .via(Contextful.fn(KV::getValue),
             Contextful.fn(x -> AvroIO.sink(AvroRecord.class).withCodec(BASE_CODEC)))
        .to(options.getTargetPath())
        .withDestinationCoder(StringUtf8Coder.of())
        .withNaming(key -> defaultNaming(key + "/export", PipelinesVariables.Pipeline.AVRO_EXTENSION)));

p.run().waitUntilFinish();
{code}
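The mapping function is not included in this report; for context, a minimal sketch of what StringToDatasetIDAvroRecordFcn could look like, assuming the dataset ID is the first comma-separated field of each input line (the real class and Avro schema are not shown here, so the field name below is hypothetical):

{code:java}
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Illustrative sketch only: the real StringToDatasetIDAvroRecordFcn is not
// included in this report. It assumes the dataset ID is the first
// comma-separated field of each line, and that AvroRecord is an
// Avro-generated class exposing a newBuilder() API (both assumptions).
class StringToDatasetIDAvroRecordFcn extends DoFn<String, KV<String, AvroRecord>> {
  @ProcessElement
  public void processElement(@Element String line, OutputReceiver<KV<String, AvroRecord>> out) {
    String datasetID = line.split(",", 2)[0];
    AvroRecord record = AvroRecord.newBuilder()
        .setDatasetID(datasetID) // hypothetical field; the real schema is not shown
        .build();
    out.output(KV.of(datasetID, record));
  }
}
{code}

(The StringToDatasetIDKVFcn used in the TextIO variant further below would have the same shape, emitting KV.of(datasetID, line) instead.)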
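For reference, a clean run over the 6-record input should leave one directory per dataset ID under the target path, roughly along these lines (dataset IDs, extension, and shard numbering are illustrative; exact file names come from defaultNaming):

{code}
<targetPath>/dataset1/export-00000-of-00001.avro
<targetPath>/dataset2/export-00000-of-00001.avro
<targetPath>/dataset3/export-00000-of-00001.avro
{code}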
If I replace AvroIO.sink() with TextIO.sink() (and replace the initial mapping function) then the correct number of records is written to the separate directories. This works consistently, e.g.:

{code:java}
// Initialise pipeline
Pipeline p = Pipeline.create(options);

PCollection<KV<String, String>> records =
    p.apply(TextIO.read().from("/tmp/input.csv"))
     .apply(ParDo.of(new StringToDatasetIDKVFcn()));

// write out CSV into a separate directory per dataset
records.apply("Write CSV file per dataset",
    FileIO.<String, KV<String, String>>writeDynamic()
        .by(KV::getKey)
        .via(Contextful.fn(KV::getValue), TextIO.sink())
        .to(options.getTargetPath())
        .withDestinationCoder(StringUtf8Coder.of())
        .withNaming(datasetID -> defaultNaming(datasetID + "/export", ".csv")));

p.run().waitUntilFinish();
{code}

cc [~timrobertson100]

> FileIO writeDynamic with AvroIO.sink not writing all data
> ---------------------------------------------------------
>
>                 Key: BEAM-10100
>                 URL: https://issues.apache.org/jira/browse/BEAM-10100
>             Project: Beam
>          Issue Type: Bug
>          Components: beam-community
>    Affects Versions: 2.17.0, 2.20.0
>         Environment: Mac OSX Catalina, tested with SparkRunner - Spark 2.4.5.
>            Reporter: Dave Martin
>            Assignee: Aizhamal Nurmamat kyzy
>            Priority: P2
>              Labels: AVRO, IO, Spark
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)