Watermarks are a tool for handling out-of-orderness when working with event
time timestamps. They provide a mechanism for managing the tradeoff between
latency and completeness, letting you decide how long to wait for any
out-of-orderness to resolve itself. Note that, the way Flink uses these
terms, out-of-orderness is not the same as lateness: your watermarking will
accommodate a certain amount of out-of-orderness, and out-of-order events
that arrive within this timeframe are not considered late. Only events that
are excessively out-of-order -- i.e., with timestamps behind the current
watermark -- are late.
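
To make the distinction concrete, here is a minimal sketch in plain Java. It
is not Flink's API: the class and method names are hypothetical, and it
assumes the watermark simply trails the highest timestamp seen so far by a
fixed bound and is updated on every event (Flink typically emits watermarks
periodically). An event counts as late only if its timestamp is at or behind
the current watermark.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkSketch {

    /** Derives a watermark that trails the highest timestamp seen so far by a fixed bound. */
    static final class BoundedOutOfOrderness {
        private final long boundMs;
        private long maxTimestampSeen = Long.MIN_VALUE;

        BoundedOutOfOrderness(long boundMs) {
            this.boundMs = boundMs;
        }

        long currentWatermark() {
            // Before the first event there is no meaningful watermark.
            return maxTimestampSeen == Long.MIN_VALUE
                    ? Long.MIN_VALUE
                    : maxTimestampSeen - boundMs;
        }

        /** Classifies one event against the state built from all earlier events. */
        String classify(long timestamp) {
            String verdict;
            if (timestamp <= currentWatermark()) {
                verdict = "LATE";          // behind the watermark
            } else if (timestamp < maxTimestampSeen) {
                verdict = "OUT_OF_ORDER";  // out of order, but within the bound
            } else {
                verdict = "IN_ORDER";
            }
            maxTimestampSeen = Math.max(maxTimestampSeen, timestamp);
            return verdict;
        }
    }

    /** Classifies a whole sequence of event timestamps, in arrival order. */
    public static List<String> classify(long[] timestamps, long boundMs) {
        BoundedOutOfOrderness tracker = new BoundedOutOfOrderness(boundMs);
        List<String> verdicts = new ArrayList<>();
        for (long ts : timestamps) {
            verdicts.add(tracker.classify(ts));
        }
        return verdicts;
    }

    public static void main(String[] args) {
        long[] arrivals = {1000L, 2000L, 1500L, 5000L, 2500L, 4200L};
        System.out.println(classify(arrivals, 1000L));
    }
}
```

With a bound of 1000 ms, the event stamped 1500 arrives out of order but is
not late, while the event stamped 2500 arrives after the watermark has
already reached 4000 and is therefore late.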

I would say that the documentation you quoted is a bit misleading, since
with ingestion time processing there can be no late events.

Most of the Flink runtime only makes a distinction between processing time
and event time. For example, there are processing time timers (triggered by
the system clock) and event time timers (triggered by watermarks), but
there's no such thing as an ingestion time timer. Ingestion time is a
hybrid between the two that assigns timestamps and watermarks based on
processing time, and then the rest of the pipeline behaves as though you
were doing event time processing.
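
As a rough illustration of why ingestion time cannot produce late events,
the sketch below stamps each record with the clock reading at the moment it
enters the pipeline and lets the watermark trail the last assigned timestamp.
This is a simplified model in plain Java, not Flink's implementation: the
names are hypothetical, and the guard against a clock that steps backwards
and the 1 ms watermark offset are assumptions of the sketch.

```java
import java.util.function.LongSupplier;

public class IngestionTimeSketch {

    /** A record paired with the timestamp assigned on entry. */
    static final class Stamped<T> {
        final T value;
        final long timestamp;

        Stamped(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Assigns timestamps from a clock and derives the watermark from them. */
    static final class IngestionStamper {
        private final LongSupplier clock;
        private long lastTimestamp = Long.MIN_VALUE;

        IngestionStamper(LongSupplier clock) {
            this.clock = clock;
        }

        <T> Stamped<T> stamp(T value) {
            // Assumption: never hand out a timestamp older than the previous one.
            long now = Math.max(clock.getAsLong(), lastTimestamp);
            lastTimestamp = now;
            return new Stamped<>(value, now);
        }

        long currentWatermark() {
            // The watermark trails the newest assigned timestamp by 1 ms.
            return lastTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : lastTimestamp - 1;
        }
    }

    /** Counts records whose assigned timestamp is at or behind the watermark. */
    public static int countLate(String[] records, LongSupplier clock) {
        IngestionStamper stamper = new IngestionStamper(clock);
        int late = 0;
        for (String record : records) {
            long watermarkBefore = stamper.currentWatermark();
            Stamped<String> stamped = stamper.stamp(record);
            if (stamped.timestamp <= watermarkBefore) {
                late++;
            }
        }
        return late;
    }

    public static void main(String[] args) {
        String[] records = {"a", "b", "c"};
        System.out.println(countLate(records, System::currentTimeMillis));
    }
}
```

Because the timestamps are assigned in arrival order, they never fall behind
the watermark derived from them, so the late count stays at zero no matter
how the records were ordered at their origin.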

This means that when working with ingestion time you lose most of the
benefits of event time processing, such as deterministic, reproducible
behavior. But using ingestion time does make it possible to use certain
parts of the APIs that are described as "event time only", such as interval
joins.

I don't know enough about streaming-gelly to speculate about what's going
on there.

David



On Tue, Mar 10, 2020 at 10:14 AM kant kodali <kanth...@gmail.com> wrote:

> Hi Arvid,
>
> If ingestion time programs cannot handle late data, then why would they
> generate watermarks? Isn't the whole point of watermarks to handle late
> data?
>
> My last question was more about this library
> <https://github.com/vasia/gelly-streaming>. I run several algorithms using
> SimpleEdgeStream.aggregate(<algorithm>).print(), and I run into the
> following error whenever I invoke this constructor
> <https://github.com/vasia/gelly-streaming/blob/master/src/main/java/org/apache/flink/graph/streaming/SimpleEdgeStream.java#L69>.
> It works if I change it to this one
> <https://github.com/vasia/gelly-streaming/blob/master/src/main/java/org/apache/flink/graph/streaming/SimpleEdgeStream.java#L86>,
> so I am not exactly sure what is happening there.
>
>  The program finished with the following exception:
>
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: 0c61d6ef0483c3068076a988bc252a74)
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>
> at
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: 0c61d6ef0483c3068076a988bc252a74)
>
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>
> at
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
>
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>
> at Test.main(Test.java:86)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>
> ... 8 more
>
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
> failed (JobID: 0c61d6ef0483c3068076a988bc252a74)
>
> at
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
>
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>
> at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)
>
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)
>
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>
> at
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
>
> at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
>
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
> at java.lang.Thread.run(Thread.java:748)
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
>
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>
> at
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
>
> ... 19 more
>
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed
> by NoRestartBackoffTimeStrategy
>
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
>
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
>
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
>
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
>
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
>
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
>
> at
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
>
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp
> (= no timestamp marker). Is the time characteristic set to
> 'ProcessingTime', or did you forget to call
> 'DataStream.assignTimestampsAndWatermarks(...)'?
>
> at
> org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
>
> at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:295)
>
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
>
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
>
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
>
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
>
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>
> at java.lang.Thread.run(Thread.java:748)
>
> On Tue, Mar 10, 2020 at 1:40 AM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Kant,
>>
>> I just saw that you asked the same question on SO [1]. Could you, in the
>> future, please cross-reference these posts, so that we don't waste
>> resources on answering?
>>
>> [1]
>> https://stackoverflow.com/questions/60610985/do-i-need-to-set-assigntimestampsandwatermarks-if-i-set-my-time-characteristic-t
>>
>> On Tue, Mar 10, 2020 at 9:33 AM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi Kant,
>>>
>>> according to the documentation [1], you don't need to set a watermark
>>> assigner:
>>>
>>>> Compared to *event time*, *ingestion time* programs cannot handle any
>>>> out-of-order events or late data, but the programs don’t have to specify
>>>> how to generate *watermarks*.
>>>>
>>>> Internally, *ingestion time* is treated much like *event time*, but
>>>> with automatic timestamp assignment and automatic watermark generation.
>>>>
>>>
>>> So it's possible to assign neither timestamps nor watermarks, but it
>>> seems as if the default behavior is exactly what you want. If that
>>> doesn't work for you, could you please rephrase your last question or
>>> describe your use case? I didn't get it.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/event_time.html
>>>
>>> On Tue, Mar 10, 2020 at 5:01 AM kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> Do I need to set assignTimestampsAndWatermarks if I set my time
>>>> characteristic to IngestionTime?
>>>>
>>>> Say I set the time characteristic of the stream execution environment
>>>> to ingestion time as follows:
>>>>
>>>>
>>>> streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>>>>
>>>> Do I need to call
>>>> datastream.assignTimestampsAndWatermarks(AscendingTimestampExtractor) ?
>>>>
>>>> I thought datastream.assignTimestampsAndWatermarks is mandatory only
>>>> if the time characteristic is event time. No? Did this behavior change in
>>>> Flink 1.10? I ask because I see libraries not calling
>>>> datastream.assignTimestampsAndWatermarks when the time characteristic is
>>>> ingestion time, but they do for event time. If not, I am wondering how I
>>>> can set an AscendingTimestampExtractor in a distributed environment. Is
>>>> there any way to add a monotonically increasing long
>>>> (AscendingTimestampExtractor) without any distributed locks?
>>>>
>>>> Thanks!
>>>>
>>>
