Eugene Kirpichov created BEAM-2753:
--------------------------------------

             Summary: File DynamicDestinations side inputs don't work with 
sharding
                 Key: BEAM-2753
                 URL: https://issues.apache.org/jira/browse/BEAM-2753
             Project: Beam
          Issue Type: Bug
          Components: sdk-java-core
            Reporter: Eugene Kirpichov
            Assignee: Reuven Lax


WriteWithShardingFactory uses PTransformReplacements.getSingletonMaininput 
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java#L74

However if the dynamic destinations have a side input, then the transform has 
more than 1 input and the function fails:

Exception in thread "main" java.lang.IllegalArgumentException: Got multiple 
inputs that are not additional inputs for a singleton main input: Avro schema 
side input/ParMultiDo(Anonymous).out0 [PCollection] and Run read all/Execute 
queries/ParMultiDo(NaiveSpannerRead).out0 [PCollection]
        at 
org.apache.beam.runners.direct.repackaged.runners.core.construction.java.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:383)
        at 
org.apache.beam.runners.direct.repackaged.runners.core.construction.PTransformReplacements.getSingletonMainInput(PTransformReplacements.java:50)
        at 
org.apache.beam.runners.direct.repackaged.runners.core.construction.PTransformReplacements.getSingletonMainInput(PTransformReplacements.java:41)
        at 
org.apache.beam.runners.direct.WriteWithShardingFactory.getReplacementTransform(WriteWithShardingFactory.java:74)
        at org.apache.beam.sdk.Pipeline.applyReplacement(Pipeline.java:540)
        at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:280)
        at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:201)
        at 
org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:169)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:62)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:289)

This is not caught by unit tests because unit tests specify withoutSharding().
https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java#L644

CC: [~mkhadikov]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to