Hi Rico,

Batch sort of uses watermarks by noting the "start watermark" at the beginning of time and the "end watermark" at the end of time (this is the "overflow" you see). Or, in a more "Beam" way: the watermark starts at the beginning of time, and after processing all the elements it jumps to the end of time, because we *know* there are no more elements left to process.
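Concretely, the two timestamps in your exception look like Beam's timestamp bounds rather than a real overflow: if memory serves, Beam clamps timestamps to `Long.MIN_VALUE / 1000` and `Long.MAX_VALUE / 1000` milliseconds (`BoundedWindow.TIMESTAMP_MIN_VALUE` / `TIMESTAMP_MAX_VALUE`), and the global window's maximum timestamp is one day before the end of time. A quick `java.time` sketch, just to show the values line up (this is an illustration, not Beam code):

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class WatermarkBounds {
    public static void main(String[] args) {
        // Beam timestamps are millisecond longs clamped to these bounds
        // (BoundedWindow.TIMESTAMP_MIN_VALUE / TIMESTAMP_MAX_VALUE, if I
        // remember correctly).
        Instant min = Instant.ofEpochMilli(Long.MIN_VALUE / 1000);
        Instant max = Instant.ofEpochMilli(Long.MAX_VALUE / 1000);

        // The "backwards" target in your exception is the beginning of time:
        System.out.println(min);
        // prints -290308-12-21T19:59:05.225Z

        // The global window ends one day before the end of time, which matches
        // the first timestamp in your exception (modulo the leading "+" that
        // java.time prints for 5-digit years):
        System.out.println(max.minus(1, ChronoUnit.DAYS));
        // prints +294247-01-09T04:00:54.775Z
    }
}
```

So the exception says the watermark was asked to move from end-of-time back to beginning-of-time, which is exactly the batch behavior described above going wrong in the runner.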
Could you try 0.5.0-SNAPSHOT, please? There was a large refactor around that area in the Spark runner, and the release is on the way, so 0.5.0 should be available within a few days anyway.

Thanks,
Amit

On Thu, Feb 2, 2017 at 5:04 PM Bergmann, Rico (GfK External) <[email protected]> wrote:

> Hi @all!
>
> I'm using Beam 0.4.0 and only the batch processing features of it.
> While executing the pipeline I get an exception: Cannot move input
> watermark time backwards from 294247-01-09T04:00:54.775Z to
> -290308-12-21T19:59:05.225Z
>
> First, since I'm not using the streaming features I'm wondering about
> watermarks (but this may be a Beam-internal thing, I don't know).
>
> Second, the timestamp stated in the exception message is really weird and
> looks a bit like an overflow in a long value to me.
>
> Does anyone have a clue what the reason for this exception could be?
>
> Thanks,
> Rico.
>
> Full stacktrace:
>
> 2017-02-02 14:31:24,863 ERROR [Driver] yarn.ApplicationMaster (Logging.scala:logError(95)) - User class threw exception: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
>
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
>     at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
>     at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:112)
>     at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
>     at com.gfk.mde.pipeline.MDEPipeline.run(MDEPipeline.java:329)
>     at com.gfk.mde.pipeline.HiveSourceSparkDriver.run(HiveSourceSparkDriver.java:110)
>     at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
>     at com.gfk.mde.pipeline.HiveSourceSparkDriver.mainHandlingOozieActionConfiguration(HiveSourceSparkDriver.java:52)
>     at com.gfk.mde.pipeline.HiveSourceSparkDriver.main(HiveSourceSparkDriver.java:31)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
> Caused by: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
>     at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:199)
>     at org.apache.beam.sdk.util.state.InMemoryTimerInternals.advanceInputWatermark(InMemoryTimerInternals.java:163)
>     at org.apache.beam.runners.core.GroupAlsoByWindowsViaOutputBufferDoFn.processElement(GroupAlsoByWindowsViaOutputBufferDoFn.java:89)
>     at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.invokeProcessElement(SparkProcessContext.java:372)
>     at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:335)
>     at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)
>     at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)
>     at org.apache.beam.runners.spark.translation.SparkProcessContext.callWithCtxt(SparkProcessContext.java:91)
>     at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:75)
>     at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:43)
>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>     at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
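P.S. In case it helps until the release lands: pulling 0.5.0-SNAPSHOT should just mean adding the Apache snapshots repository to your POM. A sketch, assuming you depend on the Spark runner module (adjust the artifact ids to whatever Beam modules you actually use):

```xml
<!-- Snapshot repository (Apache's standard snapshots repo) -->
<repositories>
  <repository>
    <id>apache.snapshots</id>
    <url>https://repository.apache.org/content/repositories/snapshots</url>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
</repositories>

<!-- Bump the Beam modules you use to the snapshot version -->
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-spark</artifactId>
  <version>0.5.0-SNAPSHOT</version>
</dependency>
```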
