Hi Arvid,

If ingestion-time programs cannot handle late data, then why would they
generate watermarks? Isn't the whole point of watermarks to handle late
data?
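
To make the question concrete, here is how I picture ingestion time from
the documentation quoted below. This is a plain-Java sketch, not Flink's
code: Stamper and windowCanFire are names I made up, and the "watermark =
last timestamp - 1" rule is my assumption, borrowed from how ascending
timestamps are usually handled.

```java
import java.util.function.LongSupplier;

class IngestionTimeSketch {
    /** Hypothetical model of a source that stamps records on arrival. */
    static final class Stamper {
        private final LongSupplier clock;
        private long lastTimestamp = Long.MIN_VALUE;

        Stamper(LongSupplier clock) {
            this.clock = clock;
        }

        /** Timestamp for the next record; never goes backwards. */
        long nextTimestamp() {
            lastTimestamp = Math.max(lastTimestamp, clock.getAsLong());
            return lastTimestamp;
        }

        /** Assumed rule: everything strictly before the last timestamp is complete. */
        long currentWatermark() {
            return lastTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : lastTimestamp - 1;
        }
    }

    /** A window [start, end) may fire once the watermark reaches end - 1. */
    static boolean windowCanFire(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long[] ticks = {100L, 400L, 1200L}; // fake wall-clock readings
        int[] next = {0};
        Stamper stamper = new Stamper(() -> ticks[next[0]++]);
        for (int n = 0; n < ticks.length; n++) {
            long ts = stamper.nextTimestamp();
            long wm = stamper.currentWatermark();
            System.out.println("ts=" + ts + " watermark=" + wm
                    + " window [0,1000) fires=" + windowCanFire(1000L, wm));
        }
    }
}
```

If that picture is right, the watermark here only advances time so that
windows can close; it is not waiting for late data. Is that the correct way
to think about it?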

My last question was more about this library:
<https://github.com/vasia/gelly-streaming>. I run several algorithms using
SimpleEdgeStream.aggregate(<algorithm>).print(), and I run into the
error below whenever I use the following constructor:
<https://github.com/vasia/gelly-streaming/blob/master/src/main/java/org/apache/flink/graph/streaming/SimpleEdgeStream.java#L69>.
It works if I change it to this one:
<https://github.com/vasia/gelly-streaming/blob/master/src/main/java/org/apache/flink/graph/streaming/SimpleEdgeStream.java#L86>,
so I am not exactly sure what is happening there.

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 0c61d6ef0483c3068076a988bc252a74)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)

Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 0c61d6ef0483c3068076a988bc252a74)
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
    at Test.main(Test.java:86)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
    ... 8 more

Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 0c61d6ef0483c3068076a988bc252a74)
    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
    ... 19 more

Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
    at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:295)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
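
As far as I can tell from the message, the check that fails looks roughly
like the following. This is a plain-Java sketch of my reading of the error,
not Flink's actual code: the class and method names are made up for
illustration, and I ignore window offsets.

```java
class TumblingWindowSketch {
    /** Start of the tumbling window of the given size that contains the timestamp. */
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    /** Returns {start, end} of the window, or fails if the record carries no timestamp. */
    static long[] assignWindow(long timestamp, long size) {
        // Long.MIN_VALUE is the marker for "nobody assigned a timestamp to this record".
        if (timestamp == Long.MIN_VALUE) {
            throw new RuntimeException(
                    "Record has Long.MIN_VALUE timestamp (= no timestamp marker).");
        }
        long start = windowStart(timestamp, size);
        return new long[] {start, start + size};
    }

    public static void main(String[] args) {
        long[] w = assignWindow(12_345L, 1_000L);
        System.out.println("window = [" + w[0] + ", " + w[1] + ")");
        try {
            assignWindow(Long.MIN_VALUE, 1_000L); // a record without a timestamp
        } catch (RuntimeException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

So my guess is that with the first constructor the records reach the window
operator without any timestamp attached, while with the second one they
have one.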

On Tue, Mar 10, 2020 at 1:40 AM Arvid Heise <ar...@ververica.com> wrote:

> Hi Kant,
>
> I just saw that you asked the same question on SO [1]. Could you, in the
> future, please cross-reference these posts, so that we don't waste
> resources on answering?
>
> [1]
> https://stackoverflow.com/questions/60610985/do-i-need-to-set-assigntimestampsandwatermarks-if-i-set-my-time-characteristic-t
>
> On Tue, Mar 10, 2020 at 9:33 AM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Kant,
>>
>> according to the documentation [1], you don't need to set a watermark
>> assigner:
>>
>>> Compared to *event time*, *ingestion time* programs cannot handle any
>>> out-of-order events or late data, but the programs don’t have to specify
>>> how to generate *watermarks*.
>>>
>>> Internally, *ingestion time* is treated much like *event time*, but
>>> with automatic timestamp assignment and automatic watermark generation.
>>>
>>
>> So it's possible to assign neither timestamps nor watermarks, but it seems
>> as if the default behavior is exactly what you want. If that doesn't
>> work for you, could you please rephrase your last question or describe your
>> use case? I didn't get it.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/event_time.html
>>
>> On Tue, Mar 10, 2020 at 5:01 AM kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> Do I need to set assignTimestampsAndWatermarks if I set my time
>>> characteristic to IngestionTime?
>>>
>>> Say I set the time characteristic of my stream execution environment to
>>> ingestion time as follows:
>>>
>>>
>>> streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>>>
>>> Do I need to call
>>> datastream.assignTimestampsAndWatermarks(AscendingTimestampExtractor)?
>>>
>>> I thought datastream.assignTimestampsAndWatermarks is mandatory only if
>>> the time characteristic is event time. No? Did this behavior change in
>>> Flink 1.10? I ask because I see libraries not calling
>>> datastream.assignTimestampsAndWatermarks when the time characteristic is
>>> ingestion time, but they do for event time. If not, I am wondering how I
>>> can set an AscendingTimestampExtractor in a distributed environment. Is
>>> there any way to add a monotonically increasing long
>>> (AscendingTimestampExtractor) without any distributed locks?
>>>
>>> Thanks!
>>>
>>
