[ 
https://issues.apache.org/jira/browse/BEAM-6188?focusedWorklogId=177995&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-177995
 ]

ASF GitHub Bot logged work on BEAM-6188:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Dec/18 14:57
            Start Date: 21/Dec/18 14:57
    Worklog Time Spent: 10m 
      Work Description: lgajowy edited a comment on issue #7226: [BEAM-6188] Unbounded synthetic source
URL: https://github.com/apache/beam/pull/7226#issuecomment-449409277
 
 
   FYI: I modified the watermark behavior. The change is required to stop the 
streaming pipeline once all records have been processed. It is similar to what 
Nexmark's `UnboundedEventSource` does, and IMO it makes perfect sense here too. 
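
A minimal sketch of the idea, for readers unfamiliar with it. It assumes the change works by reporting an "end of time" watermark once the configured number of records has been emitted, which is what lets a runner conclude that no more data will arrive. The class `FiniteWatermarkSketch`, its methods, and the constant `END_OF_TIME_MILLIS` are hypothetical and are not Beam's API. In Beam the corresponding pieces would be `UnboundedSource.UnboundedReader#getWatermark()` and `BoundedWindow.TIMESTAMP_MAX_VALUE`.

```java
/** Hypothetical stand-in for a finite "unbounded" reader; not the PR's code. */
public class FiniteWatermarkSketch {
    // Assumption: plays the role of Beam's BoundedWindow.TIMESTAMP_MAX_VALUE.
    static final long END_OF_TIME_MILLIS = Long.MAX_VALUE;

    private final long totalRecords;
    private long emitted = 0;
    private long lastEventTimeMillis = Long.MIN_VALUE;

    FiniteWatermarkSketch(long totalRecords) {
        this.totalRecords = totalRecords;
    }

    /** Emits one record with the given event time; returns false once exhausted. */
    boolean advance(long eventTimeMillis) {
        if (emitted >= totalRecords) {
            return false;
        }
        emitted++;
        lastEventTimeMillis = Math.max(lastEventTimeMillis, eventTimeMillis);
        return true;
    }

    /** Tracks event time while records remain, then jumps to "end of time". */
    long getWatermark() {
        return emitted >= totalRecords ? END_OF_TIME_MILLIS : lastEventTimeMillis;
    }

    public static void main(String[] args) {
        FiniteWatermarkSketch reader = new FiniteWatermarkSketch(3);
        for (long t = 1000; reader.advance(t); t += 1000) {
            System.out.println("watermark = " + reader.getWatermark());
        }
    }
}
```

A runner that treats the maximum watermark as "input complete" can then fire remaining windows and finish the pipeline, instead of waiting indefinitely for more records.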
   
   This approach does not work on the Spark runner yet, which is why the 
Nexmark streaming suites are not run on Jenkins.
   
   Unfortunately, I get errors on Dataflow after making this change. I decided 
to push the new commit anyway because the error appears to be worker- or 
coder-related rather than watermark-related, and it happens only on the 
Dataflow runner (a bug?): 
   
   ```
   Caused by: java.io.EOFException: reached end of stream after reading 17 bytes; 571284 bytes expected
           org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.io.ByteStreams.readFully(ByteStreams.java:740)
           org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.io.ByteStreams.readFully(ByteStreams.java:722)
           org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:108)
           org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:41)
           org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
           org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:76)
           org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
           org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.decode(ValueWithRecordId.java:118)
           org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.decode(ValueWithRecordId.java:81)
           org.apache.beam.runners.dataflow.worker.WindmillKeyedWorkItem.lambda$elementsIterable$2(WindmillKeyedWorkItem.java:107)
           org.apache.beam.runners.dataflow.worker.repackaged.com.google.common.collect.Iterators$7.transform(Iterators.java:750)
           org.apache.beam.runners.dataflow.worker.repackaged.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
           org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowReshuffleFn.processElement(StreamingGroupAlsoByWindowReshuffleFn.java:56)
           org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowReshuffleFn.processElement(StreamingGroupAlsoByWindowReshuffleFn.java:39)
           org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
           org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
           org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
           org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
           org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
           org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
           org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
           org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
           org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
           org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
           org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
           java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
           java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
           java.lang.Thread.run(Thread.java:745)
   java.lang.RuntimeException: java.io.IOException: varint overflow 21432854729
           org.apache.beam.runners.dataflow.worker.WindmillKeyedWorkItem.lambda$elementsIterable$2(WindmillKeyedWorkItem.java:110)
           org.apache.beam.runners.dataflow.worker.repackaged.com.google.common.collect.Iterators$7.transform(Iterators.java:750)
           org.apache.beam.runners.dataflow.worker.repackaged.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
           org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowReshuffleFn.processElement(StreamingGroupAlsoByWindowReshuffleFn.java:56)
           org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowReshuffleFn.processElement(StreamingGroupAlsoByWindowReshuffleFn.java:39)
           org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
           org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
           org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
           org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
           org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
           org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
           org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
           org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
           org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
           org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
           org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
           java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
           java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
           java.lang.Thread.run(Thread.java:745)
   Caused by: java.io.IOException: varint overflow 21432854729
           org.apache.beam.sdk.util.VarInt.decodeInt(VarInt.java:58)
           org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:103)
           org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:41)
           org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
           org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:76)
           org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
           org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.decode(ValueWithRecordId.java:118)
           org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.decode(ValueWithRecordId.java:81)
           org.apache.beam.runners.dataflow.worker.WindmillKeyedWorkItem.lambda$elementsIterable$2(WindmillKeyedWorkItem.java:107)
           org.apache.beam.runners.dataflow.worker.repackaged.com.google.common.collect.Iterators$7.transform(Iterators.java:750)
           org.apache.beam.runners.dataflow.worker.repackaged.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
           org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowReshuffleFn.processElement(StreamingGroupAlsoByWindowReshuffleFn.java:56)
           org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowReshuffleFn.processElement(StreamingGroupAlsoByWindowReshuffleFn.java:39)
           org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
           org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
           org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
           org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation
   ```
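
For context on what the two messages in the trace mean mechanically, here is a small sketch. It is not a diagnosis of the Dataflow failure. It assumes a byte array is encoded as a varint length prefix followed by the payload, and shows that reading such a stream from the wrong position, or reading bytes written in a different format, can surface as either an `EOFException` or a "varint overflow". The class `LengthPrefixSketch` and its methods are hypothetical, simplified stand-ins and are not Beam's `VarInt` or `ByteArrayCoder`.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical, simplified length-prefixed decoder; not Beam's implementation. */
public class LengthPrefixSketch {

    /** Reads a base-128 varint and rejects values that do not fit in an int. */
    static int decodeVarInt(InputStream in) throws IOException {
        long result = 0;
        int shift = 0;
        int b;
        do {
            b = in.read();
            if (b < 0) {
                throw new EOFException("stream ended inside varint");
            }
            if (shift >= 64) {
                throw new IOException("varint too long");
            }
            result |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        if (result < 0 || result > Integer.MAX_VALUE) {
            throw new IOException("varint overflow " + result);
        }
        return (int) result;
    }

    /** Reads the length prefix, then exactly that many payload bytes. */
    static byte[] decodeBytes(InputStream in) throws IOException {
        int length = decodeVarInt(in);
        byte[] value = new byte[length];
        // readFully throws EOFException if fewer than `length` bytes remain.
        new DataInputStream(in).readFully(value);
        return value;
    }

    /** Summarizes the outcome of decoding the given bytes. */
    static String tryDecode(byte[] bytes) {
        try {
            return "decoded " + decodeBytes(new ByteArrayInputStream(bytes)).length + " bytes";
        } catch (EOFException e) {
            return "EOFException";
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Well-formed: length 3, then three payload bytes.
        System.out.println(tryDecode(new byte[] {3, 100, 1, 2}));
        // Same stream read one byte too late: 100 is taken as the length.
        System.out.println(tryDecode(new byte[] {100, 1, 2}));
        // Arbitrary bytes taken as a length prefix exceed the int range.
        System.out.println(tryDecode(
            new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, 0x7F}));
    }
}
```

In this sketch both failures come from the same root: the decoder interprets bytes as a length prefix that were never written as one.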
   
   Do you have any ideas on what may be going on? 
   
   I will continue investigating this after 2nd January 2019, since I'm taking 
a few days off. 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 177995)
    Time Spent: 3h 20m  (was: 3h 10m)

> Create unbounded synthetic source
> ---------------------------------
>
>                 Key: BEAM-6188
>                 URL: https://issues.apache.org/jira/browse/BEAM-6188
>             Project: Beam
>          Issue Type: Sub-task
>          Components: testing
>            Reporter: Lukasz Gajowy
>            Assignee: Lukasz Gajowy
>            Priority: Major
>          Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> It is needed for streaming scenarios. It should provide ways to reason about 
> time and recovering from checkpoints. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
