Hi Rico,

I already opened two tickets: BEAM-1395
<https://issues.apache.org/jira/browse/BEAM-1395> and BEAM-1396
<https://issues.apache.org/jira/browse/BEAM-1396> (Spark runner and SDK),
and you can follow on the PRs as well if you'd like:

https://github.com/apache/beam/pull/1922
https://github.com/apache/beam/pull/1924

Thanks!

On Mon, Feb 6, 2017 at 1:28 PM Bergmann, Rico (GfK External) <
[email protected]> wrote:

> https://issues.apache.org/jira/browse/BEAM-1403
>
>
>
> *Von:* Bergmann, Rico (GfK External) [mailto:[email protected]]
> *Gesendet:* Montag, 6. Februar 2017 12:19
> *An:* [email protected]
> *Betreff:* AW: possible reasons for exception "Cannot move input
> watermark time backwards from"
>
>
>
> See below
>
>
>
> *Von:* Amit Sela [mailto:[email protected] <[email protected]>]
> *Gesendet:* Freitag, 3. Februar 2017 15:31
> *An:* [email protected]
> *Betreff:* Re: possible reasons for exception "Cannot move input
> watermark time backwards from"
>
>
>
> OK, this is indeed a different stacktrace - the problem now is in
> SparkGroupAlsoByWIndow which did not exist in 0.4.0, and I hoped would fix
> any issues you've encountered.
>
>
>
> More questions: is your data timestamped ?
>
> Yes (but only internally)
>
> is your pipeline aware of the timestamp fields (using
> DoFn#outputWithTimestamp or a source that defines the timestamp)?
>
> No, we do not expose timestamps to the pipeline. For the pipeline this are
> simply fields in a record.
>
>
>
> Looks like this
> <https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java#L138>
>  is
> broken anyway, I don't think there's actually time-order guarantee when
> processing a partition. Could you open a ticket please ? Thanks!
>
>
>
> I’ll do this!
>
>
>
> On Fri, Feb 3, 2017 at 4:13 PM Bergmann, Rico (GfK External) <
> [email protected]> wrote:
>
>
>
> Due to restricitions in my contract I can not show you the pipeline. But
> it’s a very complex we are work on for several months already. Also with
> Beam 0.4.0
>
>
>
> Interesting to note is, that we already successfully ran our pipeline with
> that version. Now in a series of 30 executions about 20 get this exception,
> the others succeed…
>
>
>
>
>
> The full StackTrace
>
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.IllegalStateException: Cannot move input watermark time backwards
> from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
>
>         at
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
>
>         at
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:112)
>
>         at
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
>
>         at com.gfk.mde.pipeline.MDEPipeline.run(MDEPipeline.java:329)
>
>         at
> com.gfk.mde.pipeline.HiveSourceSparkDriver.run(HiveSourceSparkDriver.java:110)
>
>         at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
>
>         at
> com.gfk.mde.pipeline.HiveSourceSparkDriver.mainHandlingOozieActionConfiguration(HiveSourceSparkDriver.java:52)
>
>         at
> com.gfk.mde.pipeline.HiveSourceSparkDriver.main(HiveSourceSparkDriver.java:31)
>
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>         at java.lang.reflect.Method.invoke(Method.java:497)
>
>         at
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
>
> Caused by: java.lang.IllegalStateException: Cannot move input watermark
> time backwards from 294247-01-09T04:00:54.775Z to
> -290308-12-21T19:59:05.225Z
>
>         at
> org.apache.beam.runners.core.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:199)
>
>         at
> org.apache.beam.runners.core.InMemoryTimerInternals.advanceInputWatermark(InMemoryTimerInternals.java:189)
>
>         at
> org.apache.beam.runners.spark.translation.SparkGroupAlsoByWindowFn.call(SparkGroupAlsoByWindowFn.java:140)
>
>         at
> org.apache.beam.runners.spark.translation.SparkGroupAlsoByWindowFn.call(SparkGroupAlsoByWindowFn.java:56)
>
>         at
> org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
>
>         at
> org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
>
>         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>
>         at
> scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
>
>         at
> org.apache.beam.runners.spark.translation.SparkProcessContext.processPartition(SparkProcessContext.java:64)
>
>         at
> org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:97)
>
>         at
> org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:47)
>
>         at
> org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
>
>         at
> org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
>
>         at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>
>         at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>
>         at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
>         at java.lang.Thread.run(Thread.java:745)
>
>
>
>
>
> *Von:* Amit Sela [mailto:[email protected]]
> *Gesendet:* Freitag, 3. Februar 2017 13:10
>
>
> *An:* [email protected]
> *Betreff:* Re: possible reasons for exception "Cannot move input
> watermark time backwards from"
>
>
>
> Is it the exact same stack trace ? Would you mind sharing the stack trace
> and the pipeline ?
>
> Thanks,
> Amit
>
>
>
> On Fri, Feb 3, 2017, 13:58 Bergmann, Rico (GfK External) <
> [email protected]> wrote:
>
> Hi!
>
>
>
> Thanks for the insights.
>
>
>
> As you suggested I tried it with the current beam0.5.0-SNAPSHOT. But ran
> into the same error … L
>
>
>
> Any further ideas or suggestions?
>
>
>
> Best,
>
> Rico.
>
>
>
> *Von:* Amit Sela [mailto:[email protected]]
> *Gesendet:* Donnerstag, 2. Februar 2017 17:15
> *An:* [email protected]
> *Betreff:* Re: possible reasons for exception "Cannot move input
> watermark time backwards from"
>
>
>
> Hi Rico,
>
>
>
> Batch sort of uses Watermarks by noting the "start watermark"  at the
> beginning of time, and the "end watermark" at the end of time (this is the
> "overflow" you see), or in a more "Beam" way, the watermark at the
> beginning is the start of time, and after processing all the elements the
> watermark jumps to the end of time because we *know *there are no more
> elements left to process.
>
>
>
> Could you try 0.5.0-SNAPSHOT please ? there was a large refactor around
> that area in the Spark runner, and release is on the way so 0.5.0 should be
> available within a few days anyway.
>
>
>
> Thanks,
>
> Amit
>
>
>
> On Thu, Feb 2, 2017 at 5:04 PM Bergmann, Rico (GfK External) <
> [email protected]> wrote:
>
> Hi @all!
>
>
>
> I’m using Beam 0.4.0 and only the batch processing features of it.
>
> While executing the pipeline I get an exception: Cannot move input
> watermark time backwards from 294247-01-09T04:00:54.775Z to
> -290308-12-21T19:59:05.225Z
>
> First, since I’m not using the streaming features I’m wondering about
> watermarks (but this may be an Beam internal thing, I don’t know).
>
> Second, the timestamp stated in the exception message is really weird and
> looks a bit like an overflow in a long value to me.
>
>
>
> Does anyone have a clue what the reason for this exception could be?
>
>
>
> Thanks,
>
> Rico.
>
>
>
>
>
> Full Stacktrace:
>
> 2017-02-02 14:31:24,863 ERROR [Driver] yarn.ApplicationMaster
> (Logging.scala:logError(95)) - User class threw exception:
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.IllegalStateException: Cannot move input watermark time backwards
> from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
>
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.IllegalStateException: Cannot move input watermark time backwards
> from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
>
>         at
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
>
>         at
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:112)
>
>         at
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
>
>         at com.gfk.mde.pipeline.MDEPipeline.run(MDEPipeline.java:329)
>
>         at
> com.gfk.mde.pipeline.HiveSourceSparkDriver.run(HiveSourceSparkDriver.java:110)
>
>         at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
>
>         at
> com.gfk.mde.pipeline.HiveSourceSparkDriver.mainHandlingOozieActionConfiguration(HiveSourceSparkDriver.java:52)
>
>         at
> com.gfk.mde.pipeline.HiveSourceSparkDriver.main(HiveSourceSparkDriver.java:31)
>
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>         at java.lang.reflect.Method.invoke(Method.java:497)
>
>         at
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
>
> Caused by: java.lang.IllegalStateException: Cannot move input watermark
> time backwards from 294247-01-09T04:00:54.775Z to
> -290308-12-21T19:59:05.225Z
>
>         at
> org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:199)
>
>         at
> org.apache.beam.sdk.util.state.InMemoryTimerInternals.advanceInputWatermark(InMemoryTimerInternals.java:163)
>
>         at
> org.apache.beam.runners.core.GroupAlsoByWindowsViaOutputBufferDoFn.processElement(GroupAlsoByWindowsViaOutputBufferDoFn.java:89)
>
>         at
> org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.invokeProcessElement(SparkProcessContext.java:372)
>
>         at
> org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:335)
>
>         at
> com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)
>
>         at
> com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)
>
>         at
> org.apache.beam.runners.spark.translation.SparkProcessContext.callWithCtxt(SparkProcessContext.java:91)
>
>         at
> org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:75)
>
>         at
> org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:43)
>
>         at
> org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
>
>         at
> org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
>
>         at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>
>         at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
>         at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>
>         at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
>         at java.lang.Thread.run(Thread.java:745)
>
>
>
>
> ------------------------------
>
>
>
> GfK SE, Nuremberg, Germany, commercial register at the local court
> Amtsgericht Nuremberg HRB 25014; Management Board: Dr. Gerhard
> Hausruckinger (Speaker of the Management Board), Christian Diedrich (CFO),
> David Krajicek, Alessandra Cama; Chairman of the Supervisory Board: Ralf
> Klein-Bölting This email and any attachments may contain confidential or
> privileged information. Please note that unauthorized copying, disclosure
> or distribution of the material in this email is not permitted.
>
>
> ------------------------------
>
>
>
> GfK SE, Nuremberg, Germany, commercial register at the local court
> Amtsgericht Nuremberg HRB 25014; Management Board: Dr. Gerhard
> Hausruckinger (Speaker of the Management Board), Christian Diedrich (CFO),
> David Krajicek, Alessandra Cama; Chairman of the Supervisory Board: Ralf
> Klein-Bölting This email and any attachments may contain confidential or
> privileged information. Please note that unauthorized copying, disclosure
> or distribution of the material in this email is not permitted.
>
>
> ------------------------------
>
>
>
> GfK SE, Nuremberg, Germany, commercial register at the local court
> Amtsgericht Nuremberg HRB 25014; Management Board: Dr. Gerhard
> Hausruckinger (Speaker of the Management Board), Christian Diedrich (CFO),
> David Krajicek, Alessandra Cama; Chairman of the Supervisory Board: Ralf
> Klein-Bölting This email and any attachments may contain confidential or
> privileged information. Please note that unauthorized copying, disclosure
> or distribution of the material in this email is not permitted.
>
>
> ------------------------------
>
>
>
> GfK SE, Nuremberg, Germany, commercial register at the local court
> Amtsgericht Nuremberg HRB 25014; Management Board: Dr. Gerhard
> Hausruckinger (Speaker of the Management Board), Christian Diedrich (CFO),
> David Krajicek, Alessandra Cama; Chairman of the Supervisory Board: Ralf
> Klein-Bölting This email and any attachments may contain confidential or
> privileged information. Please note that unauthorized copying, disclosure
> or distribution of the material in this email is not permitted.
>
> ------------------------------
>
>
> GfK SE, Nuremberg, Germany, commercial register at the local court
> Amtsgericht Nuremberg HRB 25014; Management Board: Dr. Gerhard
> Hausruckinger (Speaker of the Management Board), Christian Diedrich (CFO),
> David Krajicek, Alessandra Cama; Chairman of the Supervisory Board: Ralf
> Klein-Bölting This email and any attachments may contain confidential or
> privileged information. Please note that unauthorized copying, disclosure
> or distribution of the material in this email is not permitted.
>

Reply via email to