shunping opened a new issue, #36387:
URL: https://github.com/apache/beam/issues/36387

   ### What happened?
   
   Two of the GroupIntoBatches tests are excluded from the Prism Java ValidatesRunner (VR) test suite.
   
   - 'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testInStreamingMode'
   - 'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testBufferingTimerInFixedWindow'
   
   The stack trace is shown below:
   ```
   bundle inst009 stage-001 failed:org.apache.beam.sdk.coders.CoderException: java.io.EOFException: reached end of stream after reading 7 bytes; 69 bytes expected
           at org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:104)
           at org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:37)
           at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:84)
           at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:37)
           at org.apache.beam.sdk.values.WindowedValues$FullWindowedValueCoder.decode(WindowedValues.java:847)
           at org.apache.beam.sdk.values.WindowedValues$FullWindowedValueCoder.decode(WindowedValues.java:838)
           at org.apache.beam.sdk.values.WindowedValues$FullWindowedValueCoder.decode(WindowedValues.java:784)
           at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:232)
           at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:186)
           at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:542)
           at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
           at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
           at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
           at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
           at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
           at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
           at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
           at java.base/java.lang.Thread.run(Thread.java:833)
       Caused by: java.io.EOFException: reached end of stream after reading 7 bytes; 69 bytes expected
           at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams.readFully(ByteStreams.java:802)
           at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams.readFully(ByteStreams.java:784)
           at org.apache.beam.sdk.coders.StringUtf8Coder.readString(StringUtf8Coder.java:60)
           at org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:100)
           ... 17 more
           at org.apache.beam.runners.prism.TestPrismRunner.run(TestPrismRunner.java:72)
           at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325)
           at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:442)
           at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:381)
           at org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testInStreamingMode(GroupIntoBatchesTest.java:438)
   ```
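
To make the error message in the trace easier to read, here is a minimal sketch in plain Python (not Beam code) of a length-prefixed string decoder. It assumes a string is encoded as a varint byte length followed by that many UTF-8 bytes, which is how the nested string decoding in the trace appears to behave. The helper names (`encode_varint`, `decode_string`, and so on) and the sample bytes are hypothetical; the actual bytes in the failing bundle are not known. The sketch only shows that feeding such a decoder bytes that were not produced by the matching encoder can yield this kind of "7 bytes read; 69 bytes expected" failure.

```python
import io


def encode_varint(n: int) -> bytes:
    """Encode a non-negative int as a base-128 varint, low 7 bits first."""
    out = bytearray()
    while True:
        low_bits = n & 0x7F
        n >>= 7
        if n:
            out.append(low_bits | 0x80)  # high bit set: more bytes follow
        else:
            out.append(low_bits)
            return bytes(out)


def decode_varint(stream: io.BytesIO) -> int:
    """Read a base-128 varint from the stream."""
    result, shift = 0, 0
    while True:
        chunk = stream.read(1)
        if not chunk:
            raise EOFError("reached end of stream while reading length prefix")
        result |= (chunk[0] & 0x7F) << shift
        if not (chunk[0] & 0x80):
            return result
        shift += 7


def encode_string(s: str) -> bytes:
    """Length-prefixed encoding: varint byte count, then the UTF-8 bytes."""
    data = s.encode("utf-8")
    return encode_varint(len(data)) + data


def decode_string(stream: io.BytesIO) -> str:
    """Decode a length-prefixed string; fail if the stream is too short."""
    expected = decode_varint(stream)
    data = stream.read(expected)
    if len(data) != expected:
        raise EOFError(
            f"reached end of stream after reading {len(data)} bytes; "
            f"{expected} bytes expected"
        )
    return data.decode("utf-8")


if __name__ == "__main__":
    # Round trip with the matching encoder works.
    print(decode_string(io.BytesIO(encode_string("key"))))

    # Hypothetical mismatched input: the first byte (ASCII 'E' == 69) is
    # misread as a length prefix, but only 7 bytes follow it.
    try:
        decode_string(io.BytesIO(b"Example!"))
    except EOFError as err:
        print(err)
```

In this sketch the decoder has no way to tell that the leading byte was never meant as a length, so a mismatch between how a stage's output was written and how the next reader interprets it would surface only as a short read.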
   
   After investigating, this appears to be a stage fusion problem in Prism.
   
   The input PCollection (after windowing) is sent to two downstream branches:
   - a DoFn that returns nothing: https://github.com/apache/beam/blob/5485467f230100e7ac2c0b50bda72b5e38ed9826/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java#L374
   - the GroupIntoBatches transform: https://github.com/apache/beam/blob/5485467f230100e7ac2c0b50bda72b5e38ed9826/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java#L388
   
   However, in the graph after stage fusion, the DoFn and the windowing are fused into one stage (stage-001), and the windowing output becomes the output of that stage, which stage-008 (containing GroupIntoBatches) then reads as its input.
   
   ```
   G stage-001: 
     * environment: beam:env:external:v1
     S transforms : 2 []string
         0: Window.Into()/Window.Assign
         1: ParDo(Anonymous)/ParMultiDo(Anonymous)
   
   ...
   G stage-008: 
     * environment: beam:env:external:v1
     S transforms : 6 []string
         0: GroupIntoBatches/ParDo(GroupIntoBatches)/ParMultiDo(GroupIntoBatches)
         1: eCount-elements-in-windows-after-applying-GroupIntoBatches_lift
         2: PAssert$1/GroupGlobally/Window.Into()/Window.Assign
         3: PAssert$1/GroupGlobally/GatherAllOutputs/Reify.Window/ParDo(Anonymous)/ParMultiDo(Anonymous)
         4: PAssert$1/GroupGlobally/GatherAllOutputs/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
         5: PAssert$1/GroupGlobally/GatherAllOutputs/Window.Into()/Window.Assign
   
   ...
   
    DEBUG  em.AddStage
     ID     : stage-001
   S inputs : 1 []string
       0: TestStream.out
   S sides  : 0 []engine.LinkID
   S outputs: 1 []string
       0: Window.Into()/Window.Assign.out
   ...
    DEBUG  em.AddStage
     ID     : stage-008
   S inputs : 1 []string
       0: Window.Into()/Window.Assign.out
   S sides  : 0 []engine.LinkID
   S outputs: 2 []string
       0: PAssert$1/GroupGlobally/GatherAllOutputs/Window.Into()/Window.Assign.out
       1: nCount-elements-in-windows-after-applying-GroupIntoBatches_lifted
   ```
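
The topology in the dump above can be modelled with a small sketch in plain Python. This is not Prism's fusion logic, only an illustration of the shape being reported: a PCollection produced inside a fused stage that is consumed both by a transform inside that stage and by a transform in another stage. The `Transform` class, the helper functions, and the output names `noop.out` and `gib.out` are hypothetical; the transform names and `Window.Into()/Window.Assign.out` are taken from the dump.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Transform:
    name: str
    inputs: tuple
    outputs: tuple


def stage_outputs(stage, pipeline):
    """PCollections produced in `stage` and read by a transform outside it."""
    inside = {t.name for t in stage}
    produced = {pc for t in stage for pc in t.outputs}
    consumed_outside = {
        pc
        for t in pipeline
        if t.name not in inside
        for pc in t.inputs
        if pc in produced
    }
    return sorted(consumed_outside)


def consumed_inside(stage, pcollection):
    """Names of transforms in `stage` that read `pcollection`."""
    return [t.name for t in stage if pcollection in t.inputs]


WINDOWED = "Window.Into()/Window.Assign.out"

window = Transform("Window.Into()/Window.Assign", ("TestStream.out",), (WINDOWED,))
# Output names below are placeholders; the dump does not show them.
noop_dofn = Transform(
    "ParDo(Anonymous)/ParMultiDo(Anonymous)", (WINDOWED,), ("noop.out",)
)
group_into_batches = Transform(
    "GroupIntoBatches/ParDo(GroupIntoBatches)/ParMultiDo(GroupIntoBatches)",
    (WINDOWED,),
    ("gib.out",),
)

pipeline = [window, noop_dofn, group_into_batches]
stage_001 = [window, noop_dofn]  # the fused stage from the dump

if __name__ == "__main__":
    # The windowed PCollection is both a stage output and an internal input.
    print(stage_outputs(stage_001, pipeline))
    print(consumed_inside(stage_001, WINDOWED))
```

Under this model the windowed PCollection is simultaneously an intermediate value inside stage-001 and the data handed to stage-008, which is the situation the fused graph shows.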
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner

