Dave Martin created BEAM-10100:
----------------------------------

             Summary: FileIO writeDynamic with AvroIO.sink not writing all data
                 Key: BEAM-10100
                 URL: https://issues.apache.org/jira/browse/BEAM-10100
             Project: Beam
          Issue Type: Bug
          Components: beam-community
    Affects Versions: 2.20.0, 2.17.0
         Environment: Mac OSX Catalina, tested with SparkRunner - Spark 2.4.5.
            Reporter: Dave Martin
            Assignee: Aizhamal Nurmamat kyzy


{code:java}
Pipeline p = Pipeline.create(options);
PCollection<KV<String, AvroRecord>> records = 
p.apply(TextIO.read().from("/tmp/input.csv"))
.apply(ParDo.of(new StringToDatasetIDAvroRecordFcn()));

//write out into AVRO in each separate directory
records.apply("Write avro file per dataset", FileIO.<String, KV<String, 
AvroRecord>>writeDynamic()
  .by(KV::getKey)
  .via(Contextful.fn(KV::getValue), Contextful.fn(x -> 
AvroIO.sink(AvroRecord.class).withCodec(BASE_CODEC)))
  .to(options.getTargetPath())
  .withDestinationCoder(StringUtf8Coder.of())
  .withNaming(key -> defaultNaming(key + "/export", 
PipelinesVariables.Pipeline.AVRO_EXTENSION)));

p.run().waitUntilFinish();
{code}


If i replace AvroIO.sink() with TextIO.sink() (and replace the initial mapping 
function) then the correct number of records are written to the separate 
directories.

e.g.

{code:java}
// Initialise pipeline
Pipeline p = Pipeline.create(options);

PCollection<KV<String, AvroRecord>> records = 
p.apply(TextIO.read().from("/tmp/input.csv"))
                                                                                
.apply(ParDo.of(new StringToDatasetIDKVFcn()));
//write out into AVRO in each separate directory
records.apply("Write CSV file per dataset", FileIO.<String, KV<String, 
String>>writeDynamic()
    .by(KV::getKey)
    .via(Contextful.fn(KV::getValue), TextIO.sink())
    .to(options.getTargetPath())
    .withDestinationCoder(StringUtf8Coder.of())
    .withNaming(datasetID -> defaultNaming(key + "/export", ".csv"));

 p.run().waitUntilFinish();
{code}

cc [~timrobertson100]





--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to