[ https://issues.apache.org/jira/browse/BEAM-6188?focusedWorklogId=177994&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-177994 ]
ASF GitHub Bot logged work on BEAM-6188:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Dec/18 14:56
            Start Date: 21/Dec/18 14:56
    Worklog Time Spent: 10m
      Work Description:

lgajowy commented on issue #7226: [BEAM-6188] Unbouded synthetic source
URL: https://github.com/apache/beam/pull/7226#issuecomment-449409277

FYI: I modified the watermark behavior. This is required to be able to stop the streaming pipeline after all records are processed. It is similar to what Nexmark's `UnboundedEventSource` does, and IMO it makes perfect sense here too. This approach doesn't work on the Spark runner yet, which is why the Nexmark streaming suites are not run on Jenkins.

Unfortunately, I get errors on Dataflow after making this change. I decided to push the new commit anyway because the error seems to be related to the worker or the coder rather than to the watermark:

```
Caused by: java.io.EOFException: reached end of stream after reading 17 bytes; 571284 bytes expected
    org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.io.ByteStreams.readFully(ByteStreams.java:740)
    org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.io.ByteStreams.readFully(ByteStreams.java:722)
    org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:108)
    org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:41)
    org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
    org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:76)
    org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
    org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.decode(ValueWithRecordId.java:118)
    org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.decode(ValueWithRecordId.java:81)
    org.apache.beam.runners.dataflow.worker.WindmillKeyedWorkItem.lambda$elementsIterable$2(WindmillKeyedWorkItem.java:107)
    org.apache.beam.runners.dataflow.worker.repackaged.com.google.common.collect.Iterators$7.transform(Iterators.java:750)
    org.apache.beam.runners.dataflow.worker.repackaged.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
    org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowReshuffleFn.processElement(StreamingGroupAlsoByWindowReshuffleFn.java:56)
    org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowReshuffleFn.processElement(StreamingGroupAlsoByWindowReshuffleFn.java:39)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
    org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    java.lang.Thread.run(Thread.java:745)

java.lang.RuntimeException: java.io.IOException: varint overflow 21432854729
    org.apache.beam.runners.dataflow.worker.WindmillKeyedWorkItem.lambda$elementsIterable$2(WindmillKeyedWorkItem.java:110)
    org.apache.beam.runners.dataflow.worker.repackaged.com.google.common.collect.Iterators$7.transform(Iterators.java:750)
    org.apache.beam.runners.dataflow.worker.repackaged.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
    org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowReshuffleFn.processElement(StreamingGroupAlsoByWindowReshuffleFn.java:56)
    org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowReshuffleFn.processElement(StreamingGroupAlsoByWindowReshuffleFn.java:39)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
    org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: varint overflow 21432854729
    org.apache.beam.sdk.util.VarInt.decodeInt(VarInt.java:58)
    org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:103)
    org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:41)
    org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
    org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:76)
    org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
    org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.decode(ValueWithRecordId.java:118)
    org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.decode(ValueWithRecordId.java:81)
    org.apache.beam.runners.dataflow.worker.WindmillKeyedWorkItem.lambda$elementsIterable$2(WindmillKeyedWorkItem.java:107)
    org.apache.beam.runners.dataflow.worker.repackaged.com.google.common.collect.Iterators$7.transform(Iterators.java:750)
    org.apache.beam.runners.dataflow.worker.repackaged.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
    org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowReshuffleFn.processElement(StreamingGroupAlsoByWindowReshuffleFn.java:56)
    org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowReshuffleFn.processElement(StreamingGroupAlsoByWindowReshuffleFn.java:39)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation
```

Do you have any idea what might be going on? I will continue investigating after January 2, 2019, since I'm taking some days off.
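Both exceptions in the trace come from the same two steps: `ByteArrayCoder.decode` first reads a length prefix with `VarInt.decodeInt` (`ByteArrayCoder.java:103`) and then reads that many bytes with `ByteStreams.readFully` (`ByteArrayCoder.java:108`). The following is a minimal, Beam-free sketch of that framing, assuming a protobuf-style unsigned varint (7 payload bits per byte, continuation bit on every byte but the last). `LengthPrefixSketch` and its methods are hypothetical names, and Beam's own bounds checks may differ in detail. It shows how each message can arise: a "length" too large for an `int` gives a varint overflow, and a plausible length with too few bytes behind it gives the `EOFException`. Both would be consistent with the decoder reading bytes written under a different coder or framing, but that is a guess rather than a diagnosis.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

// Hypothetical, Beam-free sketch of varint length-prefixed byte arrays.
// Not Beam's VarInt/ByteArrayCoder; names and exact checks are assumptions.
final class LengthPrefixSketch {

  // Unsigned varint: 7 payload bits per byte, low-order group first,
  // high bit set on every byte except the last.
  static void writeVarLong(long value, ByteArrayOutputStream out) {
    while ((value & ~0x7FL) != 0) {
      out.write((int) ((value & 0x7F) | 0x80));
      value >>>= 7;
    }
    out.write((int) value);
  }

  // Reads an unsigned varint of at most 10 bytes (64 bits).
  static long readVarLong(InputStream in) throws IOException {
    long result = 0;
    for (int shift = 0; shift < 64; shift += 7) {
      int b = in.read();
      if (b < 0) {
        throw new EOFException("end of stream inside varint");
      }
      result |= (long) (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        return result;
      }
    }
    throw new IOException("varint longer than 10 bytes");
  }

  // A length prefix must fit in a non-negative int; anything larger is rejected
  // with a message shaped like the "varint overflow" in the trace.
  static int readLength(InputStream in) throws IOException {
    long r = readVarLong(in);
    if (r < 0 || r > Integer.MAX_VALUE) {
      throw new IOException("varint overflow " + r);
    }
    return (int) r;
  }

  // Framing: varint length, then the raw bytes.
  static void writeBytes(byte[] data, ByteArrayOutputStream out) {
    writeVarLong(data.length, out);
    out.write(data, 0, data.length);
  }

  // Reads the length prefix, then insists on exactly that many bytes.
  static byte[] readBytes(InputStream in) throws IOException {
    int length = readLength(in);
    byte[] buf = new byte[length];
    int read = 0;
    while (read < length) {
      int n = in.read(buf, read, length - read);
      if (n < 0) {
        throw new EOFException("reached end of stream after reading " + read
            + " bytes; " + length + " bytes expected");
      }
      read += n;
    }
    return buf;
  }

  public static void main(String[] args) throws IOException {
    // Truncated payload: the prefix promises 1000 bytes, but only 15 follow it.
    ByteArrayOutputStream big = new ByteArrayOutputStream();
    writeBytes(new byte[1000], big);
    byte[] truncated = Arrays.copyOf(big.toByteArray(), 17);
    try {
      readBytes(new ByteArrayInputStream(truncated));
    } catch (EOFException e) {
      System.out.println(e.getMessage());
    }

    // A "length" that cannot fit in an int.
    ByteArrayOutputStream bogus = new ByteArrayOutputStream();
    writeVarLong(21432854729L, bogus);
    try {
      readBytes(new ByteArrayInputStream(bogus.toByteArray()));
    } catch (IOException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Running `main` prints `reached end of stream after reading 15 bytes; 1000 bytes expected` followed by `varint overflow 21432854729`. In a `KV<byte[], byte[]>`, the key and value are framed one after the other, so a single misaligned or truncated prefix in the key would also throw off where the value's prefix is read from.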
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 177994)
    Time Spent: 3h 10m  (was: 3h)

> Create unbounded synthetic source
> ---------------------------------
>
>                 Key: BEAM-6188
>                 URL: https://issues.apache.org/jira/browse/BEAM-6188
>             Project: Beam
>          Issue Type: Sub-task
>          Components: testing
>            Reporter: Lukasz Gajowy
>            Assignee: Lukasz Gajowy
>            Priority: Major
>          Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> It is needed for streaming scenarios. It should provide ways to reason about
> time and recover from checkpoints.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
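The issue asks for a source that can reason about time and recover from checkpoints, and the comment describes changing the watermark so that a streaming pipeline can stop once all records are processed. The following is a minimal, Beam-free sketch of those two ideas under loudly stated assumptions: the checkpoint is just the number of records already emitted, event timestamps are evenly spaced, and `Long.MIN_VALUE`/`Long.MAX_VALUE` stand in for the beginning and end of time. `SyntheticReaderSketch` and all of its members are hypothetical. In the Beam Java SDK the corresponding hooks live on `UnboundedSource.UnboundedReader` (`advance`, `getWatermark`, `getCheckpointMark`) and the end-of-time constant is `BoundedWindow.TIMESTAMP_MAX_VALUE`, but this is not the PR's implementation.

```java
// Hypothetical, Beam-free sketch; not the PR's code and not Beam's UnboundedReader API.
final class SyntheticReaderSketch {

  // Stand-ins for "beginning/end of time". Beam uses BoundedWindow.TIMESTAMP_MIN_VALUE
  // and TIMESTAMP_MAX_VALUE, which are Joda Instants rather than longs.
  static final long BEGINNING_OF_TIME = Long.MIN_VALUE;
  static final long END_OF_TIME = Long.MAX_VALUE;

  // Assumed checkpoint: the count of records already emitted. Because records are
  // generated deterministically, that count is enough to resume.
  static final class Checkpoint {
    final long recordsEmitted;

    Checkpoint(long recordsEmitted) {
      this.recordsEmitted = recordsEmitted;
    }
  }

  private final long totalRecords;
  private final long millisBetweenRecords;
  private long emitted;

  SyntheticReaderSketch(long totalRecords, long millisBetweenRecords, Checkpoint resumeFrom) {
    this.totalRecords = totalRecords;
    this.millisBetweenRecords = millisBetweenRecords;
    this.emitted = (resumeFrom == null) ? 0 : resumeFrom.recordsEmitted;
  }

  // Moves to the next record; returns false once every record has been emitted.
  boolean advance() {
    if (emitted >= totalRecords) {
      return false;
    }
    emitted++;
    return true;
  }

  // Event time of the record most recently returned by advance().
  long currentTimestamp() {
    return (emitted - 1) * millisBetweenRecords;
  }

  // Holds at the event time of the most recent record (later records are never
  // earlier); after the final record it jumps to END_OF_TIME so downstream
  // windows can fire and the job can drain.
  long getWatermark() {
    if (emitted >= totalRecords) {
      return END_OF_TIME;
    }
    return (emitted == 0) ? BEGINNING_OF_TIME : currentTimestamp();
  }

  Checkpoint getCheckpointMark() {
    return new Checkpoint(emitted);
  }

  public static void main(String[] args) {
    SyntheticReaderSketch first = new SyntheticReaderSketch(5, 1000, null);
    first.advance();
    first.advance();
    Checkpoint mark = first.getCheckpointMark(); // two records emitted so far

    // A "restarted" reader continues with the third record instead of the first.
    SyntheticReaderSketch resumed = new SyntheticReaderSketch(5, 1000, mark);
    while (resumed.advance()) {
      System.out.println("record at t=" + resumed.currentTimestamp()
          + ", watermark=" + resumed.getWatermark());
    }
  }
}
```

Resuming from the checkpoint continues with the next record rather than replaying from zero, and the final jump of the watermark is what is meant to let a runner close all windows and stop the job. As the comment notes, whether a given runner actually terminates on such a watermark is runner-dependent (it did not yet work on the Spark runner at the time).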