Re: ProcessFunction example from the documentation giving me error

2018-07-19 Thread vino yang
Hi anna,

Can you share your program, the exception stack trace, and more details
about your source and state backend?

From the information you provided, it seems Flink started a network connection
but it timed out.

Thanks, vino.

2018-07-20 14:14 GMT+08:00 anna stax :

> Hi all,
>
> I am new to Flink. I am using the classes CountWithTimestamp and
> CountWithTimeoutFunction from the examples found in
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/process_function.html
>
> I am getting the error Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException:
> java.net.ConnectException: Operation timed out (Connection timed out)
>
> It looks like I get this error when the timer's time is reached. Any idea
> why? Please help.
>
> Thanks
>


Re: Is KeyedProcessFunction available in Flink 1.4?

2018-07-19 Thread anna stax
Thanks Bowen.

On Thu, Jul 19, 2018 at 4:45 PM, Bowen Li  wrote:

> Hi Anna,
>
> KeyedProcessFunction is only available starting from Flink 1.5; the doc is
> here. It extends ProcessFunction and shares the same functionality except
> that it also gives access to the timer's key, so you can refer to the
> ProcessFunction examples in that document.
>
> Bowen
>
>
> On Thu, Jul 19, 2018 at 3:26 PM anna stax  wrote:
>
>> Hello all,
>> I am using Flink 1.4 because that's the version provided by the latest AWS
>> EMR.
>> Is KeyedProcessFunction available in Flink 1.4?
>>
>> Also please share any links to good examples on using
>> KeyedProcessFunction.
>>
>> Thanks
>>
>


ProcessFunction example from the documentation giving me error

2018-07-19 Thread anna stax
Hi all,

I am new to Flink. I am using the classes CountWithTimestamp and
CountWithTimeoutFunction from the examples found in
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/process_function.html

I am getting the error Exception in thread "main"
org.apache.flink.runtime.client.JobExecutionException:
java.net.ConnectException: Operation timed out (Connection timed out)

It looks like I get this error when the timer's time is reached. Any idea
why? Please help.

Thanks


Re: When a jobmanager fails, it doesn't restart because it tries to restart non existing tasks

2018-07-19 Thread vino yang
Hi Till,

You are right, we have also seen the problem you describe. Curator removes the
specific job graph path asynchronously. But that path is the only thing relied
on when recovering, right? Is there any plan to improve this?

Thanks, vino.

2018-07-19 21:58 GMT+08:00 Till Rohrmann :

> Hi Gerard,
>
> the logging statement `Removed job graph ... from ZooKeeper` is actually
> not 100% accurate. The actual deletion is executed as an asynchronous
> background task and the log statement is not printed in the callback (which
> it should). Therefore, the deletion could still have failed. In order to
> see this, the full jobmanager/cluster entry point logs would be
> tremendously helpful.
>
> Cheers,
> Till
>
> On Thu, Jul 19, 2018 at 1:33 PM Gerard Garcia  wrote:
>
>> Thanks Andrey,
>>
>> That is the log from the jobmanager just after it has finished cancelling
>> the task:
>>
>> 11:29:18.716 [flink-akka.actor.default-dispatcher-15695] INFO
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Stopping
>> checkpoint coordinator for job e403893e5208ca47ace886a77e405291.
>> 11:29:18.716 [flink-akka.actor.default-dispatcher-15695] INFO
>> o.a.f.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Shutting
>> down
>> 11:29:18.738 [flink-akka.actor.default-dispatcher-15695] INFO
>> o.a.f.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Removing
>> /flink-eur/default/checkpoints/e403893e5208ca47ace886a77e405291 from
>> ZooKeeper
>> 11:29:18.780 [flink-akka.actor.default-dispatcher-15695] INFO
>> o.a.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter  - Shutting
>> down.
>> 11:29:18.780 [flink-akka.actor.default-dispatcher-15695] INFO
>> o.a.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter  - Removing
>> /checkpoint-counter/e403893e5208ca47ace886a77e405291 from ZooKeeper
>> 11:29:18.827 [flink-akka.actor.default-dispatcher-15695] INFO
>> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job
>> e403893e5208ca47ace886a77e405291 reached globally terminal state
>> CANCELED.
>> 11:29:18.846 [flink-akka.actor.default-dispatcher-15675] INFO
>> org.apache.flink.runtime.jobmaster.JobMaster  - Stopping the JobMaster
>> for job (...)(e403893e5208ca47ace886a77e405291).
>> 11:29:18.848 [flink-akka.actor.default-dispatcher-15675] INFO
>> o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - Stopping
>> ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
>> 11:29:18.864 [flink-akka.actor.default-dispatcher-15675] INFO
>> org.apache.flink.runtime.jobmaster.JobMaster  - Close ResourceManager
>> connection d5fbc30a895066054e29fb2fd60fb0f1: JobManager is shutting
>> down..
>> 11:29:18.864 [flink-akka.actor.default-dispatcher-15695] INFO
>> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Suspending
>> SlotPool.
>> 11:29:18.864 [flink-akka.actor.default-dispatcher-15695] INFO
>> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Stopping
>> SlotPool.
>> 11:29:18.864 [flink-akka.actor.default-dispatcher-15688] INFO
>> o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
>> Disconnect job manager 9cf221e2340597629fb932c03aa14...@akka.tcp://flink@
>> (...):33827/user/jobmanager_9 for job e403893e5208ca47ace886a77e405291
>> from the resource manager.
>> 11:29:18.864 [flink-akka.actor.default-dispatcher-15675] INFO
>> o.a.f.runtime.leaderelection.ZooKeeperLeaderElectionService  - Stopping
>> ZooKeeperLeaderElectionService ZooKeeperLeaderElectionService
>> {leaderPath='/leader/e403893e5208ca47ace886a77e405291/job_manager_lock'}.
>> 11:29:18.980 [flink-akka.actor.default-dispatcher-15695] INFO
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed
>> checkpoint 31154 for job 5d8c376b10d358b9c9470b3e70113626 (132520 bytes
>> in 411 ms).
>> 11:29:19.025 [flink-akka.actor.default-dispatcher-15683] INFO
>> o.a.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - Removed
>> job graph e403893e5208ca47ace886a77e405291 from ZooKeeper.
>>
>>
>> At the end it says removed job graph e403893e5208ca47ace886a77e405291
>> from ZooKeeper but I still can see it at /flink/default/jobgraphs:
>>
>> [zk: localhost:2181(CONNECTED) 14] ls /flink/default/jobgraphs/
>> e403893e5208ca47ace886a77e405291
>> [3fe9c3c8-5bec-404e-a720-75f9b188124f, 36208299-0f6d-462c-bae4-
>> 2e3d53f50e8c]
>>
>> Gerard
>>
>> On Wed, Jul 18, 2018 at 4:24 PM Andrey Zagrebin 
>> wrote:
>>
>>> Hi Gerard,
>>>
>>> There is an issue recently fixed for 1.5.2, 1.6.0:
>>> https://issues.apache.org/jira/browse/FLINK-9575
>>> It might have caused your problem.
>>>
>>> Can you please provide log from JobManager/Entry point for further
>>> investigation?
>>>
>>> Cheers,
>>> Andrey
>>>
>>> On 18 Jul 2018, at 10:16, Gerard Garcia  wrote:
>>>
>>> Hi vino,
>>>
>>> Seems that jobs id stay in /jobgraphs when we cancel them manually. For
>>> example, after cancelling the job with id 75e16686cb4fe0d33ead8e29af131d09
>>> the entry is still in zookeeper's path /flink/default/jobgraphs, but the
>>> job disappeared from /home/nas/flink/

Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

2018-07-19 Thread vino yang
Hi Gregory,

This exception seems to be a bug; you can create an issue in JIRA.

Thanks, vino.

2018-07-20 10:28 GMT+08:00 Philip Doctor :

> Oh you were asking about the cast exception, I haven't seen that before,
> sorry to be off topic.
>
>
>
>
> --
> *From:* Philip Doctor 
> *Sent:* Thursday, July 19, 2018 9:27:15 PM
> *To:* Gregory Fee; user
> *Subject:* Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker
> cannot be cast to org.apache.flink.streaming.runtime.streamrecord.
> StreamRecord
>
>
>
> I'm just a Flink user, not an expert, but I've seen that exception before. I
> have never seen it be the actual error; I usually see it when some other
> operator is throwing an uncaught exception and is busy dying. It seems to me
> that the prior operator throws this error "Can't forward to the next
> operator". Why? Because the next operator is already dead, but the job is
> busy dying asynchronously, so you get a cloud of errors that sort of
> surround the root cause. I'd read your logs a little further back.
> --
> *From:* Gregory Fee 
> *Sent:* Thursday, July 19, 2018 9:10:37 PM
> *To:* user
> *Subject:* org.apache.flink.streaming.runtime.streamrecord.LatencyMarker
> cannot be cast to org.apache.flink.streaming.runtime.streamrecord.
> StreamRecord
>
> Hello, I have a job running and I've gotten this error a few times. The
> job recovers from a checkpoint and seems to continue forward fine. Then the
> error will happen again sometime later, perhaps 1 hour. This looks like a
> Flink bug to me but I could use an expert opinion. Thanks!
>
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
> at com.lyft.streamingplatform.BetterBuffer$OutputThread.run(BetterBuffer.java:316)
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:67)
> at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:48)
> at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
> ... 5 more
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
> at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
> at DataStreamSourceConversion$14.processElement(Unknown Source)
> at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(C

Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

2018-07-19 Thread Philip Doctor
Oh you were asking about the cast exception, I haven't seen that before, sorry 
to be off topic.





From: Philip Doctor 
Sent: Thursday, July 19, 2018 9:27:15 PM
To: Gregory Fee; user
Subject: Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker 
cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord



I'm just a Flink user, not an expert, but I've seen that exception before. I have
never seen it be the actual error; I usually see it when some other operator is
throwing an uncaught exception and is busy dying. It seems to me that the prior
operator throws this error "Can't forward to the next operator". Why? Because
the next operator is already dead, but the job is busy dying asynchronously, so
you get a cloud of errors that sort of surround the root cause. I'd read your
logs a little further back.


From: Gregory Fee 
Sent: Thursday, July 19, 2018 9:10:37 PM
To: user
Subject: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot 
be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

Hello, I have a job running and I've gotten this error a few times. The job 
recovers from a checkpoint and seems to continue forward fine. Then the error 
will happen again sometime later, perhaps 1 hour. This looks like a Flink bug 
to me but I could use an expert opinion. Thanks!


org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator

at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)

at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)

at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)

at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at 
com.lyft.streamingplatform.BetterBuffer$OutputThread.run(BetterBuffer.java:316)

Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator

at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)

at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)

at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)

at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

at 
com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:67)

at 
com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:48)

at 
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)

... 5 more

Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator

at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)

at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)

at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)

at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

at 
org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)

at 
org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)

at DataStreamSourceConversion$14.processElement(Unknown Source)

at 
org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67)

at 
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)

... 14 more

Caused by: java.lang.RuntimeException: 
org.apache.flink.streaming.runtime.st

Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

2018-07-19 Thread Philip Doctor

I'm just a Flink user, not an expert, but I've seen that exception before. I have
never seen it be the actual error; I usually see it when some other operator is
throwing an uncaught exception and is busy dying. It seems to me that the prior
operator throws this error "Can't forward to the next operator". Why? Because
the next operator is already dead, but the job is busy dying asynchronously, so
you get a cloud of errors that sort of surround the root cause. I'd read your
logs a little further back.


From: Gregory Fee 
Sent: Thursday, July 19, 2018 9:10:37 PM
To: user
Subject: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot 
be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

Hello, I have a job running and I've gotten this error a few times. The job 
recovers from a checkpoint and seems to continue forward fine. Then the error 
will happen again sometime later, perhaps 1 hour. This looks like a Flink bug 
to me but I could use an expert opinion. Thanks!


org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator

at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)

at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)

at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)

at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at 
com.lyft.streamingplatform.BetterBuffer$OutputThread.run(BetterBuffer.java:316)

Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator

at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)

at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)

at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)

at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

at 
com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:67)

at 
com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:48)

at 
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)

... 5 more

Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator

at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)

at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)

at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)

at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

at 
org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)

at 
org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)

at DataStreamSourceConversion$14.processElement(Unknown Source)

at 
org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67)

at 
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)

... 14 more

Caused by: java.lang.RuntimeException: 
org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord

at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)

at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:84)

at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.

org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

2018-07-19 Thread Gregory Fee
Hello, I have a job running and I've gotten this error a few times. The job
recovers from a checkpoint and seems to continue forward fine. Then the
error will happen again sometime later, perhaps 1 hour. This looks like a
Flink bug to me but I could use an expert opinion. Thanks!

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator

at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)

at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)

at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)

at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at
com.lyft.streamingplatform.BetterBuffer$OutputThread.run(BetterBuffer.java:316)

Caused by:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator

at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)

at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)

at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)

at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

at
com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:67)

at
com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:48)

at
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)

... 5 more

Caused by:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator

at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)

at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)

at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)

at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

at
org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)

at
org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)

at DataStreamSourceConversion$14.processElement(Unknown Source)

at
org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67)

at
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)

... 14 more

Caused by: java.lang.RuntimeException:
org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be
cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)

at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:84)

at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:42)

at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)

at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)

at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

at
org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)

at
org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)

at DataStreamCalcRule$37.processElement(Unknown Source)

at
org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)

at
org.apache.flink.table.runtime.CRowProcessRunner.processElement(C

Re: Is KeyedProcessFunction available in Flink 1.4?

2018-07-19 Thread Bowen Li
Hi Anna,

KeyedProcessFunction is only available starting from Flink 1.5; the doc is
here. It extends ProcessFunction and shares the same functionality except
that it also gives access to the timer's key, so you can refer to the
ProcessFunction examples in that document.
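
As a rough sketch of what that looks like in practice (this is not from the docs;
it assumes the Flink 1.5 API and uses a made-up Event type, just to show where the
key becomes available in onTimer()):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

stream
    .keyBy(event -> event.getId())
    .process(new KeyedProcessFunction<String, Event, String>() {

        private transient ValueState<Long> countState;

        @Override
        public void open(Configuration parameters) {
            countState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void processElement(Event event, Context ctx, Collector<String> out) throws Exception {
            Long count = countState.value();
            countState.update(count == null ? 1L : count + 1);
            // register an event-time timer one minute after this element
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 60_000L);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // unlike ProcessFunction, the key the timer was registered for is available here
            out.collect("key " + ctx.getCurrentKey() + " saw " + countState.value() + " events");
        }
    });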

Bowen


On Thu, Jul 19, 2018 at 3:26 PM anna stax  wrote:

> Hello all,
> I am using Flink 1.4 because that's the version provided by the latest AWS
> EMR.
> Is KeyedProcessFunction available in Flink 1.4?
>
> Also please share any links to good examples on using KeyedProcessFunction.
>
> Thanks
>


Is KeyedProcessFunction available in Flink 1.4?

2018-07-19 Thread anna stax
Hello all,
I am using Flink 1.4 because that's the version provided by the latest AWS
EMR.
Is KeyedProcessFunction available in Flink 1.4?

Also please share any links to good examples on using KeyedProcessFunction.

Thanks


Flink 1.4.2 -> 1.5.0 upgrade Queryable State debugging

2018-07-19 Thread Philip Doctor
Dear Flink Users,
I'm trying to upgrade to Flink 1.5.0. So far everything works except for the
queryable state client, and here's where it gets weird. I have the client
sitting behind a web API so the rest of our non-Java ecosystem can consume it.
I've got two tests: one calls my route directly as a Java method call, the other
calls the deployed server via HTTP (the difference between the tests is meant to
exercise whether the server is properly started, etc.).

The local call succeeds and the remote call fails, but the failure isn't what I'd
expect (around the server layer); it's complaining about contacting the oracle
for the state location:


 Caused by: 
org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could not 
contact the state location oracle to retrieve the state location.\n\tat 
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)\n


The client call is pretty straightforward:

return client.getKvState(
flinkJobID,
stateDescriptor.queryableStateName,
key,
keyTypeInformation,
stateDescriptor
)
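
For reference, the bare Java shape of the same lookup is roughly the following (a
sketch only; host, port, job id, state name and types are placeholders, and the
port needs to be a TaskManager's queryable state proxy port):

import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.queryablestate.client.QueryableStateClient;

// connects to the queryable state proxy running on a TaskManager
QueryableStateClient client = new QueryableStateClient("taskmanager-host", 9069);

ValueStateDescriptor<Long> descriptor =
        new ValueStateDescriptor<>("my-state", Long.class);

CompletableFuture<ValueState<Long>> future = client.getKvState(
        JobID.fromHexString("feec0a2846a7e0cb7398b2b0b7f12363"),   // placeholder job id
        "my-queryable-state-name",
        "some-key",
        BasicTypeInfo.STRING_TYPE_INFO,
        descriptor);

Long value = future.get().value();   // blocking here; thenApply/whenComplete for async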

I've confirmed via logs that I have the exact same key, flink job ID, and 
queryable state name.

So I'm going bonkers over what difference might exist. I'm wondering if I'm
packing my jar wrong and there are some resources I need to look out for (back
on Flink 1.3.x I had to handle the reference.conf file for Akka when queryable
state depended on it; is there something like that here?). Is there more logging
somewhere on the server side that might give me a hint, like "Tried to query
state for X but couldn't find the BananaLever"? I'm pretty stuck right now and
ready to try any random ideas to move forward.


The only change I've made aside from the jar version bump from 1.4.2 to 1.5.0
is handling queryableStateName (which is now nullable); no other code changes
were required, and to confirm, this all works just fine with 1.4.2.


Thank you.


Re: Keeping only latest row by key?

2018-07-19 Thread Fabian Hueske
Hi James,

Yes, that should also do the trick.

Best, Fabian

2018-07-19 16:06 GMT+02:00 Porritt, James :

> It looks like the following gives me the result I'm interested in:
>
>
>
> batchEnv
>
> .createInput(dataset)
>
> .groupBy("id")
>
> .sortGroup("timestamp", Order.DESCENDING)
>
> .first(1);
>
>
>
> Is there anything I've misunderstood with this?
>
>
>
> *From:* Porritt, James 
> *Sent:* 19 July 2018 09:21
> *To:* 'Timo Walther' 
> *Cc:* user@flink.apache.org
> *Subject:* RE: Keeping only latest row by key?
>
>
>
> Hi Timo,
>
> Thanks for this. I've been looking into creating this in
> Java by looking at MaxAggFunction.scala as a basis. Is it correct that I'd
> be creating a version for each type I want to use it with (albeit using
> generics) and registering the functions separately for use with the
> correct type of table field?
>
>
>
> Thanks,
>
> James.
>
>
>
> *From:* Timo Walther 
> *Sent:* 18 July 2018 12:21
> *To:* Porritt, James 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Keeping only latest row by key?
>
>
>
> Hi James,
>
> the easiest solution for this behavior is to use a user-defined LAST_VALUE
> aggregate function as discussed here [1].
>
> I hope this helps.
>
> Regards,
> Timo
>
> [1] http://apache-flink-mailing-list-archive.1008284.n3.
> nabble.com/Using-SQL-with-dynamic-tables-where-rows-are-
> updated-td20519.html
>
>
> On 18.07.18 at 12:54, Andrey Zagrebin wrote:
>
> Hi James,
>
>
>
> There are over windows in Flink Table API:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/
> tableApi.html#over-windows
>
> It should be possible to implement this behaviour using them.
>
>
>
> Cheers,
>
> Andrey
>
>
>
> On 17 Jul 2018, at 18:27, Porritt, James  wrote:
>
>
>
> In Spark if I want to be able to get a set of unique rows by id, using the
> criteria of keeping the row with the latest timestamp, I would do the
> following:
>
>
>
> .withColumn("rn",
>
> F.row_number().over(
>
> Window.partitionBy('id') \
>
> .orderBy(F.col('timestamp').desc())
>
> )
>
> ) \
>
> .where(F.col("rn") == 1)
>
>
>
> I see Flink has windowing functionality, but I don't see that it has row
> enumeration. How best in that case would I achieve the above?
>
>
>
> Thanks,
>
> James.
>

RE: Keeping only latest row by key?

2018-07-19 Thread Porritt, James
It looks like the following gives me the result I'm interested in:

batchEnv
.createInput(dataset)
.groupBy("id")
.sortGroup("timestamp", Order.DESCENDING)
.first(1);

Is there anything I've misunderstood with this?
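
Regarding the LAST_VALUE route Timo suggests further down, a rough Java sketch of
such an aggregate (illustrative only; it is hard-coded to String values with a Long
timestamp, so as discussed a variant would be needed per value type) could look
like this:

import org.apache.flink.table.functions.AggregateFunction;

// accumulator: remembers the value seen with the largest timestamp so far
public static class LastValueAcc {
    public long maxTimestamp = Long.MIN_VALUE;
    public String value = null;
}

public static class LastValue extends AggregateFunction<String, LastValueAcc> {

    @Override
    public LastValueAcc createAccumulator() {
        return new LastValueAcc();
    }

    @Override
    public String getValue(LastValueAcc acc) {
        return acc.value;
    }

    public void accumulate(LastValueAcc acc, String value, Long timestamp) {
        if (timestamp != null && timestamp > acc.maxTimestamp) {
            acc.maxTimestamp = timestamp;
            acc.value = value;
        }
    }
}

// registered and used e.g. as:
// tableEnv.registerFunction("LAST_VALUE", new LastValue());
// SELECT id, LAST_VALUE(v, ts) FROM t GROUP BY id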

From: Porritt, James 
Sent: 19 July 2018 09:21
To: 'Timo Walther' 
Cc: user@flink.apache.org
Subject: RE: Keeping only latest row by key?

Hi Timo,
Thanks for this. I've been looking into creating this in Java
by looking at MaxAggFunction.scala as a basis. Is it correct that I'd be
creating a version for each type I want to use it with (albeit using generics)
and registering the functions separately for use with the correct type of table
field?

Thanks,
James.

From: Timo Walther
Sent: 18 July 2018 12:21
To: Porritt, James
Cc: user@flink.apache.org
Subject: Re: Keeping only latest row by key?

Hi James,

the easiest solution for this behavior is to use a user-defined LAST_VALUE
aggregate function as discussed here [1].

I hope this helps.

Regards,
Timo

[1] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Using-SQL-with-dynamic-tables-where-rows-are-updated-td20519.html


On 18.07.18 at 12:54, Andrey Zagrebin wrote:
Hi James,

There are over windows in Flink Table API:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/tableApi.html#over-windows
It should be possible to implement this behaviour using them.

Cheers,
Andrey

On 17 Jul 2018, at 18:27, Porritt, James wrote:

In Spark if I want to be able to get a set of unique rows by id, using the 
criteria of keeping the row with the latest timestamp, I would do the following:

.withColumn("rn",
F.row_number().over(
Window.partitionBy('id') \
.orderBy(F.col('timestamp').desc())
)
) \
.where(F.col("rn") == 1)

I see Flink has windowing functionality, but I don't see that it has row
enumeration. How best in that case would I achieve the above?

Thanks,
James.

Re: Flink on Mesos: containers question

2018-07-19 Thread Till Rohrmann
Hi Alexei,

I actually never used Mesos with container images. I always used it in a
way where the Mesos task directly starts the Java process.

Cheers,
Till

On Thu, Jul 19, 2018 at 2:44 PM NEKRASSOV, ALEXEI  wrote:

> Till,
>
>
>
> Any insight into how Flink components are containerized in Mesos?
>
>
>
> Thanks!
>
> Alex
>
>
>
> *From:* Fabian Hueske [mailto:fhue...@gmail.com]
> *Sent:* Monday, July 16, 2018 7:57 AM
> *To:* NEKRASSOV, ALEXEI 
> *Cc:* user@flink.apache.org; Till Rohrmann 
> *Subject:* Re: Flink on Mesos: containers question
>
>
>
> Hi Alexei,
>
>
>
> Till (in CC) is familiar with Flink's Mesos support in 1.4.x.
>
>
>
> Best, Fabian
>
>
>
> 2018-07-13 15:07 GMT+02:00 NEKRASSOV, ALEXEI :
>
> Can someone please clarify how Flink on Mesos is containerized?
>
>
>
> On a 5-node Mesos cluster I started Flink (1.4.2) with two Task Managers.
> Mesos shows a "flink" task and two "taskmanager" tasks, all on the same VM.
>
> On that VM I see one Docker container running a process that seems to be
> Mesos App Master:
>
>
>
> $ docker ps -a
>
> CONTAINER IDIMAGE
> COMMAND  CREATED STATUS
> PORTS   NAMES
>
> 97b6840466c0mesosphere/dcos-flink:1.4.2-1.0   "/bin/sh -c
> /sbin/..."   41 hours agoUp 41 hours
> mesos-a0079d85-9ccb-4c43-8d31-e6b1ad750197
>
> $ docker exec 97b6840466c0 /bin/ps -efww
>
> UIDPID  PPID  C STIME TTY  TIME CMD
>
> root 1 0  0 Jul11 ?00:00:00 /bin/sh -c /sbin/init.sh
>
> root 7 1  0 Jul11 ?00:00:02 runsvdir -P /etc/service
>
> root 8 7  0 Jul11 ?00:00:00 runsv flink
>
> root   629 0  0 Jul12 pts/000:00:00 /bin/bash
>
> root   789 8  1 Jul12 ?00:09:16
> /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -classpath
> /flink-1.4.2/lib/flink-python_2.11-1.4.2.jar:/flink-1.4.2/lib/flink-shaded-hadoop2-uber-1.4.2.jar:/flink-1.4.2/lib/log4j-1.2.17.jar:/flink-1.4.2/lib/slf4j-log4j12-1.7.7.jar:/flink-1.4.2/lib/flink-dist_2.11-1.4.2.jar::/etc/hadoop/conf/:
> -Dlog.file=/mnt/mesos/sandbox/flink--mesos-appmaster-alex-tfc87d-private-agents-3.novalocal.log
> -Dlog4j.configuration=file:/flink-1.4.2/conf/log4j.properties
> -Dlogback.configurationFile=file:/flink-1.4.2/conf/logback.xml
> org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner
> -Dblob.server.port=23170 -Djobmanager.heap.mb=256
> -Djobmanager.rpc.port=23169 -Djobmanager.web.port=23168
> -Dmesos.artifact-server.port=23171 -Dmesos.initial-tasks=2
> -Dmesos.resourcemanager.tasks.cpus=2 -Dmesos.resourcemanager.tasks.mem=2048
> -Dtaskmanager.heap.mb=512 -Dtaskmanager.memory.preallocate=true
> -Dtaskmanager.numberOfTaskSlots=1 -Dparallelism.default=1
> -Djobmanager.rpc.address=localhost -Dmesos.resourcemanager.framework.role=*
> -Dsecurity.kerberos.login.use-ticket-cache=true
>
> root  1027 0  0 12:54 ?00:00:00 /bin/ps -efww
>
>
>
> Then on the VM itself I see another process with the same command line as
> the one in the container:
>
>
>
> root 13276  9689  1 Jul12 ?00:09:18
> /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -classpath /flink
> -1.4.2/lib/flink-python_2.11-1.4.2.jar:/flink-1.4.2/lib/flink
> -shaded-hadoop2-uber-1.4.2.jar:/flink-1.4.2/lib/log4j-1.2.17.jar:/flink
> -1.4.2/lib/slf4j-log4j12-1.7.7.jar:/flink-1.4.2/lib/flink-dist_2.11-1.4.2.jar::/etc/hadoop/conf/:
> -Dlog.file=/mnt/mesos/sandbox/flink--mesos-appmaster-alex-tfc87d-private-agents-3.novalocal.log
> -Dlog4j.configuration=file:/flink-1.4.2/conf/log4j.properties
> -Dlogback.configurationFile=file:/flink-1.4.2/conf/logback.xml org.apache.
> flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner
> -Dblob.server.port=23170 -Djobmanager.heap.mb=256
> -Djobmanager.rpc.port=23169 -Djobmanager.web.port=23168
> -Dmesos.artifact-server.port=23171 -Dmesos.initial-tasks=2
> -Dmesos.resourcemanager.tasks.cpus=2 -Dmesos.resourcemanager.tasks.mem=2048
> -Dtaskmanager.heap.mb=512 -Dtaskmanager.memory.preallocate=true
> -Dtaskmanager.numberOfTaskSlots=1 -Dparallelism.default=1
> -Djobmanager.rpc.address=localhost -Dmesos.resourcemanager.framework.role=*
> -Dsecurity.kerberos.login.use-ticket-cache=true
>
>
>
> And I see two processes on the VM that seem to be related to Task Managers:
>
>
>
> root 13688 13687  0 Jul12 ?00:04:25
> /docker-java-home/jre/bin/java -Xms1448m -Xmx1448m -classpath
> /mnt/mesos/sandbox/flink/lib/flink
> -python_2.11-1.4.2.jar:/mnt/mesos/sandbox/flink/lib/flink
> -shaded-hadoop2-uber-1.4.2.jar:/mnt/mesos/sandbox/flink
> /lib/log4j-1.2.17.jar:/mnt/mesos/sandbox/flink
> /lib/slf4j-log4j12-1.7.7.jar:/mnt/mesos/sandbox/flink/lib/flink-dist_2.11-1.4.2.jar:::
> -Dlog.file=flink-taskmanager.log
> -Dlog4j.configuration=file:/mnt/mesos/sandbox/flink/conf/log4j.properties
> -Dlogback.configurationFile=file:/mnt/mesos/sandbox/flink/conf/logback.xml
> org.apache.flink.mesos.runtime.clusterframework.MesosTaskManager
> -Dblob.server.port=23170 -D

Re: When a jobmanager fails, it doesn't restart because it tries to restart non existing tasks

2018-07-19 Thread Till Rohrmann
Hi Gerard,

the logging statement `Removed job graph ... from ZooKeeper` is actually
not 100% accurate. The actual deletion is executed as an asynchronous
background task and the log statement is not printed in the callback (which
it should). Therefore, the deletion could still have failed. In order to
see this, the full jobmanager/cluster entry point logs would be
tremendously helpful.
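
(Purely for illustration, not Flink's actual code: the difference is between
logging right after the background delete is scheduled and logging in its
callback. Here removeJobGraphFromZooKeeper is a hypothetical helper and log is
an slf4j logger.)

CompletableFuture<Void> removal =
        CompletableFuture.runAsync(() -> removeJobGraphFromZooKeeper(jobId));

// logging here only means the delete was scheduled; it may still fail later
log.info("Removed job graph {} from ZooKeeper.", jobId);

// logging in the callback reports the real outcome
removal.whenComplete((ignored, error) -> {
    if (error == null) {
        log.info("Removed job graph {} from ZooKeeper.", jobId);
    } else {
        log.warn("Could not remove job graph {} from ZooKeeper.", jobId, error);
    }
});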

Cheers,
Till

On Thu, Jul 19, 2018 at 1:33 PM Gerard Garcia  wrote:

> Thanks Andrey,
>
> That is the log from the jobmanager just after it has finished cancelling
> the task:
>
> 11:29:18.716 [flink-akka.actor.default-dispatcher-15695] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Stopping
> checkpoint coordinator for job e403893e5208ca47ace886a77e405291.
> 11:29:18.716 [flink-akka.actor.default-dispatcher-15695] INFO
> o.a.f.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Shutting down
> 11:29:18.738 [flink-akka.actor.default-dispatcher-15695] INFO
> o.a.f.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Removing
> /flink-eur/default/checkpoints/e403893e5208ca47ace886a77e405291 from
> ZooKeeper
> 11:29:18.780 [flink-akka.actor.default-dispatcher-15695] INFO
> o.a.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter  - Shutting down.
> 11:29:18.780 [flink-akka.actor.default-dispatcher-15695] INFO
> o.a.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter  - Removing
> /checkpoint-counter/e403893e5208ca47ace886a77e405291 from ZooKeeper
> 11:29:18.827 [flink-akka.actor.default-dispatcher-15695] INFO
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job
> e403893e5208ca47ace886a77e405291 reached globally terminal state CANCELED.
> 11:29:18.846 [flink-akka.actor.default-dispatcher-15675] INFO
> org.apache.flink.runtime.jobmaster.JobMaster  - Stopping the JobMaster for
> job (...)(e403893e5208ca47ace886a77e405291).
> 11:29:18.848 [flink-akka.actor.default-dispatcher-15675] INFO
> o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - Stopping
> ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
> 11:29:18.864 [flink-akka.actor.default-dispatcher-15675] INFO
> org.apache.flink.runtime.jobmaster.JobMaster  - Close ResourceManager
> connection d5fbc30a895066054e29fb2fd60fb0f1: JobManager is shutting down..
> 11:29:18.864 [flink-akka.actor.default-dispatcher-15695] INFO
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Suspending SlotPool.
> 11:29:18.864 [flink-akka.actor.default-dispatcher-15695] INFO
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Stopping SlotPool.
> 11:29:18.864 [flink-akka.actor.default-dispatcher-15688] INFO
> o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Disconnect
> job manager 
> 9cf221e2340597629fb932c03aa14...@akka.tcp://flink@(...):33827/user/jobmanager_9
> for job e403893e5208ca47ace886a77e405291 from the resource manager.
> 11:29:18.864 [flink-akka.actor.default-dispatcher-15675] INFO
> o.a.f.runtime.leaderelection.ZooKeeperLeaderElectionService  - Stopping
> ZooKeeperLeaderElectionService
> ZooKeeperLeaderElectionService{leaderPath='/leader/e403893e5208ca47ace886a77e405291/job_manager_lock'}.
> 11:29:18.980 [flink-akka.actor.default-dispatcher-15695] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed
> checkpoint 31154 for job 5d8c376b10d358b9c9470b3e70113626 (132520 bytes in
> 411 ms).
> 11:29:19.025 [flink-akka.actor.default-dispatcher-15683] INFO
> o.a.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - Removed job
> graph e403893e5208ca47ace886a77e405291 from ZooKeeper.
>
>
> At the end it says removed job graph e403893e5208ca47ace886a77e405291 from
> ZooKeeper but I still can see it at /flink/default/jobgraphs:
>
> [zk: localhost:2181(CONNECTED) 14] ls
> /flink/default/jobgraphs/e403893e5208ca47ace886a77e405291
> [3fe9c3c8-5bec-404e-a720-75f9b188124f,
> 36208299-0f6d-462c-bae4-2e3d53f50e8c]
>
> Gerard
>
> On Wed, Jul 18, 2018 at 4:24 PM Andrey Zagrebin 
> wrote:
>
>> Hi Gerard,
>>
>> There is an issue recently fixed for 1.5.2, 1.6.0:
>> https://issues.apache.org/jira/browse/FLINK-9575
>> It might have caused your problem.
>>
>> Can you please provide log from JobManager/Entry point for further
>> investigation?
>>
>> Cheers,
>> Andrey
>>
>> On 18 Jul 2018, at 10:16, Gerard Garcia  wrote:
>>
>> Hi vino,
>>
>> Seems that jobs id stay in /jobgraphs when we cancel them manually. For
>> example, after cancelling the job with id 75e16686cb4fe0d33ead8e29af131d09
>> the entry is still in zookeeper's path /flink/default/jobgraphs, but the
>> job disappeared from /home/nas/flink/ha/default/blob/.
>>
>> That is the client log:
>>
>> 09:20:58.492 [main] INFO  org.apache.flink.client.cli.CliFrontend  -
>> Cancelling job 75e16686cb4fe0d33ead8e29af131d09.
>> 09:20:58.503 [main] INFO
>> org.apache.flink.runtime.blob.FileSystemBlobStore  - Creating highly
>> available BLOB storage directory at
>> file:///home/nas/flink/ha//default/blob
>> 09:20:58.505 [main] INFO  org.apache.f

Re: Why data didn't enter the time window in EventTime mode

2018-07-19 Thread Hequn Cheng
Hi Soheil,
You can monitor the watermarks in the web dashboard, as Fabian said. There
is some documentation on this here [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_event_time.html#monitoring-current-event-time
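
As a quick way to do the print-debugging Fabian mentions below, a pass-through
ProcessFunction can print each element's timestamp next to the current watermark
(a sketch; drop it in right before the window operator):

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class WatermarkDebugger<T> extends ProcessFunction<T, T> {
    @Override
    public void processElement(T value, Context ctx, Collector<T> out) {
        // prints the element's timestamp and the operator's current watermark
        System.out.println("element timestamp=" + ctx.timestamp()
                + ", current watermark=" + ctx.timerService().currentWatermark());
        out.collect(value);
    }
}

// usage: stream.process(new WatermarkDebugger<>()) ...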

On Thu, Jul 19, 2018 at 3:53 PM, Fabian Hueske  wrote:

> Hi Soheil,
>
> Hequn is right. This might be an issue with advancing event-time.
> You can monitor that by checking the watermarks in the web dashboard or
> print-debug it with a ProcessFunction which can look up the current
> watermark.
>
> Best, Fabian
>
> 2018-07-19 3:30 GMT+02:00 Hequn Cheng :
>
>> Hi Soheil,
>>
>> > wait 8 milliseconds (according to my code) to see if any other data
>> with the same key received or not and after 8 millisecond it will be
>> triggered.
>> Yes, but the time is event time, so if there is no data from source the
>> time won't be advanced.
>>
>> There are some reasons why the event time may not have advanced:
>> 1. There is no data from the source.
>> 2. One of the source parallel instances doesn't have data.
>> 3. The time field, i.e. the Long in Tuple3, should be in milliseconds, not
>> seconds.
>> 4. The data should cover a longer time span than the window size to advance
>> the event time.
>>
>> Best, Hequn
>>
>> On Wed, Jul 18, 2018 at 3:53 PM, Soheil Pourbafrani <
>> soheil.i...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> In a datastream processing problem, the source generated data every 8
>>> millisecond and timestamp is a field of the data. In default Flink time
>>> behavior data enter the time window but when I set Flink time to EventTime
>>> it will output nothing! Here is the code:
>>>
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>
>>> SingleOutputStreamOperator<Tuple3<String, Long, JSONObject>> res = aggregatedTuple
>>> .assignTimestampsAndWatermarks(
>>> new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long, JSONObject>>(Time.milliseconds(8)) {
>>>
>>> @Override
>>> public long extractTimestamp(Tuple3<String, Long, JSONObject> element) {
>>> return element.f1;
>>> }
>>> }).keyBy(1).timeWindow(Time.milliseconds(8))
>>> .allowedLateness(Time.milliseconds(3))
>>> .sideOutputLateData(lateOutputTag)
>>> .reduce(processing...);
>>> DataStream<Tuple3<String, Long, JSONObject>> lateData = res.getSideOutput(lateOutputTag);
>>> res.print();
>>>
>>> What is the problem with my code?
>>> According to the Flink documentation, my understanding of EventTime is
>>> that, for example, in the case of a time window, when new data is received
>>> it starts a new (logical) window based on the new data's event timestamp and
>>> waits 8 milliseconds (according to my code) to see whether any other data
>>> with the same key arrives, and after 8 milliseconds (from the timestamp of
>>> the first element of the window) it will be triggered. Since the data source
>>> generates data at a constant periodic interval, I set a watermark of 8, too.
>>> Is my understanding of Flink windows in EventTime correct?
>>>
>>
>>
>


RE: Flink on Mesos: containers question

2018-07-19 Thread NEKRASSOV, ALEXEI
Till,

Any insight into how Flink components are containerized in Mesos?

Thanks!
Alex

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Monday, July 16, 2018 7:57 AM
To: NEKRASSOV, ALEXEI 
Cc: user@flink.apache.org; Till Rohrmann 
Subject: Re: Flink on Mesos: containers question

Hi Alexei,

Till (in CC) is familiar with Flink's Mesos support in 1.4.x.

Best, Fabian

2018-07-13 15:07 GMT+02:00 NEKRASSOV, ALEXEI :
Can someone please clarify how Flink on Mesos is containerized?

On a 5-node Mesos cluster I started Flink (1.4.2) with two Task Managers. Mesos
shows a "flink" task and two "taskmanager" tasks, all on the same VM.
On that VM I see one Docker container running a process that seems to be Mesos 
App Master:

$ docker ps -a
CONTAINER IDIMAGE COMMAND  
CREATED STATUS  PORTS   NAMES
97b6840466c0mesosphere/dcos-flink:1.4.2-1.0   "/bin/sh -c /sbin/..."   
41 hours agoUp 41 hours 
mesos-a0079d85-9ccb-4c43-8d31-e6b1ad750197
$ docker exec 97b6840466c0 /bin/ps -efww
UIDPID  PPID  C STIME TTY  TIME CMD
root 1 0  0 Jul11 ?00:00:00 /bin/sh -c /sbin/init.sh
root 7 1  0 Jul11 ?00:00:02 runsvdir -P /etc/service
root 8 7  0 Jul11 ?00:00:00 runsv flink
root   629 0  0 Jul12 pts/000:00:00 /bin/bash
root   789 8  1 Jul12 ?00:09:16 
/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -classpath 
/flink-1.4.2/lib/flink-python_2.11-1.4.2.jar:/flink-1.4.2/lib/flink-shaded-hadoop2-uber-1.4.2.jar:/flink-1.4.2/lib/log4j-1.2.17.jar:/flink-1.4.2/lib/slf4j-log4j12-1.7.7.jar:/flink-1.4.2/lib/flink-dist_2.11-1.4.2.jar::/etc/hadoop/conf/:
 
-Dlog.file=/mnt/mesos/sandbox/flink--mesos-appmaster-alex-tfc87d-private-agents-3.novalocal.log
 -Dlog4j.configuration=file:/flink-1.4.2/conf/log4j.properties 
-Dlogback.configurationFile=file:/flink-1.4.2/conf/logback.xml 
org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner 
-Dblob.server.port=23170 -Djobmanager.heap.mb=256 -Djobmanager.rpc.port=23169 
-Djobmanager.web.port=23168 -Dmesos.artifact-server.port=23171 
-Dmesos.initial-tasks=2 -Dmesos.resourcemanager.tasks.cpus=2 
-Dmesos.resourcemanager.tasks.mem=2048 -Dtaskmanager.heap.mb=512 
-Dtaskmanager.memory.preallocate=true -Dtaskmanager.numberOfTaskSlots=1 
-Dparallelism.default=1 -Djobmanager.rpc.address=localhost 
-Dmesos.resourcemanager.framework.role=* 
-Dsecurity.kerberos.login.use-ticket-cache=true
root  1027 0  0 12:54 ?00:00:00 /bin/ps -efww

Then on the VM itself I see another process with the same command line as the 
one in the container:

root 13276  9689  1 Jul12 ?00:09:18 
/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -classpath 
/flink-1.4.2/lib/flink-python_2.11-1.4.2.jar:/flink-1.4.2/lib/flink-shaded-hadoop2-uber-1.4.2.jar:/flink-1.4.2/lib/log4j-1.2.17.jar:/flink-1.4.2/lib/slf4j-log4j12-1.7.7.jar:/flink-1.4.2/lib/flink-dist_2.11-1.4.2.jar::/etc/hadoop/conf/:
 
-Dlog.file=/mnt/mesos/sandbox/flink--mesos-appmaster-alex-tfc87d-private-agents-3.novalocal.log
 -Dlog4j.configuration=file:/flink-1.4.2/conf/log4j.properties 
-Dlogback.configurationFile=file:/flink-1.4.2/conf/logback.xml 
org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner 
-Dblob.server.port=23170 -Djobmanager.heap.mb=256 -Djobmanager.rpc.port=23169 
-Djobmanager.web.port=23168 -Dmesos.artifact-server.port=23171 
-Dmesos.initial-tasks=2 -Dmesos.resourcemanager.tasks.cpus=2 
-Dmesos.resourcemanager.tasks.mem=2048 -Dtaskmanager.heap.mb=512 
-Dtaskmanager.memory.preallocate=true -Dtaskmanager.numberOfTaskSlots=1 
-Dparallelism.default=1 -Djobmanager.rpc.address=localhost 
-Dmesos.resourcemanager.framework.role=* 
-Dsecurity.kerberos.login.use-ticket-cache=true

And I see two processes on the VM that seem to be related to Task Managers:

root 13688 13687  0 Jul12 ?00:04:25 /docker-java-home/jre/bin/java 
-Xms1448m -Xmx1448m -classpath 
/mnt/mesos/sandbox/flink/lib/flink-python_2.11-1.4.2.jar:/mnt/mesos/sandbox/flink/lib/flink-shaded-hadoop2-uber-1.4.2.jar:/mnt/mesos/sandbox/flink/lib/log4j-1.2.17.jar:/mnt/mesos/sandbox/flink/lib/slf4j-log4j12-1.7.7.jar:/mnt/mesos/sandbox/flink/lib/flink-dist_2.11-1.4.2.jar:::
 -Dlog.file=flink-taskmanager.log 
-Dlog4j.configuration=file:/mnt/mesos/sandbox/flink/conf/log4j.properties 
-Dlogback.configurationFile=file:/mnt/mesos/sandbox/flink/conf/logback.xml 
org.apache.flink.mesos.runtime.clusterframework.MesosTaskManager 
-Dblob.server.port=23170 -Dmesos.artifact-server.port=23171 
-Djobmanager.heap.mb=256 -Djobmanager.rpc.address=localhost 
-Djobmanager.web.port=23168 -Dsecurity.kerberos.login.use-ticket-cache=true 
-Djobmanager.rpc.port=23169 -Dtaskmanager.memory.preallocate=true 
-Dtaskmanager.rpc.port=1027 -Dmesos.initial-tasks=2 
-Dmesos.resourcemanager.tasks.cpus=2 -Dtaskmanager.maxReg

Re: data enrichment via endpoint, serializable issue

2018-07-19 Thread Steffen Wohlers
Hi Xingcan,

option two (RichMapFunction) works, thanks a lot!


Thanks,
Steffen

> On 19. Jul 2018, at 13:59, Xingcan Cui  wrote:
> 
> Hi Steffen,
> 
> You could make the class `TextAPIClient` serializable, or use 
> `RichMapFunction` [1] and instantiate all the required objects in its 
> `open()` method.
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/api_concepts.html#rich-functions
>  
> 
> 
> Best,
> Xingcan
> 
>> On Jul 19, 2018, at 6:56 PM, Steffen Wohlers wrote:
>> 
>> Hi all,
>> 
>> I’m new to Apache Flink and I have the following issue:
>> 
>> I would like to enrich data via map function. For that I call a method which 
>> calls an endpoint but I get following error message 
>> 
>> „The implementation of the MapFunction is not serializable. The object 
>> probably contains or references non serializable fields.
>>  at 
>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)“ …
>> „Caused by: java.io.NotSerializableException: 
>> com.aylien.textapi.TextAPIClient“
>> 
>> Is there a smart way to fix that issue?
>> 
>> Regards,
>> 
>> Steffen
>> 
>> 
>> Map Function:
>> DataStream tweetSentimentDataStream = noRTDataStream
>> .map(new MapFunction() {
>> @Override
>> public TweetSentiment map(Tweet tweet) throws Exception {
>> String polarity = "good";
>> polarity = test.testMethod();
>> polarity =  sentimentAnalysis.sentiment(tweet.getText());
>> return new TweetSentiment(tweet, polarity, 0);
>> }
>> });
>> 
>> Class:
>> 
>> public class SentimentAnalysis implements Serializable {
>> 
>> private TextAPIClient _sentimentClient;
>> 
>> public SentimentAnalysis () {
>> _sentimentClient = new TextAPIClient(„xxx", „xxx");
>> }
>> 
>> public String sentiment(String text)  throws Exception{
>> SentimentParams sentimentParams = new SentimentParams(text, null, 
>> null);
>> Sentiment sentiment = _sentimentClient.sentiment(sentimentParams);
>> 
>> return sentiment.getPolarity();
>> }
>> (Analysis via Aylien)
> 



Re: Bootstrapping the state

2018-07-19 Thread vino yang
Hi Henkka,

The behavior of the text file source meets expectations. Flink will not keep
your source task thread alive once it returns from its run() method, so you
have to keep the source task alive yourself. To do that, implement a custom
text file source (implement the SourceFunction interface).

For your requirement, the source can track an idle period after the last
record was read; once that period expires, return from run() and the job will
stop on its own.

You can also refer to the implementations of other source connectors.
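
For illustration, here is a minimal sketch of such a source (the class name,
the idle-timeout handling, and the checkpoint-lock usage are my assumptions,
not code from your job):

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.io.BufferedReader;
import java.io.FileReader;

// Hypothetical sketch: reads a text file once and then stays alive until an
// idle timeout expires, so the job keeps RUNNING long enough to trigger a
// savepoint from outside before it finishes.
public class BoundedTextFileSource implements SourceFunction<String> {

    private final String path;
    private final long idleTimeoutMs;
    private volatile boolean running = true;

    public BoundedTextFileSource(String path, long idleTimeoutMs) {
        this.path = path;
        this.idleTimeoutMs = idleTimeoutMs;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        try (BufferedReader reader = new BufferedReader(new FileReader(path))) {
            String line;
            while (running && (line = reader.readLine()) != null) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(line);
                }
            }
        }
        // All data emitted: idle until the timeout expires instead of
        // returning immediately, then the job finishes on its own.
        long deadline = System.currentTimeMillis() + idleTimeoutMs;
        while (running && System.currentTimeMillis() < deadline) {
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

The job would then use env.addSource(new BoundedTextFileSource(initFile, 60000L))
instead of env.readTextFile(initFile).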

Thanks, vino.

2018-07-19 19:52 GMT+08:00 Henri Heiskanen :

> Hi,
>
> I've been looking into how to initialise large state and especially
> checked this presentation by Lyft referenced in this group as well:
> https://www.youtube.com/watch?v=WdMcyN5QZZQ
>
> In our use case we would like to load roughly 4 billion entries into this
> state and I believe loading this data from s3, creating a savepoint and
> then restarting in streaming mode from a savepoint would work very well. In
> the presentation I get an impression that I could read from s3 and when all
> done (without any custom termination detector etc) I could just make a
> savepoint by calling the rest api from the app. However, I've noticed that
> if I read data from files the job will auto-terminate when all data is read
> and job appears not to be running even if I add the sleep in the main
> program (very simple app attached below).
>
> I could use FileProcessingMode.PROCESS_CONTINUOUSLY to prevent the job
> from terminating and create the savepoint from outside the app, but that
> would require termination detection etc and would make the solution less
> clean.
>
> Has anyone more details how I could accomplish this?
>
> Br,
> Henkka
>
> public class StreamingJob {
>
>public static void main(String[] args) throws Exception {
>   if (args.length == 0) {
>  args = "--initFile init.csv".split(" ");
>   }
>
>   // set up the streaming execution environment
>   final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>
>   ParameterTool params = ParameterTool.fromArgs(args);
>
>   String initFile = params.get("initFile");
>   if (initFile != null) {
>  env.readTextFile(initFile).map(new MapFunction Tuple4>() {
> @Override
> public Tuple4 map(String s) 
> throws Exception {
>String[] data = s.split(",");
>return new Tuple4(data[0], 
> data[1], data[2], data[3]);
> }
>  }).keyBy(0, 1).map(new ProfileInitMapper());
>   }
>
>   // execute program
>   env.execute("Flink Streaming Java API Skeleton");
>
>   // when all data read, save the state
>   Thread.sleep(1);
>}
> }
>
>
>
>


Re: data enrichment via endpoint, serializable issue

2018-07-19 Thread Xingcan Cui
Hi Steffen,

You could make the class `TextAPIClient` serializable, or use `RichMapFunction` 
[1] and instantiate all the required objects in its `open()` method.

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/api_concepts.html#rich-functions
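
For illustration, a minimal sketch of the second option, reusing the class
names from your snippet (everything else here is assumed, not taken from your
project): the client wrapper is created in open() on the task manager, so it
never has to be serialized with the function.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Hypothetical sketch: the non-serializable client is created in open(),
// so only the (empty) function object itself gets serialized and shipped.
public class SentimentMapper extends RichMapFunction<Tweet, TweetSentiment> {

    private transient SentimentAnalysis sentimentAnalysis;

    @Override
    public void open(Configuration parameters) {
        sentimentAnalysis = new SentimentAnalysis();
    }

    @Override
    public TweetSentiment map(Tweet tweet) throws Exception {
        String polarity = sentimentAnalysis.sentiment(tweet.getText());
        return new TweetSentiment(tweet, polarity, 0);
    }
}

It would then be used as noRTDataStream.map(new SentimentMapper()).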
 


Best,
Xingcan

> On Jul 19, 2018, at 6:56 PM, Steffen Wohlers  wrote:
> 
> Hi all,
> 
> I’m new to Apache Flink and I have the following issue:
> 
> I would like to enrich data via map function. For that I call a method which 
> calls an endpoint but I get following error message 
> 
> „The implementation of the MapFunction is not serializable. The object 
> probably contains or references non serializable fields.
>   at 
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)“ …
> „Caused by: java.io.NotSerializableException: 
> com.aylien.textapi.TextAPIClient“
> 
> Is there a smart way to fix that issue?
> 
> Regards,
> 
> Steffen
> 
> 
> Map Function:
> DataStream tweetSentimentDataStream = noRTDataStream
> .map(new MapFunction() {
> @Override
> public TweetSentiment map(Tweet tweet) throws Exception {
> String polarity = "good";
> polarity = test.testMethod();
> polarity =  sentimentAnalysis.sentiment(tweet.getText());
> return new TweetSentiment(tweet, polarity, 0);
> }
> });
> 
> Class:
> 
> public class SentimentAnalysis implements Serializable {
> 
> private TextAPIClient _sentimentClient;
> 
> public SentimentAnalysis () {
> _sentimentClient = new TextAPIClient(„xxx", „xxx");
> }
> 
> public String sentiment(String text)  throws Exception{
> SentimentParams sentimentParams = new SentimentParams(text, null, 
> null);
> Sentiment sentiment = _sentimentClient.sentiment(sentimentParams);
> 
> return sentiment.getPolarity();
> }
> (Analysis via Aylien)



Bootstrapping the state

2018-07-19 Thread Henri Heiskanen
Hi,

I've been looking into how to initialise large state and especially checked
this presentation by Lyft referenced in this group as well:
https://www.youtube.com/watch?v=WdMcyN5QZZQ

In our use case we would like to load roughly 4 billion entries into this
state and I believe loading this data from s3, creating a savepoint and
then restarting in streaming mode from a savepoint would work very well. In
the presentation I get an impression that I could read from s3 and when all
done (without any custom termination detector etc) I could just make a
savepoint by calling the rest api from the app. However, I've noticed that
if I read data from files the job will auto-terminate when all data is read
and job appears not to be running even if I add the sleep in the main
program (very simple app attached below).

I could use FileProcessingMode.PROCESS_CONTINUOUSLY to prevent the job from
terminating and create the savepoint from outside the app, but that would
require termination detection etc and would make the solution less clean.

Has anyone more details how I could accomplish this?

Br,
Henkka

public class StreamingJob {

   public static void main(String[] args) throws Exception {
  if (args.length == 0) {
 args = "--initFile init.csv".split(" ");
  }

  // set up the streaming execution environment
  final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

  ParameterTool params = ParameterTool.fromArgs(args);

  String initFile = params.get("initFile");
  if (initFile != null) {
 env.readTextFile(initFile).map(new MapFunction<String, Tuple4<String, String, String, String>>() {
@Override
public Tuple4<String, String, String, String> map(String s) throws Exception {
   String[] data = s.split(",");
   return new Tuple4<>(data[0], data[1], data[2], data[3]);
}
 }).keyBy(0, 1).map(new ProfileInitMapper());
  }

  // execute program
  env.execute("Flink Streaming Java API Skeleton");

  // when all data read, save the state
  Thread.sleep(1);
   }
}


Re: When a jobmanager fails, it doesn't restart because it tries to restart non existing tasks

2018-07-19 Thread Gerard Garcia
Thanks Andrey,

That is the log from the jobmanager just after it has finished cancelling
the task:

11:29:18.716 [flink-akka.actor.default-dispatcher-15695] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Stopping
checkpoint coordinator for job e403893e5208ca47ace886a77e405291.
11:29:18.716 [flink-akka.actor.default-dispatcher-15695] INFO
o.a.f.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Shutting down
11:29:18.738 [flink-akka.actor.default-dispatcher-15695] INFO
o.a.f.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Removing
/flink-eur/default/checkpoints/e403893e5208ca47ace886a77e405291 from
ZooKeeper
11:29:18.780 [flink-akka.actor.default-dispatcher-15695] INFO
o.a.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter  - Shutting down.
11:29:18.780 [flink-akka.actor.default-dispatcher-15695] INFO
o.a.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter  - Removing
/checkpoint-counter/e403893e5208ca47ace886a77e405291 from ZooKeeper
11:29:18.827 [flink-akka.actor.default-dispatcher-15695] INFO
org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job
e403893e5208ca47ace886a77e405291 reached globally terminal state CANCELED.
11:29:18.846 [flink-akka.actor.default-dispatcher-15675] INFO
org.apache.flink.runtime.jobmaster.JobMaster  - Stopping the JobMaster for
job (...)(e403893e5208ca47ace886a77e405291).
11:29:18.848 [flink-akka.actor.default-dispatcher-15675] INFO
o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - Stopping
ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
11:29:18.864 [flink-akka.actor.default-dispatcher-15675] INFO
org.apache.flink.runtime.jobmaster.JobMaster  - Close ResourceManager
connection d5fbc30a895066054e29fb2fd60fb0f1: JobManager is shutting down..
11:29:18.864 [flink-akka.actor.default-dispatcher-15695] INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Suspending SlotPool.
11:29:18.864 [flink-akka.actor.default-dispatcher-15695] INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Stopping SlotPool.
11:29:18.864 [flink-akka.actor.default-dispatcher-15688] INFO
o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Disconnect
job manager 
9cf221e2340597629fb932c03aa14...@akka.tcp://flink@(...):33827/user/jobmanager_9
for job e403893e5208ca47ace886a77e405291 from the resource manager.
11:29:18.864 [flink-akka.actor.default-dispatcher-15675] INFO
o.a.f.runtime.leaderelection.ZooKeeperLeaderElectionService  - Stopping
ZooKeeperLeaderElectionService
ZooKeeperLeaderElectionService{leaderPath='/leader/e403893e5208ca47ace886a77e405291/job_manager_lock'}.
11:29:18.980 [flink-akka.actor.default-dispatcher-15695] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed
checkpoint 31154 for job 5d8c376b10d358b9c9470b3e70113626 (132520 bytes in
411 ms).
11:29:19.025 [flink-akka.actor.default-dispatcher-15683] INFO
o.a.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - Removed job
graph e403893e5208ca47ace886a77e405291 from ZooKeeper.


At the end it says removed job graph e403893e5208ca47ace886a77e405291 from
ZooKeeper but I still can see it at /flink/default/jobgraphs:

[zk: localhost:2181(CONNECTED) 14] ls
/flink/default/jobgraphs/e403893e5208ca47ace886a77e405291
[3fe9c3c8-5bec-404e-a720-75f9b188124f, 36208299-0f6d-462c-bae4-2e3d53f50e8c]

Gerard

On Wed, Jul 18, 2018 at 4:24 PM Andrey Zagrebin 
wrote:

> Hi Gerard,
>
> There is an issue recently fixed for 1.5.2, 1.6.0:
> https://issues.apache.org/jira/browse/FLINK-9575
> It might have caused your problem.
>
> Can you please provide log from JobManager/Entry point for further
> investigation?
>
> Cheers,
> Andrey
>
> On 18 Jul 2018, at 10:16, Gerard Garcia  wrote:
>
> Hi vino,
>
> Seems that jobs id stay in /jobgraphs when we cancel them manually. For
> example, after cancelling the job with id 75e16686cb4fe0d33ead8e29af131d09
> the entry is still in zookeeper's path /flink/default/jobgraphs, but the
> job disappeared from /home/nas/flink/ha/default/blob/.
>
> That is the client log:
>
> 09:20:58.492 [main] INFO  org.apache.flink.client.cli.CliFrontend  -
> Cancelling job 75e16686cb4fe0d33ead8e29af131d09.
> 09:20:58.503 [main] INFO
> org.apache.flink.runtime.blob.FileSystemBlobStore  - Creating highly
> available BLOB storage directory at
> file:///home/nas/flink/ha//default/blob
> 09:20:58.505 [main] INFO  org.apache.flink.runtime.util.ZooKeeperUtils  -
> Enforcing default ACL for ZK connections
> 09:20:58.505 [main] INFO  org.apache.flink.runtime.util.ZooKeeperUtils  -
> Using '/flink-eur/default' as Zookeeper namespace.
> 09:20:58.539 [main] INFO
> o.a.f.s.c.o.a.curator.framework.imps.CuratorFrameworkImpl  - Starting
> 09:20:58.543 [main] INFO
> o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
> environment:zookeeper.version=
> 3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13
> GMT
> 09:20:58.543 [main] INFO
> o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Clie

Re: Object reuse in DataStreams

2018-07-19 Thread vino yang
Hi Urs,

I think Flink does not encourage use of the "object reuse" feature, because
the documentation warns that it may cause bugs when the user-code function
of an operation is not aware of this behavior[1].

"Object reuse" is runtime behavior and its configuration item belongs to
`ExecutionConfig` (a class shared by batch and streaming), so it takes
effect for both batch and streaming[1].

The "object reuse" feature is not perfect; the main use case is batch[2],
and FLIP-21 tries to promote this feature gracefully, but its current state
is "*Under discussion*".

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/execution_configuration.html#execution-configuration
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/#operating-on-data-objects-in-functions
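
For reference, the switch itself lives on the ExecutionConfig; a minimal
sketch (the topology is only hinted at, following Urs's mail):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ObjectReuseExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // With object reuse enabled, chained operators may be handed the very
        // same object instance, which is the behaviour Urs observed for the
        // two map functions reading from one source.
        env.getConfig().enableObjectReuse();
        // ... build the topology (source, mapFnA -> sinkA, mapFnB -> sinkB) ...
        env.execute("object-reuse example");
    }
}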

Thanks, vino.

2018-07-17 20:50 GMT+08:00 Urs Schoenenberger <
urs.schoenenber...@tngtech.com>:

> Hi all,
>
> we came across some interesting behaviour today.
> We enabled object reuse on a streaming job that looks like this:
>
> stream = env.addSource(source)
> stream.map(mapFnA).addSink(sinkA)
> stream.map(mapFnB).addSink(sinkB)
>
> Operator chaining is enabled, so the optimizer fuses all operations into
> a single slot.
> The same object reference gets passed to both mapFnA and mapFnB. This
> makes sense when I think about the internal implementation, but it still
> came as a bit of a surprise since the object reuse docs (for batch -
> there are no official ones for streaming, right?) don't really deal with
> splitting the DataSet/DataStream. I guess my case is *technically*
> covered by the documented warning that it is unsafe to reuse an object
> that has already been collected, only in this case this reuse is
> "hidden" behind the stream definition DSL.
>
> Is this the expected behaviour? Is object reuse for DataStreams
> encouraged at all or is it more of a "hidden beta" feature until FLIP-21
> is officially finished?
>
> Best,
> Urs
>
> --
> Urs Schönenberger - urs.schoenenber...@tngtech.com
>
> TNG Technology Consulting GmbH, Beta-Straße 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Dr. Robert Dahlke, Gerhard Müller
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>


data enrichment via endpoint, serializable issue

2018-07-19 Thread Steffen Wohlers
Hi all,

I’m new to Apache Flink and I have the following issue:

I would like to enrich data via a map function. For that I call a method which 
calls an endpoint, but I get the following error message:

„The implementation of the MapFunction is not serializable. The object probably 
contains or references non serializable fields.
at 
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)“ …
„Caused by: java.io.NotSerializableException: com.aylien.textapi.TextAPIClient“

Is there a smart way to fix that issue?

Regards,

Steffen


Map Function:
DataStream<TweetSentiment> tweetSentimentDataStream = noRTDataStream
    .map(new MapFunction<Tweet, TweetSentiment>() {
        @Override
        public TweetSentiment map(Tweet tweet) throws Exception {
            String polarity = "good";
            polarity = test.testMethod();
            polarity = sentimentAnalysis.sentiment(tweet.getText());
            return new TweetSentiment(tweet, polarity, 0);
        }
    });

Class:

public class SentimentAnalysis implements Serializable {

    private TextAPIClient _sentimentClient;

    public SentimentAnalysis() {
        _sentimentClient = new TextAPIClient("xxx", "xxx");
    }

    public String sentiment(String text) throws Exception {
        SentimentParams sentimentParams = new SentimentParams(text, null, null);
        Sentiment sentiment = _sentimentClient.sentiment(sentimentParams);

        return sentiment.getPolarity();
    }
}
(Analysis via Aylien)

Re: Global latency metrics

2018-07-19 Thread vino yang
Hi shimin,

For some scenarios, your requirement makes sense. Sometimes we want to
know the total throughput, latency, and event processing rate
end-to-end. But currently, Flink does not support such global metrics.

To Chesnay,

I think it's a good feature the community can consider.

Thanks, vino.


2018-07-17 17:58 GMT+08:00 Chesnay Schepler :

> No, you can only get the latency for each operator.
>
> For starters, how would a global latency even account for multiple
> sources/sink?
>
>
> On 17.07.2018 10:22, shimin yang wrote:
>
>> Hi All,
>>
>> Is there a method to get the global latency directly? Since I only find
>> the latency for each operator in the Flink Rest API.
>>
>> Best,
>>
>> Shimin
>>
>
>
>


Re: Parallel stream partitions

2018-07-19 Thread Fabian Hueske
Hi Nick,

What Ken said is correct, but let me add two more things.

1) State
Usually, you only need to partition (keyBy()) the data if you want to
process tuples with the same same key together.
Therefore, it is necessary to hold some tuples or intermediate results
(like partial or running aggregates) in state. Flink is a stateful stream
processor and offers many features around state management.
One of them is keyed state, i.e., state that is maintained per key. When a
function processes a tuple, keyed state is automatically put into the
context of the current key. Because the state is always associated with a
key, it is not a problem that a function instance processes multiple keys.

2) Ordering
In a parallel system it is very expensive to reason about or guarantee
ordering. Flink only ensures that tuples that flow through a partition are
processed in order. However, order across different partitions cannot be
guaranteed. Hence, shuffles (due to keyBy or changed parallelism) can
change the order.
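
To make point 1 concrete for Nick's moving-average use case, here is a hedged
sketch (types, names, and the simple running-average logic are assumptions):
the per-key aggregate lives in keyed ValueState, so it stays correct even when
one parallel function instance receives several keys.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

// Hypothetical per-key running average; the ValueState is automatically
// scoped to the key of the current record, so one parallel instance can
// safely serve several keys.
public class RunningAverage
        extends RichMapFunction<Tuple2<Integer, Double>, Tuple2<Integer, Double>> {

    private transient ValueState<Tuple2<Long, Double>> countAndSum;

    @Override
    public void open(Configuration parameters) {
        countAndSum = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "count-and-sum",
                TypeInformation.of(new TypeHint<Tuple2<Long, Double>>() {})));
    }

    @Override
    public Tuple2<Integer, Double> map(Tuple2<Integer, Double> in) throws Exception {
        Tuple2<Long, Double> acc = countAndSum.value();
        if (acc == null) {
            acc = Tuple2.of(0L, 0.0);
        }
        acc.f0 += 1;
        acc.f1 += in.f1;
        countAndSum.update(acc);
        return Tuple2.of(in.f0, acc.f1 / acc.f0);
    }
}

It would be applied after the partitioning, e.g. stream.keyBy(0).map(new RunningAverage()).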

Best,
Fabian

2018-07-18 1:50 GMT+02:00 Ken Krugler :

> Hi Nick,
>
> On Jul 17, 2018, at 9:09 AM, Nicholas Walton  wrote:
>
> Suppose I have a data stream of tuples  Double> with the sequence of ticks being 1,2,3,…. for each separate k.
>
> I understand and keyBy(2)
>
>
> I think you want keyBy(1), since it’s 0-based.
>
> will partition the stream so each partition has the same key in each
> tuple.
>
>
> I don’t think that’s exactly correct.
>
> Each tuple with the same key value will be in the same partition. But each
> partition can receive multiple key values, depending on the cardinality of
> the keys, the number of partitions, and how they get hashed.
>
> I now have a sequence of functions to apply to the streams say f(),g() and
> h() in that order.
>
>
> Assuming these functions are all post-partitioning, then I would expect
> all tuples with the same key would be processed by the functions that are
> also running in the same partition.
>
> So .keyBy(1).map(f).map(g).map(h) should partition by the key, and then
> chain the processing of tuples.
>
> — Ken
>
>
> With parallelism set to 1 then each partition-stream passes through f then
> g then h (f | g | h) in order of tick.
>
> I want to run each partition-stream in parallel, setting parallelism in
> the Web GUI.
>
> My question is how do I ensure  each partition stream passes through a
> fixed sequence (f | g | h)  rather than if parallelism is p running p
> instances each of f g & h with no guarantee that each partition-stream
> flows through a unique set of three instances  in tick-order, especially if
> p is greater than the largest value of key.
>
> A typical use case would be to maintain a moving average over each key
>
> <1*Xjd2gfMhYqx0sIvAISR47A.png>
>
> I need to remove the crossover in the middle box, so [1] -> [1] -> [1] and
> [2] -> [2] -> [2], instead of  [1] -> [1] -> [1 or 2] .
>
> Nick
>
>
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>
>


Re: Description of Flink event time processing

2018-07-19 Thread Fabian Hueske
Hi Elias,

Thanks for the update!
I'll try to have another look soon.

Best, Fabian

2018-07-11 1:30 GMT+02:00 Elias Levy :

> Thanks for all the comments.  I've updated the document to account for the
> feedback.  Please take a look.
>
> On Fri, Jul 6, 2018 at 2:33 PM Elias Levy 
> wrote:
>
>> Apologies.  Comments are now enabled.
>>
>> On Thu, Jul 5, 2018 at 6:09 PM Rong Rong  wrote:
>>
>>> Hi Elias,
>>>
>>> Thanks for putting together the document. This is actually a very good,
>>> well-rounded document.
>>> I think you did not to enable access for comments for the link. Would
>>> you mind enabling comments for the google doc?
>>>
>>> Thanks,
>>> Rong
>>>
>>>
>>> On Thu, Jul 5, 2018 at 8:39 AM Fabian Hueske  wrote:
>>>
 Hi Elias,

 Thanks for the great document!
 I made a pass over it and left a few comments.

 I think we should definitely add this to the documentation.

 Thanks,
 Fabian

 2018-07-04 10:30 GMT+02:00 Fabian Hueske :

> Hi Elias,
>
> I agree, the docs lack a coherent discussion of event time features.
> Thank you for this write up!
> I just skimmed your document and will provide more detailed feedback
> later.
>
> It would be great to add such a page to the documentation.
>
> Best, Fabian
>
> 2018-07-03 3:07 GMT+02:00 Elias Levy :
>
>> The documentation of how Flink handles event time and watermarks is
>> spread across several places.  I've been wanting a single location that
>> summarizes the subject, and as none was available, I wrote one up.
>>
>> You can find it here: https://docs.google.com/document/d/1b5d-
>> hTdJQsPH3YD0zTB4ZqodinZVHFomKvt41FfUPMc/edit?usp=sharing
>>
>> I'd appreciate feedback, particularly about the correctness of the
>> described behavior.
>>
>
>



Re: flink 1.4.2 Ambari

2018-07-19 Thread Jeff Bean
Antonio,

Have you seen:

https://github.com/abajwa-hw/ambari-flink-service

Jeff

On Fri, Jul 13, 2018 at 7:45 PM, antonio saldivar 
wrote:

> Hello
>
> I am trying to find the way to add Flink 1.4.2 service to ambari because
> is not listed in the Stack. does anyone has the  steps to add this service
> manually?
>
> Thank you
> Best regards
>



-- 
Jeff Bean
Technical Evangelist
1-831-435-9847



Re: Cannot configure akka.ask.timeout

2018-07-19 Thread Gary Yao
Hi Lukas,

It seems that when using MiniCluster, the config key akka.ask.timeout is not
respected. Instead, a hardcoded timeout of 10s is used [1]. Since all
communication is locally, it would be interesting to see in detail what your
job looks like that it exceeds the timeout.

The key akka.ask.timeout specifies the default RPC timeout. However, for
requests originating from the REST API, web.timeout overrides this value.
When
submitting a job using the CLI to a (standalone) cluster, a request is
issued
against the REST API. Therefore, you can try setting web.timeout=10 in
the
flink-conf.yaml as already proposed by Vishal Santoshi.

Best,
Gary

[1]
https://github.com/apache/flink/blob/749dd29935f319b062051141e150eed7a1a5f298/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java#L185

On Fri, Jul 13, 2018 at 12:24 PM, Lukas Kircher 
wrote:

> Hello,
>
> I have problems setting configuration parameters for Akka in Flink 1.5.0.
> When I run a job I get the exception listed below which states that Akka
> timed out after 1ms. I tried to increase the timeout by following the
> Flink configuration documentation. Specifically I did the following:
>
> 1) Passed a configuration to the Flink execution environment with
> `akka.ask.timeout` set to a higher value. I started this in Intellij.
> 2) Passed program arguments via the run configuration in Intellij,
> e.g. `-Dakka.ask.timeout:100s`
> 3) Added `akka.ask.timeout: 100 s` to flink-conf.yaml and started a local
> standalone cluster via start-cluster.sh. The setting is reflected in
> Flink's web interface.
>
> However - despite explicit configuration the default setting seems to be
> used. The exception below states in each case that akka ask timed out after
> 1ms.
>
> As my problem seems very basic I do not include an SSCCE for now but I can
> try to build one if this helps figuring out the issue.
>
> --
> *[...]*
> *Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Could not retrieve
> JobResult.*
> *[...]*
> * at
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:619)*
> * at
> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:234)*
> * at
> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)*
> * at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816)*
> * at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)*
> *[...]*
> *Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/dispatcher8df05371-effc-468b-8a22-e2f364f65d6a#582308583]]
> after [1 ms]. Sender[null] sent message of type
> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".*
> * at
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)*
> * at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)*
> * at
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)*
> * at
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)*
> * at
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)*
> * at
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)*
> * at
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)*
> * at
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)*
> * at
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)*
> * at java.lang.Thread.run(Thread.java:745)*
> *[...]*
> *--*
>
>
> Best regards and thanks for your help,
> Lukas
>
>
>
>


Re: Flink resource manager unable to connect to mesos after restart

2018-07-19 Thread Renjie Liu
Hi, Gary:

It can be reproduced reliably; just kill the job manager and restart it.

Attached is the jobmanager's log, but I don't find anything valuable since it
just keeps reporting that it is unable to connect to the mesos master.

On Thu, Jul 19, 2018 at 4:55 AM Gary Yao  wrote:

> Hi,
>
> If you are able to re-produce this reliably, can you post the jobmanager
> logs?
>
> Best,
> Gary
> On Wed, Jul 18, 2018 at 10:33 AM, Renjie Liu 
> wrote:
>
>> Hi, all:
>>
>> I'm testing flink 1.5.0 and find that flink mesos resource manager unable
>> to connect to mesos after restart. Have you seen this happenen?
>> --
>> Liu, Renjie
>> Software Engineer, MVAD
>>
> --
Liu, Renjie
Software Engineer, MVAD


RE: Keeping only latest row by key?

2018-07-19 Thread Porritt, James
Hi Timo,
Thanks for this. I’ve been looking into creating this in Java 
by looking at MaxAggFunction.scala as a basis. Is it correct that I’d be 
creating a version for each type I want to use it with (albeit using generics) 
and registering the functions separately for use with the correct type of table 
field?

Thanks,
James.
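
For what it's worth, below is a minimal generic sketch of what such a function
could look like (hypothetical code, not a port of MaxAggFunction.scala):

import org.apache.flink.table.functions.AggregateFunction;

// Hypothetical LAST_VALUE: the accumulator simply keeps the value that was
// accumulated most recently, so getValue() returns the latest row's field.
public class LastValue<T> extends AggregateFunction<T, LastValue.Accumulator<T>> {

    public static class Accumulator<T> {
        public T value;
    }

    @Override
    public Accumulator<T> createAccumulator() {
        return new Accumulator<>();
    }

    @Override
    public T getValue(Accumulator<T> acc) {
        return acc.value;
    }

    public void accumulate(Accumulator<T> acc, T value) {
        acc.value = value;
    }
}

Whether one generic class is enough, or you still need to register an instance
(or override getResultType()/getAccumulatorType()) per concrete field type,
depends on how the type information is inferred, which is essentially the
question above.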

From: Timo Walther 
Sent: 18 July 2018 12:21
To: Porritt, James 
Cc: user@flink.apache.org
Subject: Re: Keeping only latest row by key?

Hi James,

the easiest solution for this bahavior is to use a user-defined LAST_VALUE 
aggregate function as discussed here [1].

I hope this helps.

Regards,
Timo

[1] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Using-SQL-with-dynamic-tables-where-rows-are-updated-td20519.html


Am 18.07.18 um 12:54 schrieb Andrey Zagrebin:
Hi James,

There are over windows in Flink Table API:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/tableApi.html#over-windows
It should be possible to implement this behaviour using them.

Cheers,
Andrey


On 17 Jul 2018, at 18:27, Porritt, James 
mailto:james.porr...@uk.mlp.com>> wrote:

In Spark if I want to be able to get a set of unique rows by id, using the 
criteria of keeping the row with the latest timestamp, I would do the following:

.withColumn("rn",
F.row_number().over(
Window.partitionBy(‘id’) \
.orderBy(F.col('timestamp').desc())
)
) \
.where(F.col("rn") == 1)

I see Flink has windowing functionality, but I don’t see it has row 
enumeration? How best in that case would I achieve the above?

Thanks,
James.
##
The information contained in this communication is confidential and
intended only for the individual(s) named above. If you are not a named
addressee, please notify the sender immediately and delete this email
from your system and do not disclose the email or any part of it to any
person. The views expressed in this email are the views of the author
and do not necessarily represent the views of Millennium Capital Partners
LLP (MCP LLP) or any of its affiliates. Outgoing and incoming electronic
communications of MCP LLP and its affiliates, including telephone
communications, may be electronically archived and subject to review
and/or disclosure to someone other than the recipient. MCP LLP is
authorized and regulated by the Financial Conduct Authority. Millennium
Capital Partners LLP is a limited liability partnership registered in
England & Wales with number OC312897 and with its registered office at
50 Berkeley Street, London, W1J 8HD.
##





Re: Production readiness of Flink Job Stop Service

2018-07-19 Thread Fabian Hueske
Hi Chirag,

Stop with savepoint is not mentioned in the 1.5.0 release notes [1].
Since it's a frequently requested feature, I'm pretty sure that it would
have been mentioned if it was added.

Best, Fabian

[1] http://flink.apache.org/news/2018/05/25/release-1.5.0.html


2018-07-19 8:39 GMT+02:00 vino yang :

> Hi Chirag,
>
> Did you read the latest stable Flink documentation about Savepoint[1] and
> Cancel with savepoint[2] and Upgrade application[3]?
>
> [1]: https://ci.apache.org/projects/flink/flink-docs-
> release-1.5/ops/state/savepoints.html#resuming-from-savepoints
> [2]: https://ci.apache.org/projects/flink/flink-docs-
> release-1.5/monitoring/rest_api.html#cancel-job-with-savepoint
> [3]: https://ci.apache.org/projects/flink/flink-docs-
> release-1.5/ops/upgrading.html
>
> Thanks, vino.
>
>
> 2018-07-19 14:25 GMT+08:00 Chirag Dewan :
>
>> Hi,
>>
>> I am planning to use the Stop Service for stopping/resuming/pausing my
>> Flink Job. My intention is to stop sources before we take the savepoint
>> i.e. stop with savepoint.
>>
>> I know that since Flink 1.4.2, Stop is not stable/not production ready.
>>
>> With Flink 1.5 can it be used for stopping jobs?
>>
>> Regards,
>>
>> Chirag
>>
>
>


Re: Why data didn't enter the time window in EventTime mode

2018-07-19 Thread Fabian Hueske
Hi Soheil,

Hequn is right. This might be an issue with advancing event-time.
You can monitor that by checking the watermarks in the web dashboard or
print-debug it with a ProcessFunction which can look up the current
watermark.
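
A hedged sketch of such a debug helper (the class name is made up; it just
forwards elements and prints the watermark it currently sees):

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical pass-through operator that prints the current watermark for
// every element, which makes it easy to see whether event time advances.
public class WatermarkDebugger<T> extends ProcessFunction<T, T> {

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) {
        System.out.println("element=" + value
                + ", watermark=" + ctx.timerService().currentWatermark());
        out.collect(value);
    }
}

It can be inserted with stream.process(new WatermarkDebugger<>()) right after
assignTimestampsAndWatermarks().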

Best, Fabian

2018-07-19 3:30 GMT+02:00 Hequn Cheng :

> Hi Soheil,
>
> > wait 8 milliseconds (according to my code) to see if any other data
> with the same key received or not and after 8 millisecond it will be
> triggered.
> Yes, but the time is event time, so if there is no data from source the
> time won't be advanced.
>
> There are some reasons why the event time has not been advanced:
> 1. There are no data from the source
> 2. One of the source parallelisms doesn't have data
> 3. The time field, i.e, Long in Tuple3, should be millisecond instead of
> second.
> 4. Data should cover a longer time spam than the window size to advance
> the event time.
>
> Best, Hequn
>
> On Wed, Jul 18, 2018 at 3:53 PM, Soheil Pourbafrani  > wrote:
>
>> Hi,
>>
>> In a datastream processing problem, the source generated data every 8
>> millisecond and timestamp is a field of the data. In default Flink time
>> behavior data enter the time window but when I set Flink time to EventTime
>> it will output nothing! Here is the code:
>>
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>> SingleOutputStreamOperator> res = 
>> aggregatedTuple
>> .assignTimestampsAndWatermarks(new 
>> BoundedOutOfOrdernessTimestampExtractor> JSONObject>>(Time.milliseconds(8)) {
>>
>> @Override
>> public long extractTimestamp(Tuple3 
>> element) {
>> return element.f1 ;
>> }
>> }).keyBy(1).timeWindow(Time.milliseconds(8))
>> .allowedLateness(Time.milliseconds(3))
>> .sideOutputLateData(lateOutputTag)
>> .reduce(processing...);
>> DataStream> lateData = 
>> res.getSideOutput(lateOutputTag);
>> res.print();
>>
>> What is the problem with my code?
>> According to the Flink documents, my understanding about EventTime is
>> that for example in case of time window when a new data received it start a
>> new (logical window) based on new data event timestamp and wait 8
>> milliseconds (according to my code) to see if any other data with the same
>> key received or not and after 8 millisecond (from timestamp of the first
>> element of the window) it will be triggered. Since data source generated
>> data in a constant periodic interval, I set a watermarck of  8, too. Is my
>> understanding about Flink window in EventTime correct?
>>
>
>


Re: Race between window assignment and same window timeout

2018-07-19 Thread Fabian Hueske
Hi Shay,

This sounds very much like the off-by-one bug described by FLINK-9857 [1].
The problem was identified in another recent user ml thread and fixed for
Flink 1.5.2 and 1.6.0.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-9857

2018-07-18 19:00 GMT+02:00 Andrey Zagrebin :

> Hi Shay,
>
> I would suggest to try Allowed Lateness, like you mention 500 ms:
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/
> operators/windows.html#allowed-lateness
> It might also work for processing time.
>
> Cheers,
> Andrey
>
> On 18 Jul 2018, at 17:22, Shimony, Shay  wrote:
>
> Hi,
>
> It seems like we encounter a race situation between the aggregation thread
> and the Time Trigger thread.
> It might not be a bug, but it still seems strange to us, and we would like
> your help to fix it/work around it please.
>
> First, few descriptions about our use case and system:
> · We are working with processing time.
> · We are using Flink 1.4.
> · We use our customized sliding window of size 1 minute, slide 10
> seconds.
> But we think it can happen also in tumbling window. So for simplicity,
> let’s assume tumbling window of 1 minute.
> · Our window Trigger does FIRE upon each element.
> · We have constant 2k/sec incoming messages, balanced rate.
> · When I say “window state” I mean simply our aggregation value
> in it.
>
> If the timestamp of an element is very close to the end of the window,
> then it will be assigned with that window of course, but it occasionally
> happen that this window is timing out and cleared – before this element is
> aggregated with the window state, thus we lost the previous aggregation
> value and got new aggregation state with the element value.
>
> Below is the story as seen by the threads.
> Timestamps are logical.
>
> Suppose we are in the beginning of WindowOperator.processElement.
> Current time: 119 (nearly 120)
>
> Reducer thread: assigns the element to window [60, 120], because
> context.getCurrentProcessingTime() returned 119 (in assignWindows).
>
> Time Trigger thread: time reaches 120 -> clears the window state.
>
> Reducer thread: adds the element value to window state [60, 120] (so it
> starts from a new, empty state).
>
>
> Our questions:
> 1.   Is it a legitimate race? (We expected that (1) assigning element
> to a window + aggregating it to its state, and (2) clearing the window –
> would be atomic to each other – that is, if an element is valid for a
> window, then it will be assigned to it and aggregated fully into its state,
> and only then window clear can happen).
> 2.   How could we make the Time Trigger thread wait a little bit with
> the window cleaning? Like adding 500ms to clean window time schedule.
> We thought to override WindowOperator.cleanupTime, so is it possible to
> easily replace WindowOperator with ours?
> 3.   Maybe you have different idea to work around it?
>
> Thanks!
> Shay
>
>
>