@Rico: filed BEAM-1395 <https://issues.apache.org/jira/browse/BEAM-1395>. This should be sorted soon enough.
Thanks for reporting this issue!

On Fri, Feb 3, 2017 at 4:31 PM Amit Sela <[email protected]> wrote:

> OK, this is indeed a different stack trace: the problem is now in SparkGroupAlsoByWindow, which did not exist in 0.4.0 and which I had hoped would fix any issues you encountered.
>
> More questions: is your data timestamped? Is your pipeline aware of the timestamp fields (using DoFn#outputWithTimestamp or a source that defines the timestamp)?
>
> Looks like this code <https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java#L138> is broken anyway; I don't think there's actually a time-order guarantee when processing a partition. Could you open a ticket, please? Thanks!
>
> On Fri, Feb 3, 2017 at 4:13 PM Bergmann, Rico (GfK External) <[email protected]> wrote:
>
> > Due to restrictions in my contract I cannot show you the pipeline, but it's a very complex one that we have been working on for several months already, also with Beam 0.4.0.
> >
> > Interestingly, we have already run our pipeline successfully with that version.
> > Now, in a series of 30 executions, about 20 fail with this exception and the others succeed…
> >
> > The full stack trace:
> >
> > org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
> >     at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
> >     at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:112)
> >     at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
> >     at com.gfk.mde.pipeline.MDEPipeline.run(MDEPipeline.java:329)
> >     at com.gfk.mde.pipeline.HiveSourceSparkDriver.run(HiveSourceSparkDriver.java:110)
> >     at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
> >     at com.gfk.mde.pipeline.HiveSourceSparkDriver.mainHandlingOozieActionConfiguration(HiveSourceSparkDriver.java:52)
> >     at com.gfk.mde.pipeline.HiveSourceSparkDriver.main(HiveSourceSparkDriver.java:31)
> >     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >     at java.lang.reflect.Method.invoke(Method.java:497)
> >     at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
> > Caused by: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
> >     at org.apache.beam.runners.core.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:199)
> >     at org.apache.beam.runners.core.InMemoryTimerInternals.advanceInputWatermark(InMemoryTimerInternals.java:189)
> >     at org.apache.beam.runners.spark.translation.SparkGroupAlsoByWindowFn.call(SparkGroupAlsoByWindowFn.java:140)
> >     at org.apache.beam.runners.spark.translation.SparkGroupAlsoByWindowFn.call(SparkGroupAlsoByWindowFn.java:56)
> >     at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
> >     at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
> >     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> >     at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
> >     at org.apache.beam.runners.spark.translation.SparkProcessContext.processPartition(SparkProcessContext.java:64)
> >     at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:97)
> >     at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:47)
> >     at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
> >     at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
> >     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
> >     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
> >     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> >     at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> >     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> >     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> >     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> >     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> >     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> >     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> >     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> >     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> >     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> >     at org.apache.spark.scheduler.Task.run(Task.scala:89)
> >     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >     at java.lang.Thread.run(Thread.java:745)
> >
> > *From:* Amit Sela [mailto:[email protected]]
> > *Sent:* Friday, February 3, 2017 13:10
> > *To:* [email protected]
> > *Subject:* Re: possible reasons for exception "Cannot move input watermark time backwards from"
> >
> > Is it the exact same stack trace? Would you mind sharing the stack trace and the pipeline?
> >
> > Thanks,
> > Amit
> >
> > On Fri, Feb 3, 2017, 13:58 Bergmann, Rico (GfK External) <[email protected]> wrote:
> >
> > Hi!
> >
> > Thanks for the insights.
> >
> > As you suggested, I tried it with the current Beam 0.5.0-SNAPSHOT, but ran into the same error…
> >
> > Any further ideas or suggestions?
> >
> > Best,
> > Rico.
> >
> > *From:* Amit Sela [mailto:[email protected]]
> > *Sent:* Thursday, February 2, 2017 17:15
> > *To:* [email protected]
> > *Subject:* Re: possible reasons for exception "Cannot move input watermark time backwards from"
> >
> > Hi Rico,
> >
> > Batch sort of uses watermarks by noting the "start watermark" at the beginning of time and the "end watermark" at the end of time (this is the "overflow" you see). Put in a more "Beam" way: the watermark starts at the beginning of time, and after all the elements have been processed it jumps to the end of time, because we *know* there are no more elements left to process.
> >
> > Could you try 0.5.0-SNAPSHOT, please? There was a large refactor around that area in the Spark runner, and the release is on the way, so 0.5.0 should be available within a few days anyway.
> >
> > Thanks,
> > Amit
> >
> > On Thu, Feb 2, 2017 at 5:04 PM Bergmann, Rico (GfK External) <[email protected]> wrote:
> >
> > Hi @all!
> >
> > I'm using Beam 0.4.0, and only its batch processing features.
> > While executing the pipeline I get an exception: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
> > First, since I'm not using the streaming features, I'm wondering about watermarks (but this may be a Beam-internal thing, I don't know).
> > Second, the timestamps stated in the exception message are really weird and look a bit like an overflow in a long value to me.
> >
> > Does anyone have a clue what the reason for this exception could be?
> >
> > Thanks,
> > Rico.
> >
> > Full stack trace:
> >
> > 2017-02-02 14:31:24,863 ERROR [Driver] yarn.ApplicationMaster (Logging.scala:logError(95)) - User class threw exception: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
> > org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
> >     at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
> >     at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:112)
> >     at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
> >     at com.gfk.mde.pipeline.MDEPipeline.run(MDEPipeline.java:329)
> >     at com.gfk.mde.pipeline.HiveSourceSparkDriver.run(HiveSourceSparkDriver.java:110)
> >     at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
> >     at com.gfk.mde.pipeline.HiveSourceSparkDriver.mainHandlingOozieActionConfiguration(HiveSourceSparkDriver.java:52)
> >     at com.gfk.mde.pipeline.HiveSourceSparkDriver.main(HiveSourceSparkDriver.java:31)
> >     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >     at java.lang.reflect.Method.invoke(Method.java:497)
> >     at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
> > Caused by: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
> >     at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:199)
> >     at org.apache.beam.sdk.util.state.InMemoryTimerInternals.advanceInputWatermark(InMemoryTimerInternals.java:163)
> >     at org.apache.beam.runners.core.GroupAlsoByWindowsViaOutputBufferDoFn.processElement(GroupAlsoByWindowsViaOutputBufferDoFn.java:89)
> >     at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.invokeProcessElement(SparkProcessContext.java:372)
> >     at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:335)
> >     at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)
> >     at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)
> >     at org.apache.beam.runners.spark.translation.SparkProcessContext.callWithCtxt(SparkProcessContext.java:91)
> >     at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:75)
> >     at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:43)
> >     at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
> >     at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
> >     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
> >     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
> >     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> >     at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> >     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> >     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> >     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> >     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> >     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> >     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> >     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> >     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> >     at org.apache.spark.scheduler.Task.run(Task.scala:89)
> >     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >     at java.lang.Thread.run(Thread.java:745)
> >
> > ------------------------------
> >
> > GfK SE, Nuremberg, Germany, commercial register at the local court Amtsgericht Nuremberg HRB 25014; Management Board: Dr. Gerhard Hausruckinger (Speaker of the Management Board), Christian Diedrich (CFO), David Krajicek, Alessandra Cama; Chairman of the Supervisory Board: Ralf Klein-Bölting. This email and any attachments may contain confidential or privileged information.
> > Please note that unauthorized copying, disclosure or distribution of the material in this email is not permitted.
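For reference, the two timestamps in the exception are not arbitrary overflow garbage: they appear to be exactly the "beginning of time" and "end of time" Amit describes, i.e. the smallest and largest 64-bit microsecond counts truncated to milliseconds (which, as far as we can tell, is how Beam defines `BoundedWindow.TIMESTAMP_MIN_VALUE` and `TIMESTAMP_MAX_VALUE`). The sketch below reproduces them with plain `java.time` and nothing from Beam; the class name `WatermarkBounds` is made up for illustration.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: reproduces the two timestamps from the exception message. */
public class WatermarkBounds {
    // Smallest and largest 64-bit microsecond counts, truncated toward zero to milliseconds.
    static final long MIN_MILLIS = TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE);
    static final long MAX_MILLIS = TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE);

    // "Beginning of time" and "end of time" as java.time instants.
    static final Instant BEGINNING_OF_TIME = Instant.ofEpochMilli(MIN_MILLIS);
    static final Instant END_OF_TIME = Instant.ofEpochMilli(MAX_MILLIS);

    public static void main(String[] args) {
        System.out.println("beginning of time: " + BEGINNING_OF_TIME);
        System.out.println("end of time:       " + END_OF_TIME);
        // The calendar years match the ones printed in the stack traces above.
        System.out.println("years: "
            + BEGINNING_OF_TIME.atOffset(ZoneOffset.UTC).getYear()
            + " .. "
            + END_OF_TIME.atOffset(ZoneOffset.UTC).getYear());
    }
}
```

Note that `java.time` renders years beyond 9999 with a leading `+`, whereas the Joda-Time output in the stack traces has none; the instants themselves are the same.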
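The exception message itself comes from a monotonicity check: an input watermark may only move forward. The following is a hypothetical, stripped-down stand-in (the class `MonotonicWatermark` and its `advance` method are invented here and are not Beam's `InMemoryTimerInternals`) that shows the failure mode implied by the message: once the watermark has jumped to the end of time, any attempt to set it back to the beginning of time is rejected. Whether `SparkGroupAlsoByWindowFn` actually reaches this state by reusing timer state or by relying on time-ordered input within a partition is an assumption; the thread only establishes that the watermark was asked to move from the maximum back to the minimum.

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a watermark holder that refuses to move backwards. */
public class MonotonicWatermark {
    static final Instant BEGINNING_OF_TIME =
        Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
    static final Instant END_OF_TIME =
        Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));

    private Instant inputWatermark = BEGINNING_OF_TIME;

    /** Moves the watermark forward; equal values are allowed, earlier ones are not. */
    void advance(Instant newWatermark) {
        if (newWatermark.isBefore(inputWatermark)) {
            throw new IllegalStateException(
                "Cannot move input watermark time backwards from "
                    + inputWatermark + " to " + newWatermark);
        }
        inputWatermark = newWatermark;
    }

    Instant current() {
        return inputWatermark;
    }

    public static void main(String[] args) {
        MonotonicWatermark wm = new MonotonicWatermark();
        // Batch-style usage: start at the beginning of time, then jump to the
        // end of time once all input has been seen. Both calls are legal.
        wm.advance(BEGINNING_OF_TIME);
        wm.advance(END_OF_TIME);
        // Hypothetical misuse: treating the same state as fresh for more input
        // tries to rewind the watermark and trips the check.
        try {
            wm.advance(BEGINNING_OF_TIME);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Advancing to the same instant twice is accepted; only a strictly earlier instant fails, which is the situation the stack traces report.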
