[ https://issues.apache.org/jira/browse/BEAM-6480?focusedWorklogId=272629&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-272629 ]
ASF GitHub Bot logged work on BEAM-6480:
----------------------------------------

Author: ASF GitHub Bot
Created on: 05/Jul/19 14:39
Start Date: 05/Jul/19 14:39
Worklog Time Spent: 10m
Work Description: RyanSkraba commented on pull request #9005: [BEAM-6480] Adds AvroIO sink for generic records.
URL: https://github.com/apache/beam/pull/9005#discussion_r300709349

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
##########
@@ -1117,6 +1122,68 @@ private void testDynamicDestinationsUnwindowedWithSharding(
       }
       case AVROIO_SINK:
+        {
+          FileIO.Write<String, IndexedRecord> write =
+              FileIO.<String, IndexedRecord>writeDynamic()
+                  .by(
+                      fn(
+                          (element, c) -> {
+                            c.sideInput(schemaView); // Ignore result
+                            return element.getSchema().getName().substring(0, 1);
+                          },
+                          requiresSideInputs(schemaView)))
+                  .via(
+                      fn(
+                          (dest, c) -> {
+                            Schema schema =
+                                new Schema.Parser().parse(c.sideInput(schemaView).get(dest));
+                            return AvroIO.sinkViaGeneric(schema);
+                          },
+                          requiresSideInputs(schemaView)))
+                  .to(baseDir.toString())
+                  .withNaming(
+                      fn(
+                          (dest, c) -> {
+                            c.sideInput(schemaView); // Ignore result
+                            return FileIO.Write.defaultNaming("file_" + dest, ".avro");
+                          },
+                          requiresSideInputs(schemaView)))
+                  .withTempDirectory(baseDir.toString())
+                  .withDestinationCoder(StringUtf8Coder.of())
+                  .withIgnoreWindowing();
+          switch (sharding) {
+            case RUNNER_DETERMINED:
+              break;
+            case WITHOUT_SHARDING:
+              write = write.withNumShards(1);
+              break;
+            case FIXED_3_SHARDS:
+              write = write.withNumShards(3);
+              break;
+            default:
+              throw new IllegalArgumentException("Unknown sharding " + sharding);
+          }
+
+          MapElements<String, IndexedRecord> formatter =

Review comment:
`toRecord` OK?

I'd like to change the MapElements to return a GenericRecord, to prove that we don't need to cast the Write/Sink in order to use it with a `PCollection<GenericRecord>`.

----------------------------------------------------------------

Issue Time Tracking
-------------------

Worklog Id: (was: 272629)
Time Spent: 1h 50m (was: 1h 40m)

> Add AvroIO.sink for IndexedRecord (FileIO compatible)
> -----------------------------------------------------
>
>                 Key: BEAM-6480
>                 URL: https://issues.apache.org/jira/browse/BEAM-6480
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-avro
>    Affects Versions: 2.9.0
>            Reporter: Romain Manni-Bucau
>            Assignee: Ryan Skraba
>            Priority: Major
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> More generally, for sinks there is no need to create a mapper API, since the
> previous PTransform can always map into a format the sink supports, so any sink
> can assume the format is right.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
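The review's goal (showing that a sink typed for an `IndexedRecord` supertype can be used with a `PCollection<GenericRecord>` without casting) comes down to Java generic inference with a bounded type parameter. The sketch below is Beam-free and hypothetical: `IndexedRecordLike`, `GenericRecordLike`, `Sink`, `recordSink`, `writeAll` and `map` are stand-ins invented for illustration, not the Avro or Beam APIs, and it makes no claim about the signature PR #9005 actually settled on. In the spirit of the issue description, the conversion into records happens in an upstream `map` step rather than inside the sink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical, Beam-free sketch: the interfaces below are stand-ins,
// not the Avro or Beam APIs.
public class SinkTypingSketch {

  // Stand-in for Avro's IndexedRecord: positional field access.
  interface IndexedRecordLike {
    Object get(int field);
  }

  // Stand-in for Avro's GenericRecord: a subtype that adds named access.
  interface GenericRecordLike extends IndexedRecordLike {
    Object get(String field);
  }

  // Stand-in for a file sink that consumes elements of type T.
  interface Sink<T> {
    void write(T element);
  }

  // Minimal concrete record holding a single field.
  static final class NamedRecord implements GenericRecordLike {
    private final String name;

    NamedRecord(String name) {
      this.name = name;
    }

    @Override
    public Object get(int field) {
      return name;
    }

    @Override
    public Object get(String field) {
      return name;
    }
  }

  // Bounded generic factory: T is chosen by the call site, so the same factory
  // can produce a Sink<GenericRecordLike> with no cast. Each written element's
  // first field is appended to `out`.
  static <T extends IndexedRecordLike> Sink<T> recordSink(List<Object> out) {
    return element -> out.add(element.get(0));
  }

  // Writes every element with the sink, loosely like applying a write to a
  // PCollection<T>. Because Java generics are invariant, passing a
  // Sink<IndexedRecordLike> with a List<GenericRecordLike> would not compile.
  static <T> void writeAll(List<T> elements, Sink<T> sink) {
    for (T element : elements) {
      sink.write(element);
    }
  }

  // Upstream conversion step: mapping happens before the sink, not inside it.
  static <A, B> List<B> map(List<A> input, Function<A, B> fn) {
    List<B> result = new ArrayList<>();
    for (A a : input) {
      result.add(fn.apply(a));
    }
    return result;
  }

  public static List<Object> run() {
    List<Object> written = new ArrayList<>();
    List<GenericRecordLike> records = map(Arrays.asList("alpha", "beta"), NamedRecord::new);
    Sink<GenericRecordLike> sink = recordSink(written); // T inferred as GenericRecordLike
    writeAll(records, sink);
    return written;
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [alpha, beta]
  }
}
```

The bounded type parameter on the factory is what removes the need for a cast; an alternative design would instead widen the consumer side, e.g. declaring `writeAll` to accept a `Sink<? super T>`.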