SQL Does Not Support Custom Triggers

2018-06-21 Thread YennieChen88
I found that Flink SQL uses a fixed default trigger, which does not fire until
the window closes. But sometimes we need to trigger before the window closes.
Since WindowAssigner provides the method getDefaultTrigger with a
StreamExecutionEnvironment parameter, how about passing a custom trigger to the
WindowAssigner through the StreamExecutionEnvironment?
If we can do this, SQL can support custom triggers. It is easy to implement:
all we need to do is add new fields, named something like
defaultTimeWindowTrigger and defaultGlobalWindowTrigger, to
StreamExecutionEnvironment, settable through public setter methods. Each
WindowAssigner could then obtain the defaultTimeWindowTrigger or
defaultGlobalWindowTrigger from the StreamExecutionEnvironment in its
getDefaultTrigger method.

Code:

StreamExecutionEnvironment:

    /** The default trigger used for time windows. */
    private Trigger<Object, TimeWindow> defaultTimeWindowTrigger;

    /** The default trigger used for global windows. */
    private Trigger<Object, GlobalWindow> defaultGlobalWindowTrigger;

    /**
     * Gets the default trigger for time windows.
     * @return the default time window trigger, or null if none was set
     */
    public Trigger<Object, TimeWindow> getDefaultTimeWindowTrigger() {
        return defaultTimeWindowTrigger;
    }

    /**
     * Sets the default trigger for time windows.
     * @param defaultTimeWindowTrigger the trigger to use for all time windows
     */
    public void setDefaultTimeWindowTrigger(Trigger<Object, TimeWindow> defaultTimeWindowTrigger) {
        this.defaultTimeWindowTrigger = defaultTimeWindowTrigger;
    }

    /**
     * Gets the default trigger for global windows.
     * @return the default global window trigger, or null if none was set
     */
    public Trigger<Object, GlobalWindow> getDefaultGlobalWindowTrigger() {
        return defaultGlobalWindowTrigger;
    }

    /**
     * Sets the default trigger for global windows.
     * @param defaultGlobalWindowTrigger the trigger to use for all global windows
     */
    public void setDefaultGlobalWindowTrigger(Trigger<Object, GlobalWindow> defaultGlobalWindowTrigger) {
        this.defaultGlobalWindowTrigger = defaultGlobalWindowTrigger;
    }

TumblingEventTimeWindows / TumblingProcessingTimeWindows / ...:

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        // Prefer a default trigger configured on the StreamExecutionEnvironment, if any.
        Trigger<Object, TimeWindow> defaultTrigger = env.getDefaultTimeWindowTrigger();
        if (defaultTrigger != null) {
            return defaultTrigger;
        }
        return EventTimeTrigger.create();
    }

GlobalWindows:

    @Override
    public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        // Prefer a default trigger configured on the StreamExecutionEnvironment, if any.
        Trigger<Object, GlobalWindow> defaultTrigger = env.getDefaultGlobalWindowTrigger();
        if (defaultTrigger != null) {
            return defaultTrigger;
        }
        return new NeverTrigger();
    }
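For illustration, here is how a user might plug in an early-firing trigger before
defining SQL windows, assuming the proposed setter existed on
StreamExecutionEnvironment (setDefaultTimeWindowTrigger is part of this proposal,
not existing API; ContinuousEventTimeTrigger is an existing Flink trigger used
only as an example):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // Proposed: fire every 10 seconds of event time in addition to the final firing at window end.
    env.setDefaultTimeWindowTrigger(ContinuousEventTimeTrigger.of(Time.seconds(10)));

    // Every window assigner created for SQL / Table API queries on this environment would now
    // return the custom trigger from getDefaultTrigger(env) instead of the hard-coded one.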


I look forward to your comments. I would really appreciate it if you could take
the time to help me think this through.







Re: Need a map-like state in an operator state

2018-06-21 Thread xsheng
Solved it by using a key selector that returns a constant, which creates a
"pseudo" keyed stream with only one partition.





Need a map-like state in an operator state

2018-06-21 Thread xsheng
Hi All,

I'm sorry if I'm double posting, but I posted it before subscribing and I
don't see it in my post lists. So I'm posting it again. 

The Flink app we are trying to build is as follows: read from Kafka, sort the
messages according to some dependency rules, and only send messages that
have satisfied all dependency requirements. I will need set-like and
map-like states to look up which messages have been published. The problem
is, it's not a keyed stream, so map-like states are not applicable. It seems
like we are stuck with operator state, which seems to only support list-like
states. I guess I can use a ListState[Map[A,B]], but I don't think it's
possible to merge the maps in the ListState to allow for quick look-ups.
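For reference, a sketch of what that operator-state variant could look like
(a CheckpointedFunction that keeps one in-memory map and only flattens it into a
ListState at checkpoint time; the Message type and all names are illustrative).
Note that on rescaling the list entries are redistributed across subtasks, so this
only behaves like a single global map at parallelism 1.

    public class DependencySorter extends RichFlatMapFunction<Message, Message>
            implements CheckpointedFunction {

        // Fast in-memory lookup structure; rebuilt from operator state on restore.
        private transient Map<String, Boolean> published;
        private transient ListState<Map<String, Boolean>> checkpointedState;

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            checkpointedState = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("published",
                    TypeInformation.of(new TypeHint<Map<String, Boolean>>() {})));

            published = new HashMap<>();
            if (context.isRestored()) {
                // Merge all restored partial maps back into one lookup map.
                for (Map<String, Boolean> partial : checkpointedState.get()) {
                    published.putAll(partial);
                }
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            checkpointedState.clear();
            checkpointedState.add(published);
        }

        @Override
        public void flatMap(Message msg, Collector<Message> out) {
            if (!published.containsKey(msg.getId())) {
                published.put(msg.getId(), Boolean.TRUE);
                out.collect(msg);
            }
        }
    }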

Can anyone help please?

Xianghai





Re: Flink 1.5 Yarn Connection unexpectedly closed

2018-06-21 Thread Garrett Barton
Actually, random thought: could YARN preemption be causing this?  What is
the expected failure scenario when a TaskManager that is doing real work goes
down in YARN?  The docs make it sound like Flink should fire up another TM
and get back to work out of the box, but I'm not seeing that.


On Thu, Jun 21, 2018 at 1:20 PM Garrett Barton 
wrote:

> Thank you all for the reply!
>
> I am running batch jobs, I read in a handful of files from HDFS and output
> to HBase, HDFS, and Kafka.  I run into this when I have partial usage of
> the cluster as the job runs.  So right now I spin up 20 nodes with 3 slots,
> my job at peak uses all 60 slots, but by the end of it since my outputs are
> all forced parallel 1 while I work out kinks, that all typically ends up
> running in 1 or two task managers tops.  The other 18-19 task managers die
> off.  Problem is as soon as any task manager dies off, my client throws the
> above exception and the job fails.
>
> I cannot share logs, but I was thinking about writing a dirt simple
> mapreduce flow based on the wordcount example.  The example would have a
> wide map phase that generates data, and then I'd run it through a reducer
> that sleeps maybe 1 second every record.  I believe that will simulate my
> condition very well where I go from 100% used slots to only 1-2 used slots
> as I hit that timeout.  I'll do that today and let you know, if it works I
> can share the code in here as an example.
>
> On Thu, Jun 21, 2018 at 5:01 AM Till Rohrmann 
> wrote:
>
>> Hi Garrett,
>>
>> killing of idle TaskManager should not affect the execution of the job.
>> By definition a TaskManager only idles if it does not execute any tasks.
>> Could you maybe share the complete logs (of the cluster entrypoint and all
>> TaskManagers) with us?
>>
>> Cheers,
>> Till
>>
>> On Thu, Jun 21, 2018 at 10:26 AM Fabian Hueske  wrote:
>>
>>> Hi Garrett,
>>>
>>> I agree, there seems to be an issue and increasing the timeout should
>>> not be the right approach to solve it.
>>> Are you running streaming or batch jobs, i.e., do some of the tasks
>>> finish much earlier than others?
>>>
>>> I'm adding Till to this thread who's very familiar with scheduling and
>>> process communication.
>>>
>>> Best, Fabian
>>>
>>> 2018-06-19 0:03 GMT+02:00 Garrett Barton :
>>>
 Hey all,

  My jobs that I am trying to write in Flink 1.5 are failing after a few
 minutes.  I think its because the idle task managers are shutting down,
 which seems to kill the client and the running job. The running job itself
 was still going on one of the other task managers.  I get:

 org.apache.flink.client.program.ProgramInvocationException:
 org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
 Connection unexpectedly closed by remote task manager ''. This might
 indicate that the remote task manager was lost.
 at org.apache.flink.runtime.io
 .network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:143)

 Now I happen to have the last part of the flow paralleled to 1 right
 now for debugging, so the 4 task managers that are spun up, 3 of them hit
 the timeout period (currently set to 24).  I think as soon as the first
 one goes the client throws up and the whole job dies as a result.

  Is this expected behavior and if so, is there another way around it?
 Do I keep increasing the slotmanager.taskmanager-timeout to a really really
 large number? I have verified setting the timeout to 84 lets the job
 complete without error.

 Thank you!

>>>
>>>


Re: Flink 1.5 Yarn Connection unexpectedly closed

2018-06-21 Thread Garrett Barton
Thank you all for the reply!

I am running batch jobs: I read in a handful of files from HDFS and output
to HBase, HDFS, and Kafka.  I run into this when I have partial usage of
the cluster as the job runs.  Right now I spin up 20 nodes with 3 slots each;
my job at peak uses all 60 slots, but since my outputs are all forced to
parallelism 1 while I work out kinks, by the end it all typically ends up
running in one or two task managers tops.  The other 18-19 task managers die
off.  The problem is, as soon as any task manager dies off, my client throws the
above exception and the job fails.

I cannot share logs, but I was thinking about writing a dirt-simple
MapReduce-style flow based on the WordCount example.  The example would have a
wide map phase that generates data, and then I'd run it through a reducer
that sleeps maybe 1 second per record.  I believe that will simulate my
condition very well, where I go from 100% used slots to only 1-2 used slots
as I hit that timeout.  I'll do that today and let you know; if it works I
can share the code in here as an example (see the sketch below).
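A rough sketch of what such a repro job could look like (DataSet API; the sequence
size, key count, sleep time, and names are placeholders chosen for illustration,
not taken from this thread):

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    env.generateSequence(0, 100_000)                        // wide generating phase, runs at full parallelism
        .map(new MapFunction<Long, Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> map(Long value) {
                return Tuple2.of(value % 1000, 1L);
            }
        })
        .groupBy(0)
        .reduceGroup(new GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
            @Override
            public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out)
                    throws Exception {
                long key = -1L;
                long count = 0L;
                for (Tuple2<Long, Long> v : values) {
                    key = v.f0;
                    count += v.f1;
                    Thread.sleep(1000);                     // slow consumer keeps one slot busy while the rest idle out
                }
                out.collect(Tuple2.of(key, count));
            }
        })
        .setParallelism(1)                                  // mimics the forced parallelism-1 outputs described above
        .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

    env.execute("slow-reducer-repro");

The idea is simply that the generating phase finishes quickly on all TaskManagers
while the parallelism-1 reducer keeps running, so the idle TaskManagers hit
slotmanager.taskmanager-timeout and are released; whether the client then fails
should reproduce the behaviour described above.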

On Thu, Jun 21, 2018 at 5:01 AM Till Rohrmann  wrote:

> Hi Garrett,
>
> killing of idle TaskManager should not affect the execution of the job. By
> definition a TaskManager only idles if it does not execute any tasks. Could
> you maybe share the complete logs (of the cluster entrypoint and all
> TaskManagers) with us?
>
> Cheers,
> Till
>
> On Thu, Jun 21, 2018 at 10:26 AM Fabian Hueske  wrote:
>
>> Hi Garrett,
>>
>> I agree, there seems to be an issue and increasing the timeout should not
>> be the right approach to solve it.
>> Are you running streaming or batch jobs, i.e., do some of the tasks
>> finish much earlier than others?
>>
>> I'm adding Till to this thread who's very familiar with scheduling and
>> process communication.
>>
>> Best, Fabian
>>
>> 2018-06-19 0:03 GMT+02:00 Garrett Barton :
>>
>>> Hey all,
>>>
>>>  My jobs that I am trying to write in Flink 1.5 are failing after a few
>>> minutes.  I think its because the idle task managers are shutting down,
>>> which seems to kill the client and the running job. The running job itself
>>> was still going on one of the other task managers.  I get:
>>>
>>> org.apache.flink.client.program.ProgramInvocationException:
>>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>>> Connection unexpectedly closed by remote task manager ''. This might
>>> indicate that the remote task manager was lost.
>>> at org.apache.flink.runtime.io
>>> .network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:143)
>>>
>>> Now I happen to have the last part of the flow paralleled to 1 right now
>>> for debugging, so the 4 task managers that are spun up, 3 of them hit the
>>> timeout period (currently set to 24).  I think as soon as the first one
>>> goes the client throws up and the whole job dies as a result.
>>>
>>>  Is this expected behavior and if so, is there another way around it? Do
>>> I keep increasing the slotmanager.taskmanager-timeout to a really really
>>> large number? I have verified setting the timeout to 84 lets the job
>>> complete without error.
>>>
>>> Thank you!
>>>
>>
>>


Re: How to set log level using Flink Docker Image

2018-06-21 Thread Guilherme Nobre
Thanks, that sounds reasonable. I'll try it out tomorrow :)

Cheers,
G

On Thu, Jun 21, 2018 at 6:56 PM Dominik Wosiński  wrote:

> You can for example mount the *conf* directory using docker volumes.
>
> You would need to have *logback.xml* and then mount it as *conf/logback.xml
> *inside the image. Locally You could do this using *docker-compose.yml*,
> for mounting volumes in kubernetes refer to this page:
> https://kubernetes.io/docs/concepts/storage/volumes/
>
> Regards.
>
> Czw., 21.06.2018, 18:42 użytkownik Guilherme Nobre <
> guilhermenob...@gmail.com> napisał:
>
>> Hi everyone,
>>
>> I have a Flink Cluster created from Flink's Docker Image. I'm trying to
>> set log level to DEBUG but couldn't find how. I know where logback
>> configuration files are as per documentation:
>>
>> "The conf directory contains a logback.xml file which can be modified and
>> is used if Flink is started outside of an IDE."
>>
>> However I'm not sure how to do that for a running Flink Cluster on
>> Kubernetes.
>>
>> Thanks,
>> G
>>
>>


Re: How to set log level using Flink Docker Image

2018-06-21 Thread Dominik Wosiński
You can, for example, mount the conf directory using Docker volumes.

You would need to have a logback.xml and then mount it as conf/logback.xml
inside the image. Locally you could do this using docker-compose.yml;
for mounting volumes in Kubernetes refer to this page:
https://kubernetes.io/docs/concepts/storage/volumes/

Regards.

Czw., 21.06.2018, 18:42 użytkownik Guilherme Nobre <
guilhermenob...@gmail.com> napisał:

> Hi everyone,
>
> I have a Flink Cluster created from Flink's Docker Image. I'm trying to
> set log level to DEBUG but couldn't find how. I know where logback
> configuration files are as per documentation:
>
> "The conf directory contains a logback.xml file which can be modified and
> is used if Flink is started outside of an IDE."
>
> However I'm not sure how to do that for a running Flink Cluster on
> Kubernetes.
>
> Thanks,
> G
>
>


How to set log level using Flink Docker Image

2018-06-21 Thread Guilherme Nobre
Hi everyone,

I have a Flink Cluster created from Flink's Docker Image. I'm trying to set
log level to DEBUG but couldn't find how. I know where logback
configuration files are as per documentation:

"The conf directory contains a logback.xml file which can be modified and
is used if Flink is started outside of an IDE."

However I'm not sure how to do that for a running Flink Cluster on
Kubernetes.

Thanks,
G


Re: # of active session windows of a streaming job

2018-06-21 Thread Dongwon Kim
Hi Fabian and Chesnay,

Thank you guys.

Fabian: Unfortunately, as Chesnay said, the MetricGroup API doesn't allow a
ProcessWindowFunction to access a counter defined in a Trigger.
Chesnay: I'm going to follow your advice on how to modify Flink. Thank you
very much!

Best,

- Dongwon

On Thu, Jun 21, 2018 at 10:26 PM, Chesnay Schepler 
wrote:

> Without modifications to Flink? No. By design nothing can intercept or
> retrieve metrics with the metrics API.
> For this pattern the usual recommendation is to explicitly pass the metric
> to components that require it.
>
> If modifications are an option, what you could do is
> * define a counter in the OperatorIOMetricGroup
> * have the operator checkpoint/restore the counter,
> * access it in the trigger by casting your way through the MetricGroups to
> an OperatorMetricGroup from which you can retrieve the
> OperatorIOMetricGroup.
>
>
>
> On 21.06.2018 11:16, Fabian Hueske wrote:
>
> Hi Dongwon,
>
> Yes, the counter state should be stored in operator state which is not
> available on Triggers.
> Chesnay: Can a window function (like ProcessWindowFunction) access (read,
> write) the counter of its associated Trigger to checkpoint and restore it?
>
> Best, Fabian
>
> 2018-06-20 16:59 GMT+02:00 Dongwon Kim :
>
>> Hi Fabian and Chesnay,
>>
>> As Chesnay pointed out, it seems that I need to write the current counter
>> (which is defined inside Trigger) into state which I think should be the
>> operator state of the window operator.
>> However, as I previously said, TriggerContext allows for users to access
>> only the partitioned state that are scoped to *the key and* *the window* of
>> the current Trigger invocation.
>> There's no way for me to access to the operator state of the window
>> operator through TriggerContext.
>> The partitioned state doesn't seem suitable as we have more than *ten
>> million keys*.
>> This amount of keys could possibly break down the metric system and the
>> external metric systems like Ganglia and Prometheus.
>>
>> What I want the most is to achieve the goal using the current API (I'm
>> using Flink-1.4.2) without modification.
>> But a change in TriggerContext seems unavoidable because it has to expose
>> an additional method for users like me to access to the operator state of
>> the window operator.
>>
>> Thank you guys for the useful discussion.
>>
>> p.s. Fabian, yes you're right. It is Trigger.clear(), not
>> Trigger.onClose().
>>
>> Best,
>> - Dongwon
>>
>>
>> On Wed, Jun 20, 2018 at 7:30 PM, Chesnay Schepler 
>> wrote:
>>
>>> Checkpointing of metrics is a manual process.
>>> The operator must write the current value into state, retrieve it on
>>> restore and restore the counter's count.
>>>
>>>
>>> On 20.06.2018 12:10, Fabian Hueske wrote:
>>>
>>> Hi Dongwon,
>>>
>>> You are of course right! We need to decrement the counter when the
>>> window is closed.
>>>
>>> The idea of using Trigger.clear() (the clean up method is called clear()
>>> instead of onClose()) method is great!
>>> It will be called when the window is closed but also when it is merged.
>>> So, I think you are right and we only need to increment the counter in
>>> Trigger.onElement() and decrement in Trigger.clear().
>>>
>>> I'm not 100% sure, but I doubt that metrics can checkpointed. Chesnay
>>> (in CC) would know that.
>>> Not sure what would be the best approach if you need a fault tolerant
>>> solution.
>>>
>>> Best, Fabian
>>>
>>>
>>>
>>>
>>> 2018-06-19 16:38 GMT+02:00 Dongwon Kim :
>>>
 Hi Fabian,
 Thanks a lot for your reply.

 Do you need to number of active session windows as a DataStream or
> would you like to have it as a metric that you can expose.
> I possible, I would recommend to expose it as a metric because they
> are usually easier to collect.

 I want to have it as a metric and it doesn't look difficult thanks to
 the metric system exposed by TriggerContext.

 In order to track how many session windows exist, we would need to
> increment a counter by one when a new window is created (or an element is
> assigned to a window, which is equivalent for session windows)

 I agree with you that we need to increment a counter when
 Trigger.onElement() is called due to the characteristic of session windows.

 and decrement the counter when windows are merged by the number of
> merged windows minus one.

 You decrement the counter when windows are merged, but I think we need
 to decrement the counter when a window is expired as well.

 However, decrementing the counter is difficult. Although the
> Trigger.onMerge() method is called, it does not know how many windows were
> merged (which is done by the WindowAssigner) and only sees the merged
> window.

 We assume that timestamps of records from a user are in ascending
 order, so only one window is closed at a time which simplifies the problem
 of how to decrement the counter.
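For reference, a minimal sketch of the counting approach discussed above (increment
in onElement(), decrement in clear(), counter registered through the TriggerContext
metric group). It is illustrative only: the trigger name is made up, the
window-firing logic simply mirrors the standard event-time trigger, and the counter
is not checkpointed, which is exactly the open issue in this thread.

    public class CountingEventTimeTrigger extends Trigger<Object, TimeWindow> {

        private transient Counter activeWindows;

        private Counter counter(TriggerContext ctx) {
            if (activeWindows == null) {
                activeWindows = ctx.getMetricGroup().counter("activeSessionWindows");
            }
            return activeWindows;
        }

        @Override
        public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
            // For session windows, every element first opens its own window, so +1 per element.
            counter(ctx).inc();
            if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
                return TriggerResult.FIRE;
            }
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
            // Called both when a window is purged and when it is merged away: -1 in either case.
            counter(ctx).dec();
            ctx.deleteEventTimeTimer(window.maxTimestamp());
        }

        @Override
        public boolean canMerge() {
            return true;
        }

        @Override
        public void onMerge(TimeWindow window, OnMergeContext ctx) {
            // The merged-away windows get clear() called on them; just re-register the timer
            // for the resulting merged window here.
            if (window.maxTimestamp() > ctx.getCurrentWatermark()) {
                ctx.registerEventTimeTimer(window.maxTimestamp());
            }
        }
    }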

Re: Backpressure from producer with flink connector kinesis 1.4.2

2018-06-21 Thread Liu, Gavin (CAI - Atlanta)
Thanks, Gordon. Glad to hear you confirm this. I learned a lot from the open 
PR, by the way.

Aside from adding backpressure support in the producer, is there any other 
way to protect YARN from crashing, e.g., through configuration?
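For context, a sketch of the knobs available on the 1.4.2 connector today, if I
understand it correctly: the FlinkKinesisProducer forwards its Properties to the
KPL, so the standard KPL settings (RecordTtl, RecordMaxBufferedTime, RateLimit)
can be tuned. Note this only throttles and ages out records; it does not put a
hard bound on the KPL's internal queue, which is the actual problem discussed
here. The stream name and values below are placeholders.

    Properties producerConfig = new Properties();
    producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
    producerConfig.put(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");

    // KPL settings (standard KPL configuration keys, passed through as-is).
    producerConfig.put("RecordTtl", "60000");            // drop records older than 60s instead of retrying forever
    producerConfig.put("RecordMaxBufferedTime", "5000"); // flush buffered records after at most 5s
    producerConfig.put("RateLimit", "80");               // stay below 80% of the per-shard limit

    FlinkKinesisProducer<String> producer =
        new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
    producer.setDefaultStream("output-stream");
    producer.setDefaultPartition("0");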

From: "Tzu-Li (Gordon) Tai" 
Date: Thursday, June 21, 2018 at 5:09 AM
To: "Liu, Gavin (CAI - Atlanta)" , 
"user@flink.apache.org" 
Subject: Re: Backpressure from producer with flink connector kinesis 1.4.2

Hi,

According to the description in [1], then yes, I think it is expected that 
eventually YARN containers running TMs that execute the producer sink subtasks 
will be killed due to memory problems.
It seems like that KPL client is only a wrapper around a C++ daemon process, so 
it actually wouldn’t be the TM Java processes running out of memory and 
throwing OOMs, which would explain why the TMs were apparently silently killed 
as shown by the exception you pasted.

Cheers,
Gordon

[1] https://github.com/apache/flink/pull/6021

On 21 June 2018 at 5:56:02 AM, Liu, Gavin (CAI - Atlanta) 
(gavin@coxautoinc.com) wrote:
Hi guys,

I have another question related to the KPL problem. I wonder what the 
consequences of overwhelming the KPL internal queue (Kinesis) can be.

From my observation in experimenting with 1.4.2 (which does not yet have the 
backpressure support from the open PR mentioned below), when the Flink cluster 
is processing too fast and the throughput on the sink Kinesis stream is 
limited, i.e., the throughput-exceeded exception starts to be thrown, we quite 
often get the following exception (pasted at the end) very soon afterwards, and 
all the subtasks switch to cancelling and are restarted.
From the exception trace, I can see that YARN got shut down and all task 
managers were terminated. I suspect it is because of a memory issue. Whenever 
the throughput-exceeded exception is thrown, it implicitly means that the 
internal unbounded queue in the KPL may grow rapidly. We set recordTtl = 60s 
and we can still see the record-expiration exception along with the 
exceeded-throughput exception. This leads me to wonder whether the internal 
unbounded queue grows too large, exhausts all the memory on the node, and 
eventually crashes YARN and the job manager.

Well, this is just my hypothesis. I wonder if someone has already encountered 
or investigated similar issues and could shed some light on it.




java.lang.Exception: TaskManager was lost/killed: 
container_1529095945616_0009_01_04 @ ip-172-31-64-249.ec2.internal 
(dataPort=44591)
at 
org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
at 
org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
at 
org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
at 
org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
at 
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:426)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at 
org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)
at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
at 
org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
at 
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at 
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at 
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.ja

Re: # of active session windows of a streaming job

2018-06-21 Thread Chesnay Schepler
Without modifications to Flink? No. By design nothing can intercept or 
retrieve metrics with the metrics API.
For this pattern the usual recommendation is to explicitly pass the 
metric to components that require it.


If modifications are an option, what you could do is
* define a counter in the OperatorIOMetricGroup
* have the operator checkpoint/restore the counter,
* access it in the trigger by casting your way through the MetricGroups 
to an OperatorMetricGroup from which you can retrieve the 
OperatorIOMetricGroup.



On 21.06.2018 11:16, Fabian Hueske wrote:

Hi Dongwon,

Yes, the counter state should be stored in operator state which is not 
available on Triggers.
Chesnay: Can a window function (like ProcessWindowFunction) access 
(read, write) the counter of its associated Trigger to checkpoint and 
restore it?


Best, Fabian

2018-06-20 16:59 GMT+02:00 Dongwon Kim :


Hi Fabian and Chesnay,

As Chesnay pointed out, it seems that I need to write the current
counter (which is defined inside Trigger) into state which I think
should be the operator state of the window operator.
However, as I previously said, TriggerContext allows for users to
access only the partitioned state that are scoped to *the key and*
*the window* of the current Trigger invocation.
There's no way for me to access to the operator state of the
window operator through TriggerContext.
The partitioned state doesn't seem suitable as we have more than
*ten million keys*.
This amount of keys could possibly break down the metric system
and the external metric systems like Ganglia and Prometheus.

What I want the most is to achieve the goal using the current API
(I'm using Flink-1.4.2) without modification.
But a change in TriggerContext seems unavoidable because it has to
expose an additional method for users like me to access to the
operator state of the window operator.

Thank you guys for the useful discussion.

p.s. Fabian, yes you're right. It is Trigger.clear(), not
Trigger.onClose().

Best,
- Dongwon


On Wed, Jun 20, 2018 at 7:30 PM, Chesnay Schepler wrote:

Checkpointing of metrics is a manual process.
The operator must write the current value into state, retrieve
it on restore and restore the counter's count.


On 20.06.2018 12:10, Fabian Hueske wrote:

Hi Dongwon,

You are of course right! We need to decrement the counter
when the window is closed.

The idea of using Trigger.clear() (the clean up method is
called clear() instead of onClose()) method is great!
It will be called when the window is closed but also when it
is merged.
So, I think you are right and we only need to increment the
counter in Trigger.onElement() and decrement in Trigger.clear().

I'm not 100% sure, but I doubt that metrics can checkpointed.
Chesnay (in CC) would know that.
Not sure what would be the best approach if you need a fault
tolerant solution.

Best, Fabian




2018-06-19 16:38 GMT+02:00 Dongwon Kim :

Hi Fabian,
Thanks a lot for your reply.

Do you need to number of active session windows as a
DataStream or would you like to have it as a metric
that you can expose.
I possible, I would recommend to expose it as a
metric because they are usually easier to collect.

I want to have it as a metric and it doesn't look
difficult thanks to the metric system exposed by
TriggerContext.

In order to track how many session windows exist, we
would need to increment a counter by one when a new
window is created (or an element is assigned to a
window, which is equivalent for session windows)

I agree with you that we need to increment a counter when
Trigger.onElement() is called due to the characteristic
of session windows.

and decrement the counter when windows are merged by
the number of merged windows minus one.

You decrement the counter when windows are merged, but I
think we need to decrement the counter when a window is
expired as well.

However, decrementing the counter is difficult.
Although the Trigger.onMerge() method is called, it
does not know how many windows were merged (which is
done by the WindowAssigner) and only sees the merged
window.

We assume that timestamps of records from a user are in
ascending order, so only one window is closed at a time
which simplifies the problem of how to d

Re: Memory Leak in ProcessingTimeSessionWindow

2018-06-21 Thread ashish pok
Hi Stefan,

Thanks for outlining the steps; they are similar to what we have been doing for 
OOM issues.

However, I was looking for something more high level on whether state / key 
handling needs some sort of cleanup specifically that is not done by default. I 
am about 99% (nothing is certain :)) sure that if I switch this app to a 
lower-level API like ProcessFunction and manage my own state and timers, I will 
not see this issue. When I had the same issue in the past it was for a 
GlobalWindow, and Fabian pointed out that new keys were constantly being created. 
I built a simple ProcessFunction for that and the issue went away. I think your 
first statement sort of hints at that as well, so let me hone in on that. I am 
processing time series data for network elements. Keys are a 10-minute floor of 
the event time concatenated with the element ID. The idea here was to create 
10-minute buckets of data, with windows that start with the first event in that 
bucket and fire when no events arrive for 12 or so minutes. So new keys are 
definitely being created. So,

1 - Am I adding to the memory constantly by doing that? Sounds like it, based on 
your comments.
2 - If so, what's the way to clear those keys when windows fire, if any? (A rough 
sketch of the ProcessFunction alternative follows below.)
3 - It seems like a very simple use case, so now I am wondering if I am even 
using the right high-level API?

Thanks, Ashish


Sent from Yahoo Mail for iPhone
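A rough sketch of the ProcessFunction alternative mentioned above (all names, the
Event type, the 12-minute gap, and the state layout are illustrative assumptions,
not code from this thread): buffer elements per key, bump a processing-time timer
on every element, and clear all state for the key when the timer fires, so nothing
lingers after a bucket is emitted. It would be applied as
stream.keyBy(...).process(new SessionBucketFunction()).

    public class SessionBucketFunction extends ProcessFunction<Event, List<Event>> {

        private static final long GAP_MS = 12 * 60 * 1000L;

        private transient ListState<Event> buffer;
        private transient ValueState<Long> currentTimer;

        @Override
        public void open(Configuration parameters) {
            buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer", Event.class));
            currentTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("timer", Long.class));
        }

        @Override
        public void processElement(Event event, Context ctx, Collector<List<Event>> out) throws Exception {
            buffer.add(event);
            // Push the firing time out by the gap; older timers become stale and are ignored in onTimer().
            long nextTimer = ctx.timerService().currentProcessingTime() + GAP_MS;
            ctx.timerService().registerProcessingTimeTimer(nextTimer);
            currentTimer.update(nextTimer);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<Event>> out) throws Exception {
            Long expected = currentTimer.value();
            if (expected == null || timestamp != expected) {
                return; // stale timer from an earlier element; the bucket was extended in the meantime
            }
            // Emit the bucket and explicitly drop all state for this key so it cannot accumulate.
            List<Event> bucket = new ArrayList<>();
            for (Event e : buffer.get()) {
                bucket.add(e);
            }
            out.collect(bucket);
            buffer.clear();
            currentTimer.clear();
        }
    }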


On Wednesday, June 20, 2018, 4:17 AM, Stefan Richter 
 wrote:

Hi,

it is possible that the number of processing time timers can grow, because 
internal timers are scoped by time, key, and namespace (typically this means 
„window“, because each key can be part of multiple windows). So if the number 
of keys in your application is steadily growing this can happen.

To analyse the heap dump, I usually take the following approach:
- Obviously include only reachable objects. If dumps are very big, try to limit 
the size or to trigger the OOM earlier by configuring a lower heap size. It 
should still give you the problematic object accumulation, if there is one.
- Look at the statistics of „heavy hitter“ classes, i.e. classes for which the 
instances contribute the most to the overall heap consumption. Sometimes this 
will show you classes that are also part of classes that rank higher up, e.g. 
1st place could be String and second place char[]. But you can figure that out 
in the next step.
- Explore the instances of the top heavy-hitter class(es). If there is a leak 
and you just randomly sample into some objects, the likelihood is usually 
*very* high that you catch an object that is part of the leak (as determined 
in the next step). Otherwise just repeat and sample another object.
- Inspect the object instance and follow the reference links to the parent 
objects in the object graph that hold a reference to the leak object candidate. 
You will typically end up in some array where the leak accumulates. Inspect the 
object holding references to the leaking objects. You can see the field values, 
and this can help to determine if the collection of objects is justified or if 
data is actually leaking. So in your case, you can start from some 
InternalTimer or Window object and walk backwards through the reference chain 
to see what class is holding onto them and why (e.g. should they already be 
gone w.r.t. their timestamp). Walking through the references should be 
supported by all major heap analysis tools, including JVisualVM that comes with 
your JDK. You can also use OQL [1] to query for timers or windows that should 
already be gone.

Overall I think it could at least be helpful to see the statistics for 
heavy-hitter classes and screenshots of representative reference chains to 
objects to figure out the problem cause. If it is not possible to share heap 
dumps, unfortunately I think giving you this strategy is currently the best I 
can offer to help.

Best,
Stefan


[1] https://blogs.oracle.com/sundararajan/querying-java-heap-with-oql

On 20.06.2018, at 02:33, ashish pok wrote:

All,

I took a few heap dumps (when the app restarts and at 2-hour intervals) using 
jmap; they are 5GB to 8GB. I did some compares, and what I can see is that the 
heap shows the counts of data tuples (basically instances of the object that is 
maintained as state) going up slowly.

The only things I could possibly relate that to were 
streaming.api.operators.InternalTimer and 
streaming.api.windowing.windows.TimeWindow; both were trending up as well. There 
are definitely a lot more windows created than the increments I could notice, but 
nevertheless those objects are trending up. The input stream has a very consistent 
sine-wave throughput, so it really doesn't make sense for windows and tuples to 
keep trending up. There is also no event storm or anything of that sort (i.e. the 
source stream has been very steady as far as throughput is concerned).

Here is a plot of heap utilization:
<1529454480422blob.jpg>

So it has a typical sine-wave pattern, which is definitely expected as the input 
stream has the same pattern, but the source doesn't have a trend upw

Re: How to use broadcast variables in data stream

2018-06-21 Thread Fabian Hueske
That's correct.

Broadcast state was added with Flink 1.5. You can use
DataStream.broadcast() in Flink 1.3 but it has a few limitations.
For example, you cannot connect a keyed and a broadcasted stream.

2018-06-21 11:58 GMT+02:00 zhen li :

> Thanks for your reply.
> But  broadcast state seems not supported in Flink-1.3 .
> I found this in Flink-1.3:
> Broadcasting
> DataStream → DataStream
>
> Broadcasts elements to every partition.
>
> dataStream.broadcast();
>
> But I don’t know how to convert it to list and get it in stream context .
>
> On 21 Jun 2018, at 17:20, Fabian Hueske wrote:
>
> https://ci.apache.org/projects/flink/flink-docs-
> release-1.5/dev/stream/state/broadcast_state.html
>
>
>


Re: Flink 1.5 TooLongFrameException in cluster mode?

2018-06-21 Thread Chesnay Schepler
Due to the job-submission changes in 1.5, you attempted to send REST 
requests to Akka, which causes the exception you received.


Instead, you want to specify the REST port, which by default is 
8081 (configurable via rest.port).


On 21.06.2018 10:44, chrisr123 wrote:

This looks related to using the -m option on CLI:

This works:
$FLINK_HOME/bin/flink run -c $CLASS $JARFILE

but this causes the error:
$FLINK_HOME/bin/flink run -m jobmanagermachine:6123 -c $CLASS $JARFILE


I found this thread here from April 27
http://mail-archives.apache.org/mod_mbox/flink-user/201804.mbox/%3c1524831996624-0.p...@n4.nabble.com%3E








Re: How to use broadcast variables in data stream

2018-06-21 Thread zhen li
Thanks for your reply.
But broadcast state seems not to be supported in Flink 1.3.
I found this in the Flink 1.3 docs:

Broadcasting
DataStream → DataStream

Broadcasts elements to every partition.

dataStream.broadcast();

But I don't know how to convert it to a list and access it in the stream context.

On 21 Jun 2018, at 17:20, Fabian Hueske wrote:

https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/broadcast_state.html



Re: Breakage in Flink CLI in 1.5.0

2018-06-21 Thread Till Rohrmann
The reason you still have to do it is that we still have to support
the legacy mode, where the client needs to know the JobManager RPC address.
Once we remove the legacy mode, we could change the
HighAvailabilityServices such that we have client-facing HA services, which
only retrieve the REST server endpoint, and cluster-internal HA services,
which need to know the cluster components' addresses at cluster startup.

Cheers,
Till

On Thu, Jun 21, 2018 at 11:38 AM Sampath Bhat 
wrote:

> hi
> Yes I've specified the rest.address for the flink client to connect to the
> rest.address and the rest.address is valid and working fine but my question
> is why am I supposed to give jobmanager.rpc.address for flink client to
> connect to flink cluster if flink client depends only on rest.address?
>
> On Thu, Jun 21, 2018 at 12:41 PM, Till Rohrmann 
> wrote:
>
>> Hi,
>>
>> if the rest.address is different from the jobmanager.rpc.address, then
>> you should specify that in the flink-conf.yaml and Flink will connect to
>> rest.address. Only if rest.address is not specified, the system will fall
>> back to use the jobmanager.rpc.address. Currently, the rest server endpoint
>> runs in the same JVM as the cluster entrypoint and all JobMasters.
>>
>> Cheers,
>> Till
>>
>> On Thu, Jun 21, 2018 at 8:46 AM Sampath Bhat 
>> wrote:
>>
>>> Hello Till
>>>
>>> Thanks for clarification. But I've few questions based on your reply.
>>>
>>> In non-HA setups we need the jobmanager.rpc.address to derive the
>>> hostname
>>> of the rest server.
>>> why is there dependency on jobmanager.rpc.address to get the hostname
>>> rest
>>> server? This holds good only for normal deployments such as on bare
>>> metal,
>>> virtual machine where flink cluster runs as another process in a machine.
>>> But if we try deploy flink on kubernetes then there could be possiblity
>>> where jobmanager.rpc.address and rest.address different from each other.
>>>
>>> So if rest.address is not provided in flink-conf.yaml then looking for
>>> jobmanager.rpc.address for deriving the hostname of rest server makes
>>> sense, but when the user has already provided the rest.address but flink
>>> still looks into jobmanager.rpc.address for getting hostname of rest
>>> server
>>> is an unwanted dependency IMO.
>>>
>>> In HA setup the rpc.address is obtained from zookeeper so user need not
>>> worry about unnecessary properties while submitting job.
>>>
>>> On Wed, Jun 20, 2018 at 1:25 PM, Till Rohrmann 
>>> wrote:
>>>
>>> > It will, but it defaults to jobmanager.rpc.address if no rest.address
>>> has
>>> > been specified.
>>> >
>>> > On Wed, Jun 20, 2018 at 9:49 AM Chesnay Schepler 
>>> > wrote:
>>> >
>>> >> Shouldn't the non-HA case be covered by rest.address?
>>> >>
>>> >> On 20.06.2018 09:40, Till Rohrmann wrote:
>>> >>
>>> >> Hi Sampath,
>>> >>
>>> >> it is no longer possible to not start the rest server endpoint by
>>> setting
>>> >> rest.port to -1. If you do this, then the cluster won't start. The
>>> comment
>>> >> in the flink-conf.yaml holds only true for the legacy mode.
>>> >>
>>> >> In non-HA setups we need the jobmanager.rpc.address to derive the
>>> >> hostname of the rest server. The jobmanager.rpc.port is no longer
>>> needed
>>> >> for the client but only for the other cluster components (TMs). When
>>> using
>>> >> the HA mode, then every address will be retrieved from ZooKeeper.
>>> >>
>>> >> I hope this clarifies things.
>>> >>
>>> >> Cheers,
>>> >> Till
>>> >>
>>> >> On Wed, Jun 20, 2018 at 9:24 AM Chesnay Schepler 
>>> >> wrote:
>>> >>
>>> >>> I was worried this might be the case.
>>> >>>
>>> >>> The rest.port handling was simply copied from the legacy web-server,
>>> >>> which explicitly allowed shutting it down.
>>> >>> It may (I'm not entirely sure) also not be necessary for all
>>> deployment
>>> >>> modes; for example if the job is baked into the job/taskmanager
>>> images.
>>> >>>
>>> >>> I'm not quite sure whether the rpc address is actually required for
>>> the
>>> >>> REST job submission, or only since we still rely partly on some
>>> legacy
>>> >>> code (ClusterClient). Maybe Till (cc) knows the answer to that.
>>> >>>
>>> >>> > Adding on to this point you made - " the rpc address is still
>>> >>> *required *due
>>> >>> > to some technical implementations; it may be that you can set this
>>> to
>>> >>> some
>>> >>> > arbitrary value however."
>>> >>> >
>>> >>> > For job submission to happen successfully we should give specific
>>> rpc
>>> >>> > address and not any arbitrary value. If any arbitrary value is
>>> given
>>> >>> the
>>> >>> > job submission fails with the following error -
>>> >>> > org.apache.flink.client.deployment.ClusterRetrieveException:
>>> Couldn't
>>> >>> > retrieve standalone cluster
>>> >>> >  at
>>> >>> > org.apache.flink.client.deployment.StandaloneClusterDescriptor.
>>> >>> retrieve(StandaloneClusterDescriptor.java:51)
>>> >>> >  at
>>> >>> > org.apache.flink.client.deployment.StandaloneClusterDescrip

Re: Breakage in Flink CLI in 1.5.0

2018-06-21 Thread Sampath Bhat
Hi,
Yes, I've specified the rest.address for the Flink client to connect to, and
the rest.address is valid and working fine. But my question is: why am I
supposed to give jobmanager.rpc.address for the Flink client to connect to
the Flink cluster if the client depends only on rest.address?

On Thu, Jun 21, 2018 at 12:41 PM, Till Rohrmann 
wrote:

> Hi,
>
> if the rest.address is different from the jobmanager.rpc.address, then you
> should specify that in the flink-conf.yaml and Flink will connect to
> rest.address. Only if rest.address is not specified, the system will fall
> back to use the jobmanager.rpc.address. Currently, the rest server endpoint
> runs in the same JVM as the cluster entrypoint and all JobMasters.
>
> Cheers,
> Till
>
> On Thu, Jun 21, 2018 at 8:46 AM Sampath Bhat 
> wrote:
>
>> Hello Till
>>
>> Thanks for clarification. But I've few questions based on your reply.
>>
>> In non-HA setups we need the jobmanager.rpc.address to derive the hostname
>> of the rest server.
>> why is there dependency on jobmanager.rpc.address to get the hostname rest
>> server? This holds good only for normal deployments such as on bare metal,
>> virtual machine where flink cluster runs as another process in a machine.
>> But if we try deploy flink on kubernetes then there could be possiblity
>> where jobmanager.rpc.address and rest.address different from each other.
>>
>> So if rest.address is not provided in flink-conf.yaml then looking for
>> jobmanager.rpc.address for deriving the hostname of rest server makes
>> sense, but when the user has already provided the rest.address but flink
>> still looks into jobmanager.rpc.address for getting hostname of rest
>> server
>> is an unwanted dependency IMO.
>>
>> In HA setup the rpc.address is obtained from zookeeper so user need not
>> worry about unnecessary properties while submitting job.
>>
>> On Wed, Jun 20, 2018 at 1:25 PM, Till Rohrmann 
>> wrote:
>>
>> > It will, but it defaults to jobmanager.rpc.address if no rest.address
>> has
>> > been specified.
>> >
>> > On Wed, Jun 20, 2018 at 9:49 AM Chesnay Schepler 
>> > wrote:
>> >
>> >> Shouldn't the non-HA case be covered by rest.address?
>> >>
>> >> On 20.06.2018 09:40, Till Rohrmann wrote:
>> >>
>> >> Hi Sampath,
>> >>
>> >> it is no longer possible to not start the rest server endpoint by
>> setting
>> >> rest.port to -1. If you do this, then the cluster won't start. The
>> comment
>> >> in the flink-conf.yaml holds only true for the legacy mode.
>> >>
>> >> In non-HA setups we need the jobmanager.rpc.address to derive the
>> >> hostname of the rest server. The jobmanager.rpc.port is no longer
>> needed
>> >> for the client but only for the other cluster components (TMs). When
>> using
>> >> the HA mode, then every address will be retrieved from ZooKeeper.
>> >>
>> >> I hope this clarifies things.
>> >>
>> >> Cheers,
>> >> Till
>> >>
>> >> On Wed, Jun 20, 2018 at 9:24 AM Chesnay Schepler 
>> >> wrote:
>> >>
>> >>> I was worried this might be the case.
>> >>>
>> >>> The rest.port handling was simply copied from the legacy web-server,
>> >>> which explicitly allowed shutting it down.
>> >>> It may (I'm not entirely sure) also not be necessary for all
>> deployment
>> >>> modes; for example if the job is baked into the job/taskmanager
>> images.
>> >>>
>> >>> I'm not quite sure whether the rpc address is actually required for
>> the
>> >>> REST job submission, or only since we still rely partly on some legacy
>> >>> code (ClusterClient). Maybe Till (cc) knows the answer to that.
>> >>>
>> >>> > Adding on to this point you made - " the rpc address is still
>> >>> *required *due
>> >>> > to some technical implementations; it may be that you can set this
>> to
>> >>> some
>> >>> > arbitrary value however."
>> >>> >
>> >>> > For job submission to happen successfully we should give specific
>> rpc
>> >>> > address and not any arbitrary value. If any arbitrary value is given
>> >>> the
>> >>> > job submission fails with the following error -
>> >>> > org.apache.flink.client.deployment.ClusterRetrieveException:
>> Couldn't
>> >>> > retrieve standalone cluster
>> >>> >  at
>> >>> > org.apache.flink.client.deployment.StandaloneClusterDescriptor.
>> >>> retrieve(StandaloneClusterDescriptor.java:51)
>> >>> >  at
>> >>> > org.apache.flink.client.deployment.StandaloneClusterDescriptor.
>> >>> retrieve(StandaloneClusterDescriptor.java:31)
>> >>> >  at
>> >>> > org.apache.flink.client.cli.CliFrontend.runProgram(
>> >>> CliFrontend.java:249)
>> >>> >  at org.apache.flink.client.cli.
>> CliFrontend.run(CliFrontend.
>> >>> java:210)
>> >>> >  at
>> >>> > org.apache.flink.client.cli.CliFrontend.parseParameters(
>> >>> CliFrontend.java:1020)
>> >>> >  at
>> >>> > org.apache.flink.client.cli.CliFrontend.lambda$main$9(
>> >>> CliFrontend.java:1096)
>> >>> >  at
>> >>> > org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(
>> >>> NoOpSecurityContext.java:30)

Re: How to use broadcast variables in data stream

2018-06-21 Thread Fabian Hueske
Hi,

if the list is static and not too large, you can pass it as a parameter to
the function.
Function objects are serialized (using Java's default serialization) and
shipped to the workers for execution.

If the data is dynamic, you might want to have a look at Broadcast state
[1].

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/broadcast_state.html
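As an illustration of the first option (a static list passed into the function
object and shipped with it), a minimal sketch; the whitelist-style use case and
all names are made up for the example:

    public class FilterByWhitelist extends RichFlatMapFunction<String, String> {

        // Serialized with the function object (Java serialization) and shipped to every parallel instance.
        private final List<String> whitelist;

        public FilterByWhitelist(List<String> whitelist) {
            this.whitelist = whitelist;
        }

        @Override
        public void flatMap(String value, Collector<String> out) {
            if (whitelist.contains(value)) {
                out.collect(value);
            }
        }
    }

    // Usage:
    List<String> whitelist = new ArrayList<>(Arrays.asList("a", "b", "c"));
    stream.flatMap(new FilterByWhitelist(whitelist));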

2018-06-21 4:38 GMT+02:00 zhen li :

> Hi,all:
> I want to use some other broadcast  resources such as list or map  in
> the flatmap function or customer triggers, but I don’t find some api to
> satisfy.
> Anyone can help?
> thanks
>
>
>
>


Re: # of active session windows of a streaming job

2018-06-21 Thread Fabian Hueske
Hi Dongwon,

Yes, the counter state should be stored in operator state which is not
available on Triggers.
Chesnay: Can a window function (like ProcessWindowFunction) access (read,
write) the counter of its associated Trigger to checkpoint and restore it?

Best, Fabian

2018-06-20 16:59 GMT+02:00 Dongwon Kim :

> Hi Fabian and Chesnay,
>
> As Chesnay pointed out, it seems that I need to write the current counter
> (which is defined inside Trigger) into state which I think should be the
> operator state of the window operator.
> However, as I previously said, TriggerContext allows for users to access
> only the partitioned state that are scoped to *the key and* *the window* of
> the current Trigger invocation.
> There's no way for me to access to the operator state of the window
> operator through TriggerContext.
> The partitioned state doesn't seem suitable as we have more than *ten
> million keys*.
> This amount of keys could possibly break down the metric system and the
> external metric systems like Ganglia and Prometheus.
>
> What I want the most is to achieve the goal using the current API (I'm
> using Flink-1.4.2) without modification.
> But a change in TriggerContext seems unavoidable because it has to expose
> an additional method for users like me to access to the operator state of
> the window operator.
>
> Thank you guys for the useful discussion.
>
> p.s. Fabian, yes you're right. It is Trigger.clear(), not
> Trigger.onClose().
>
> Best,
> - Dongwon
>
>
> On Wed, Jun 20, 2018 at 7:30 PM, Chesnay Schepler 
> wrote:
>
>> Checkpointing of metrics is a manual process.
>> The operator must write the current value into state, retrieve it on
>> restore and restore the counter's count.
>>
>>
>> On 20.06.2018 12:10, Fabian Hueske wrote:
>>
>> Hi Dongwon,
>>
>> You are of course right! We need to decrement the counter when the window
>> is closed.
>>
>> The idea of using Trigger.clear() (the clean up method is called clear()
>> instead of onClose()) method is great!
>> It will be called when the window is closed but also when it is merged.
>> So, I think you are right and we only need to increment the counter in
>> Trigger.onElement() and decrement in Trigger.clear().
>>
>> I'm not 100% sure, but I doubt that metrics can checkpointed. Chesnay (in
>> CC) would know that.
>> Not sure what would be the best approach if you need a fault tolerant
>> solution.
>>
>> Best, Fabian
>>
>>
>>
>>
>> 2018-06-19 16:38 GMT+02:00 Dongwon Kim :
>>
>>> Hi Fabian,
>>> Thanks a lot for your reply.
>>>
>>> Do you need to number of active session windows as a DataStream or would
 you like to have it as a metric that you can expose.
 I possible, I would recommend to expose it as a metric because they are
 usually easier to collect.
>>>
>>> I want to have it as a metric and it doesn't look difficult thanks to
>>> the metric system exposed by TriggerContext.
>>>
>>> In order to track how many session windows exist, we would need to
 increment a counter by one when a new window is created (or an element is
 assigned to a window, which is equivalent for session windows)
>>>
>>> I agree with you that we need to increment a counter when
>>> Trigger.onElement() is called due to the characteristic of session windows.
>>>
>>> and decrement the counter when windows are merged by the number of
 merged windows minus one.
>>>
>>> You decrement the counter when windows are merged, but I think we need
>>> to decrement the counter when a window is expired as well.
>>>
>>> However, decrementing the counter is difficult. Although the
 Trigger.onMerge() method is called, it does not know how many windows were
 merged (which is done by the WindowAssigner) and only sees the merged
 window.
>>>
>>> We assume that timestamps of records from a user are in ascending order,
>>> so only one window is closed at a time which simplifies the problem of how
>>> to decrement the counter.
>>> Nevertheless, I think I need to decrement the counter in
>>> Trigger.onClose(), not Trigger.onMerge().
>>> By doing that in Trigger.onClose(), we can take care of both cases: when
>>> a window is merged and when a window is expired.
>>> How do you think about it?
>>>
>>> The reason I mention state is to calculate the exact number of active
>>> sessions even after my Flink application is restarted from checkpoints or
>>> savepoints.
>>> If we restore from a savepoint and the counter is initialized to 0,
>>> we'll see an incorrect value from a dashboard.
>>> This is the biggest concern of mine at this point.
>>>
>>> Best,
>>>
>>> - Dongwon
>>>
>>>
>>> On Tue, Jun 19, 2018 at 7:14 PM, Fabian Hueske 
>>> wrote:
>>>
 Hi Dongwon,

 Do you need to number of active session windows as a DataStream or
 would you like to have it as a metric that you can expose.
 I possible, I would recommend to expose it as a metric because they are
 usually easier to collect.

 SessionWindows work internally as follows:
 - ever

Re: Backpressure from producer with flink connector kinesis 1.4.2

2018-06-21 Thread Tzu-Li (Gordon) Tai
Hi,

According to the description in [1], then yes, I think it is expected that 
eventually YARN containers running TMs that execute the producer sink subtasks 
will be killed due to memory problems.
It seems like that KPL client is only a wrapper around a C++ daemon process, so 
it actually wouldn’t be the TM Java processes running out of memory and 
throwing OOMs, which would explain why the TMs were apparently silently killed 
as shown by the exception you pasted.

Cheers,
Gordon

[1] https://github.com/apache/flink/pull/6021
On 21 June 2018 at 5:56:02 AM, Liu, Gavin (CAI - Atlanta) 
(gavin@coxautoinc.com) wrote:

Hi guys,

 

I have another question related to the KPL problem. I wonder what the 
consequences of overwhelming KPL internal queue (kinesis) can be.

 

From my observation in experimenting with 1.4.2 (which does not have 
backpressure support yet in the open pr stated below), when the flink cluster 
is processing too fast and the throughput on the sink kinesis stream is 
limited, i.e, the throughput exceeding exception starts to be thrown, we quite 
often get the following exception (pasted in the end) very soon and all the 
subtasks switching status to cancelling and restarted.

From the exception trace, I can see that yarn got shutdown and all task 
managers are terminated. I suspect it is because of the memory issue. Whenever 
the throughput exceeding exception is thrown, it implicitly means that the 
internal unbounded queue in KPL may grow rapidly. we set the recordTtl = 60s 
and we can still see the record expiration exception along with exceeded 
throughput exception. Which leads me to wonder that if the internal unbounded 
queue grows too large and exhaust all the memory in the node and eventually 
crashing the yarn and the job manager.

 

Well, this is just my hypothesis. I wonder if someone has already encountered 
or investigated similar issues and could shed some light on it.

java.lang.Exception: TaskManager was lost/killed: 
container_1529095945616_0009_01_04 @ ip-172-31-64-249.ec2.internal 
(dataPort=44591)

    at 
org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)

    at 
org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)

    at 
org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)

    at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)

    at 
org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)

    at 
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)

    at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:426)

    at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)

    at 
org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)

    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)

    at 
org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)

    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)

    at 
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)

    at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)

    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)

    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)

    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

    at 
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)

    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)

    at 
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)

    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)

    at akka.actor.ActorCell.invoke(ActorCell.scala:495)

    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)

    at akka.dispatch.Mailbox.run(Mailbox.scala:224)

    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)

    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

    at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

    at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

From: "Liu, Gavin (CAI - Atlanta)" 
Date: Wednesday, June 20, 2018 at 12:11 PM
To: "Tzu-Li (Gordon) Tai" , "user@flink.apache.org" 

Subject: Re: Backpressure from producer with flink connector kinesis 1.4.2

 

Thanks, Gordon. You are quick, and it is very helpful to me.

I tried some other alternatives to 

Re: Flink 1.5 Yarn Connection unexpectedly closed

2018-06-21 Thread Till Rohrmann
Hi Garrett,

killing of idle TaskManager should not affect the execution of the job. By
definition a TaskManager only idles if it does not execute any tasks. Could
you maybe share the complete logs (of the cluster entrypoint and all
TaskManagers) with us?

Cheers,
Till

On Thu, Jun 21, 2018 at 10:26 AM Fabian Hueske  wrote:

> Hi Garrett,
>
> I agree, there seems to be an issue and increasing the timeout should not
> be the right approach to solve it.
> Are you running streaming or batch jobs, i.e., do some of the tasks finish
> much earlier than others?
>
> I'm adding Till to this thread who's very familiar with scheduling and
> process communication.
>
> Best, Fabian
>
> 2018-06-19 0:03 GMT+02:00 Garrett Barton :
>
>> Hey all,
>>
>>  My jobs that I am trying to write in Flink 1.5 are failing after a few
>> minutes.  I think its because the idle task managers are shutting down,
>> which seems to kill the client and the running job. The running job itself
>> was still going on one of the other task managers.  I get:
>>
>> org.apache.flink.client.program.ProgramInvocationException:
>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>> Connection unexpectedly closed by remote task manager ''. This might
>> indicate that the remote task manager was lost.
>> at org.apache.flink.runtime.io
>> .network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:143)
>>
>> Now I happen to have the last part of the flow parallelized to 1 right now
>> for debugging, so of the 4 task managers that are spun up, 3 of them hit the
>> timeout period (currently set to 24).  I think as soon as the first one
>> goes the client throws up and the whole job dies as a result.
>>
>>  Is this expected behavior and if so, is there another way around it? Do
>> I keep increasing the slotmanager.taskmanager-timeout to a really really
>> large number? I have verified setting the timeout to 84 lets the job
>> complete without error.
>>
>> Thank you!
>>
>
>


Re: Questions regarding to Flink 1.5.0 REST API change

2018-06-21 Thread Siew Wai Yow
Thanks @SihuaZhou, you are right that this is a bug. I just checked the source code
too. @Chesnay Schepler, I tested with both checkpoint
and savepoint on the same file system and it is working as expected.


Thanks guys!


-Yow



From: Chesnay Schepler 
Sent: Thursday, June 21, 2018 3:36 PM
To: user@flink.apache.org
Subject: Re: Questions regarding to Flink 1.5.0 REST API change

If you could open a JIRA this would be great.

On 21.06.2018 09:07, sihua zhou wrote:
Hi Yow,

I had a look at the related code, I think this seems like a bug. Flink use 
Checkpoint path's FileSystem to create the output stream for the Savepoint, but 
in your case the checkpoint & savepoint are not using the same file system. A 
workaround is to use the same file system for both checkpoint & savepoint.

Best, Sihua



On 06/21/2018 14:07,Siew Wai 
Yow wrote:

Thanks Chesnay, the application will take the value from "state.savepoints.dir" as
the default if target-directory is set to null. But then it tries to create the
directory on the local machine, which causes the error below, because it is an HDFS
directory. The same URL works in the previous Flink 1.3.2. Did something break in
Flink 1.5.0? Or is there anything extra we need to configure? Thank you.


Caused by: java.io.IOException: Mkdirs failed to create hdfs://192.168.56.150:8020/flink/savepoints/savepoint-82-fb6eca4f3dbb
    at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:271)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:121)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:348)
    ... 25 more



flink-conf.yaml:



state.backend: rocksdb
state.backend.fs.checkpointdir: file:///tmp/rocksdb_simple_example
state.backend.incremental: true
state.backend.async: true
state.checkpoints.num-retained: 5
state.savepoints.dir:  hdfs://192.168.56.150:8020/flink/savepoints



-Yow



From: Chesnay Schepler mailto:ches...@apache.org>>
Sent: Wednesday, June 20, 2018 3:15 PM
To: user@flink.apache.org
Subject: Re: Questions regarding to Flink 1.5.0 REST API change

I think you can set the target-directory to null. But I'm not sure why this 
particular request requires this, other request allow optional fields to simply 
be ommitted...

On 20.06.2018 06:12, Siew Wai Yow wrote:

Hi all,


It seems passing in target-directory is now a must for the savepoints REST API, and the
status will not respond with the savepoint directory anymore. I can pass it in, but
the information is redundant with the same value already defined in
flink-conf.yaml. May I know if there is a way to retrieve the savepoint
directory from flink-conf.yaml in a Flink application? I am not able to get it
from env.getConfig(). Thank you.



From: Chesnay Schepler 
Sent: Tuesday, June 19, 2018 11:55 PM
To: user@flink.apache.org
Subject: Re: Questions regarding to Flink 1.5.0 REST API change

1. PATCH to /jobs/:jobid, you can specify CANCEL/STOP with the "mode" query 
parameter

2. POST to /jobs/:jobid/savepoints, with a json payload. Returns a trigger id, 
used for 3).
{
"target-directory" : {

  "type" : "string"
},
"cancel-job" : {
  "type" : "boolean"
}
}


3. GET to /jobs/:jobid/savepoints/:triggerid

On 19.06.2018 17:40, Esteban Serrano wrote:
For #1, you need to use a PATCH request to "/jobs/:jobid"

On Tue, Jun 19, 2018 at 11:35 AM Siew Wai Yow 
mailto:wai_...@hotmail.com>> wrote:

Hi,


Regarding to Flink 1.5.0 REST API breaking change,

  *   The REST API to cancel a job was changed.
  *   The REST API to cancel a job with savepoint was changed.

I have a few dumb questions,


  1.  Any replacement for cancellation ONLY without save-point? Only found 
"/jobs/:jobid/savepoints".
  2.  For "/jobs/:jobid/savepoints", how could I form the URL with cancellation 
and with directory afterward?
 *   http://192.168.56.151:8081/jobs//savepoints/cancel-job/true??
  3.  Any cancellation progress monitoring in 1.5 like previous version 1.3/1.4?
 *   previous version: 
/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId

Thank you.

Regards,
Yow







Re: Flink 1.5 TooLongFrameException in cluster mode?

2018-06-21 Thread chrisr123
This looks related to using the -m option on CLI:

This works:
$FLINK_HOME/bin/flink run -c $CLASS $JARFILE  

but this causes the error:
$FLINK_HOME/bin/flink run -m jobmanagermachine:6123 -c $CLASS $JARFILE  


I found this thread here from April 27
http://mail-archives.apache.org/mod_mbox/flink-user/201804.mbox/%3c1524831996624-0.p...@n4.nabble.com%3E



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: A question about Kryo and Window State

2018-06-21 Thread Tzu-Li (Gordon) Tai
Hi Vishal,

Kryo has a serializer called `CompatibleFieldSerializer` that allows for simple 
backward compatibility changes, such as adding non-optional fields / removing 
fields.

If using the KryoSerializer is a must, then a good thing to do is to register 
Kryo's `CompatibleFieldSerializer` as the serializer to be used for that 
specific type.
By default, Kryo doesn’t use the `CompatibleFieldSerializer`, so you would have 
to explicitly register this.
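
For illustration only, registering it could look roughly like the sketch below (untested; the class A just stands in for the state type discussed in this thread):

import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RegisterCompatibleFieldSerializer {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Ask Flink's KryoSerializer to serialize type A with Kryo's
        // CompatibleFieldSerializer, which writes field names into the stream
        // and therefore tolerates added/removed fields on restore.
        env.getConfig().registerTypeWithKryoSerializer(A.class, CompatibleFieldSerializer.class);

        // ... define and execute the job as before
    }

    // Stand-in for the state class from the earlier mails.
    public static class A {
        public long a;
        public long b;
    }
}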

The issue is, I think it wouldn’t be possible to use the 
`CompatibleFieldSerializer` _after_ the bytes were already written with the 
default, non-compatible version (the `FieldSerializer`).
So, AFAIK, this should only work if your Kryo state type was registered with 
the `CompatibleFieldSerializer` from the very beginning.
There could be a workaround, but unfortunately that would require changing some 
code in the `KryoSerializer`.
If you require this because your job is already running in production and data 
was already written by the `FieldSerializer`, please let me know and I’ll go 
into more details about this.

Cheers,
Gordon

On 21 June 2018 at 10:14:15 AM, Fabian Hueske (fhue...@gmail.com) wrote:

Hi Vishal,

In general, Kryo serializers are not very upgrade friendly.
Serializer compatibility [1] might be right approach here, but Gordon (in CC) 
might know more about this.

Best, Fabian

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/custom_serialization.html#handling-serializer-upgrades-and-compatibility

2018-06-18 12:06 GMT+02:00 Vishal Santoshi :
Any more insight?

On Wed, Jun 13, 2018, 3:34 PM Vishal Santoshi  wrote:
Any ideas on the standard way (or any roundabout way) of doing a version
upgrade that looks backward compatible?
The @FieldSerializer.Optional("0") actually does ignore the field (even if
reset), giving it the default value if Kryo is used. It has to do with how the
FieldSerializer behaves. There is another serializer (Composite, I believe)
that allows for such backward compatible changes.


I know some work is being done in 1.6 to allow for above use case and I think 
Google Data Flow does provide some avenues.

Thanks much

Vishal



On Tue, Jun 12, 2018 at 11:30 PM, Vishal Santoshi  
wrote:
I have a running pipe with Window State in a class say

Class A{
     long a;
}

It uses the default KryoSerializer 

I want to add a field to 

Class A {
  long a;
  long b;
}

I need to suspend with SP and resume with the new version of Class A


Is there a definite way to do this. I tried

Class A {
  long a;
   @FieldSerializer.Optional("0")
  long b;
}

but that seems to default to 0 , even when the Aggregation is putting in values.

Could somebody give pointers as to how to solve this 

Thanks a ton.







Re: Debug job execution from savepoint

2018-06-21 Thread Fabian Hueske
Hi Manuel,

I had a look and couldn't find a way to do it.
However, this sounds like a very useful feature to me.

Would you mind creating a Jira issue [1] for that?

Thanks, Fabian

[1] https://issues.apache.org/jira/projects/FLINK

2018-06-18 16:23 GMT+02:00 Haddadi Manuel :

> Hi all,
>
>
> I would like to test my checkpointing implementation by doing step-by-step
> debugging in an IDE.
>
>
> Is there a way to restore a job from a local savepoint in a local stream
> environment, like the command "flink run -s :savepointpath" would do?
>
>
> Thanks,
>
>
> Manuel
>


Re: Flink 1.5 Yarn Connection unexpectedly closed

2018-06-21 Thread Fabian Hueske
Hi Garrett,

I agree, there seems to be an issue and increasing the timeout should not
be the right approach to solve it.
Are you running streaming or batch jobs, i.e., do some of the tasks finish
much earlier than others?

I'm adding Till to this thread who's very familiar with scheduling and
process communication.

Best, Fabian

2018-06-19 0:03 GMT+02:00 Garrett Barton :

> Hey all,
>
>  My jobs that I am trying to write in Flink 1.5 are failing after a few
> minutes.  I think its because the idle task managers are shutting down,
> which seems to kill the client and the running job. The running job itself
> was still going on one of the other task managers.  I get:
>
> org.apache.flink.client.program.ProgramInvocationException:
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Connection unexpectedly closed by remote task manager ''. This might
> indicate that the remote task manager was lost.
> at org.apache.flink.runtime.io.network.netty.
> CreditBasedPartitionRequestClientHandler.channelInactive(
> CreditBasedPartitionRequestClientHandler.java:143)
>
> Now I happen to have the last part of the flow paralleled to 1 right now
> for debugging, so the 4 task managers that are spun up, 3 of them hit the
> timeout period (currently set to 24).  I think as soon as the first one
> goes the client throws up and the whole job dies as a result.
>
>  Is this expected behavior and if so, is there another way around it? Do I
> keep increasing the slotmanager.taskmanager-timeout to a really really
> large number? I have verified setting the timeout to 84 lets the job
> complete without error.
>
> Thank you!
>


Re: Control insert database with dataset

2018-06-21 Thread Fabian Hueske
Hi Dulce,

This functionality is not supported by the JDBCOutputFormat.
Some database systems (AFAIK, MySQL) support Upsert writes, i.e., writes
that insert if the primary key is not present or update the row if the PK
exists. Not sure if that would meet your requirements.
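
For example, with MySQL-style syntax the upsert variant could be wired up roughly like this (an untested sketch; driver, URL and table names are made up):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.types.Row;

public class UpsertSinkSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical (id, name) rows; the second row hits the same primary key.
        DataSet<Row> rows = env.fromElements(row(37183, "first"), row(37183, "second"));

        // MySQL-style upsert: insert the row, or update it when the primary key exists,
        // so writeRecord() no longer fails with a duplicate-key BatchUpdateException.
        rows.output(JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("com.mysql.jdbc.Driver")              // made-up driver/URL/table
                .setDBUrl("jdbc:mysql://localhost:3306/testdb")
                .setQuery("INSERT INTO TEST (id, name) VALUES (?, ?) "
                        + "ON DUPLICATE KEY UPDATE name = VALUES(name)")
                .finish());

        env.execute("upsert sketch");
    }

    private static Row row(int id, String name) {
        Row r = new Row(2);
        r.setField(0, id);
        r.setField(1, name);
        return r;
    }
}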

If you don't want to go with Upserts or if your DBMS doesn't support them
in a single statement, you'd have to implement your own output format.
You can of course use the JDBCOutputFormat as a starting point and adjust
it to your needs.

Best, Fabian

2018-06-18 23:38 GMT+02:00 Dulce Morim :

> Hello,
>
> I'm trying to catch a BatchUpdateException when inserting a DataSet using the
> output method, because I need to handle the case of inserting a duplicate key.
> How can I do this?
>
>
>
> [2018-06-18 22:18:56,419] INFO DataSink (org.apache.flink.api.java.io.
> jdbc.JDBCOutputFormat@64aad6db) (1/1) (00a77c9e18f893cde9c62a3c9ca5c471)
> switched from RUNNING to FAILED. (org.apache.flink.runtime.
> executiongraph.ExecutionGraph)
> java.lang.IllegalArgumentException: writeRecord() failed
> at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.
> writeRecord(JDBCOutputFormat.java:209)
> at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.
> writeRecord(JDBCOutputFormat.java:41)
> at org.apache.flink.runtime.operators.DataSinkTask.invoke(
> DataSinkTask.java:194)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.sql.BatchUpdateException: Violation of PRIMARY KEY
> constraint 'TEST_PK'. Cannot insert duplicate key in object 'TEST'. The
> duplicate key value is (37183).
> at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.
> executeBatch(SQLServerPreparedStatement.java:2303)
> at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.
> writeRecord(JDBCOutputFormat.java:205)
> ... 4 more
>
>
> Only have a generic exception:
> org.apache.flink.runtime.client.JobExecutionException: Job execution
> failed.
>
>
> Thanks,
> Dulce Morim
>


Re: A question about Kryo and Window State

2018-06-21 Thread Fabian Hueske
Hi Vishal,

In general, Kryo serializers are not very upgrade friendly.
Serializer compatibility [1] might be right approach here, but Gordon (in
CC) might know more about this.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/custom_serialization.html#handling-serializer-upgrades-and-compatibility

2018-06-18 12:06 GMT+02:00 Vishal Santoshi :

> Any more insight?
>
> On Wed, Jun 13, 2018, 3:34 PM Vishal Santoshi 
> wrote:
>
>> Any ideas on the standard way ( or any roundabout way ) of doing a
>> version upgrade that looks back ward compatible.
>> The  @FieldSerializer.Optional("0") actually does  ignore the field (
>> even if reset ) giving it the default value if kyro is used. It has to do
>> with the FieldSerializer behaves  .  There is another Serializer (
>> Composite I believe ) that allows for such back ward compatible changes.
>>
>>
>> I know some work is being done in 1.6 to allow for above use case and I
>> think Google Data Flow does provide some avenues.
>>
>> Thanks much
>>
>> Vishal
>>
>>
>>
>> On Tue, Jun 12, 2018 at 11:30 PM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> I have a running pipe with Window State in a class say
>>>
>>> Class A{
>>>  long a;
>>> }
>>>
>>> It uses the default KryoSerializer
>>>
>>> I want to add a field to
>>>
>>> Class A {
>>>   long a;
>>>   long b;
>>> }
>>>
>>> I need to suspend with SP and resume with the new version of Class A
>>>
>>>
>>> Is there a definite way to do this. I tried
>>>
>>> Class A {
>>>   long a;
>>>@FieldSerializer.Optional("0")
>>>   long b;
>>> }
>>>
>>> but that seems to default to 0 , even when the Aggregation is putting in
>>> values.
>>>
>>> Could somebody give pointers as to how to solve this
>>>
>>> Thanks a ton.
>>>
>>>
>>>
>>>
>>


Re: Cleaning of state snapshot in state backend(HDFS)

2018-06-21 Thread Garvit Sharma
Thank you for the clarification.

On Thu, Jun 21, 2018 at 1:36 PM sihua zhou  wrote:

> Yes, you can clear the state for a key(the currently active key), if you
> clear it, it means that you have also cleaned it from the state backend,
> and the future checpoints won't contains the key anymore unless you add it
> again.
>
> Best, Sihua
>
>
> On 06/21/2018 16:04,Garvit Sharma
>  wrote:
>
> Now, after clearing state for a key, I don't want that redundant data in
> the state backend. This is my concern.
>
> Please let me know if there are any gaps.
>
> Thanks,
>
> On Thu, Jun 21, 2018 at 1:31 PM Garvit Sharma  wrote:
>
>> I am maintaining state data for a key in ValueState. As per [0] I can
>> clear() state for that key.
>>
>> [0]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/state.html
>>
>> Please let me know.
>>
>> Thanks,
>>
>>
>> On Thu, Jun 21, 2018 at 1:19 PM sihua zhou  wrote:
>>
>>> Hi Garvit,
>>>
>>> Let's say you clearing the state at timestamp t1, then the checkpoints
>>> completed before t1 will still contains the data you cleared. But the
>>> future checkpoints won't contain the cleared data again. But I'm not sure
>>> what you meaning by the cleared state, you can only clear a key-value pair
>>> of the state currently, you can't cleared the whole state currently.
>>>
>>> Best, Sihua
>>>
>>> On 06/21/2018 15:41,Garvit Sharma
>>>  wrote:
>>>
>>> So, would it delete all the files in HDFS associated with the cleared
>>> state?
>>>
>>> On Thu, Jun 21, 2018 at 12:58 PM sihua zhou  wrote:
>>>
 Hi Garvit,

 > Now, let's say, we clear the state. Would the state data be removed
 from HDFS too?

 The state data would not be removed from HDFS immediately, if you clear
 the state in your job. But after you clearing the state in your job, the
 later completed checkpoint won't contain the state any more.

 > How does Flink manage to clear the state data from state backend on
 clearing the keyed state?

 1. you can use the {{tate.checkpoints.num-retained}} to set the number
 of the completed checkpoint maintanced on HDFS.
 2. If you set {{
 env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.
 DELETE_ON_CANCELLATION)}} then the checkpoints on HDFS will be removed
 once your job is finished(or cancled). And if you set {{
 env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.
  RETAIN_ON_CANCELLATION)}} then the checkpoints will be remained.

 Please refer to
 https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/checkpoints.html
  to
 find more information.


 Additional, I'd like to give a bref info of the checkpoint on HDFS. In
 a nutshell, what ever you did with the state in your running job, they only
 effect the content on the state backend locally. When checkpointing, flink
 takes a snapshot of the local state backend, and send it to the checkpoint
 target directory(in your case, the HDFS). The checkpoints on the HDFS looks
 like the periodic snapshot of the state backend of your job, they can be
 created or deleted but never be changed. Maybe Stefan(cc) could give you
 more professional information and plz correct me if I'm incorrect.

 Best, Sihua
 On 06/21/2018 14:40,Garvit Sharma
  wrote:

 Hi,

 Consider a managed keyed state backed by HDFS with checkpointing
 enabled. Now, as the state grows the state data will be saved on HDFS.

 Now, let's say, we clear the state. Would the state data be removed
 from HDFS too?

 How does Flink manage to clear the state data from state backend on
 clearing the keyed state?

 --

 Garvit Sharma
 github.com/garvitlnmiit/

 No Body is a Scholar by birth, its only hard work and strong
 determination that makes him master.


>>>
>>> --
>>>
>>> Garvit Sharma
>>> github.com/garvitlnmiit/
>>>
>>> No Body is a Scholar by birth, its only hard work and strong
>>> determination that makes him master.
>>>
>>>
>>
>> --
>>
>> Garvit Sharma
>> github.com/garvitlnmiit/
>>
>> No Body is a Scholar by birth, its only hard work and strong
>> determination that makes him master.
>>
>
>
> --
>
> Garvit Sharma
> github.com/garvitlnmiit/
>
> No Body is a Scholar by birth, its only hard work and strong determination
> that makes him master.
>
>

-- 

Garvit Sharma
github.com/garvitlnmiit/

No Body is a Scholar by birth, its only hard work and strong determination
that makes him master.


Re: Cleaning of state snapshot in state backend(HDFS)

2018-06-21 Thread sihua zhou
Yes, you can clear the state for a key (the currently active key). If you clear
it, it means that you have also cleaned it from the state backend, and future
checkpoints won't contain the key anymore unless you add it again.
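
To make the per-key nature concrete, here is a rough sketch (hypothetical names and threshold, not from your job) of clearing state inside a keyed function:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Must run on a keyed stream: counts values per key and drops the entry
// once the (made-up) threshold of 100 is reached.
public class CountAndClear extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void flatMap(Tuple2<String, Long> in, Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = count.value();
        long updated = (current == null ? 0L : current) + in.f1;
        if (updated >= 100) {
            // Removes the entry for the currently active key from the state backend;
            // checkpoints completed after this point will not contain it.
            count.clear();
            out.collect(Tuple2.of(in.f0, updated));
        } else {
            count.update(updated);
        }
    }
}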


Best, Sihua




On 06/21/2018 16:04,Garvit Sharma wrote:
Now, after clearing state for a key, I don't want that redundant data in the 
state backend. This is my concern.


Please let me know if there are any gaps.


Thanks,


On Thu, Jun 21, 2018 at 1:31 PM Garvit Sharma  wrote:

I am maintaining state data for a key in ValueState. As per [0] I can clear() 
state for that key.


[0] 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/state.html


Please let me know.


Thanks,




On Thu, Jun 21, 2018 at 1:19 PM sihua zhou  wrote:

Hi Garvit,


Let's say you clearing the state at timestamp t1, then the checkpoints 
completed before t1 will still contains the data you cleared. But the future 
checkpoints won't contain the cleared data again. But I'm not sure what you 
meaning by the cleared state, you can only clear a key-value pair of the state 
currently, you can't cleared the whole state currently.


Best, Sihua


On 06/21/2018 15:41,Garvit Sharma wrote:
So, would it delete all the files in HDFS associated with the cleared state?


On Thu, Jun 21, 2018 at 12:58 PM sihua zhou  wrote:

Hi Garvit,


> Now, let's say, we clear the state. Would the state data be removed from HDFS 
> too?


The state data would not be removed from HDFS immediately, if you clear the 
state in your job. But after you clearing the state in your job, the later 
completed checkpoint won't contain the state any more.


> How does Flink manage to clear the state data from state backend on clearing 
> the keyed state?


1. you can use the {{tate.checkpoints.num-retained}} to set the number of the 
completed checkpoint maintanced on HDFS.
2. If you set 
{{env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)}}
 then the checkpoints on HDFS will be removed once your job is finished(or 
cancled). And if you set 
{{env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.
 RETAIN_ON_CANCELLATION)}} then the checkpoints will be remained.


Please refer to 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/checkpoints.html
 to find more information.




Additional, I'd like to give a bref info of the checkpoint on HDFS. In a 
nutshell, what ever you did with the state in your running job, they only 
effect the content on the state backend locally. When checkpointing, flink 
takes a snapshot of the local state backend, and send it to the checkpoint 
target directory(in your case, the HDFS). The checkpoints on the HDFS looks 
like the periodic snapshot of the state backend of your job, they can be 
created or deleted but never be changed. Maybe Stefan(cc) could give you more 
professional information and plz correct me if I'm incorrect.


Best, Sihua
On 06/21/2018 14:40,Garvit Sharma wrote:
Hi,


Consider a managed keyed state backed by HDFS with checkpointing enabled. Now, 
as the state grows the state data will be saved on HDFS.


Now, let's say, we clear the state. Would the state data be removed from HDFS 
too?


How does Flink manage to clear the state data from state backend on clearing 
the keyed state?


--


Garvit Sharma
github.com/garvitlnmiit/

No Body is a Scholar by birth, its only hard work and strong determination that 
makes him master.





--


Garvit Sharma
github.com/garvitlnmiit/

No Body is a Scholar by birth, its only hard work and strong determination that 
makes him master.





--


Garvit Sharma
github.com/garvitlnmiit/

No Body is a Scholar by birth, its only hard work and strong determination that 
makes him master.





--


Garvit Sharma
github.com/garvitlnmiit/

No Body is a Scholar by birth, its only hard work and strong determination that 
makes him master.


Re: Cleaning of state snapshot in state backend(HDFS)

2018-06-21 Thread Garvit Sharma
Now, after clearing state for a key, I don't want that redundant data in
the state backend. This is my concern.

Please let me know if there are any gaps.

Thanks,

On Thu, Jun 21, 2018 at 1:31 PM Garvit Sharma  wrote:

> I am maintaining state data for a key in ValueState. As per [0] I can
> clear() state for that key.
>
> [0]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/state.html
>
> Please let me know.
>
> Thanks,
>
>
> On Thu, Jun 21, 2018 at 1:19 PM sihua zhou  wrote:
>
>> Hi Garvit,
>>
>> Let's say you clearing the state at timestamp t1, then the checkpoints
>> completed before t1 will still contains the data you cleared. But the
>> future checkpoints won't contain the cleared data again. But I'm not sure
>> what you meaning by the cleared state, you can only clear a key-value pair
>> of the state currently, you can't cleared the whole state currently.
>>
>> Best, Sihua
>>
>> On 06/21/2018 15:41,Garvit Sharma
>>  wrote:
>>
>> So, would it delete all the files in HDFS associated with the cleared
>> state?
>>
>> On Thu, Jun 21, 2018 at 12:58 PM sihua zhou  wrote:
>>
>>> Hi Garvit,
>>>
>>> > Now, let's say, we clear the state. Would the state data be removed
>>> from HDFS too?
>>>
>>> The state data would not be removed from HDFS immediately, if you clear
>>> the state in your job. But after you clearing the state in your job, the
>>> later completed checkpoint won't contain the state any more.
>>>
>>> > How does Flink manage to clear the state data from state backend on
>>> clearing the keyed state?
>>>
>>> 1. you can use the {{tate.checkpoints.num-retained}} to set the number
>>> of the completed checkpoint maintanced on HDFS.
>>> 2. If you set {{
>>> env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.
>>> DELETE_ON_CANCELLATION)}} then the checkpoints on HDFS will be removed
>>> once your job is finished(or cancled). And if you set {{
>>> env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.
>>>  RETAIN_ON_CANCELLATION)}} then the checkpoints will be remained.
>>>
>>> Please refer to
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/checkpoints.html
>>>  to
>>> find more information.
>>>
>>>
>>> Additional, I'd like to give a bref info of the checkpoint on HDFS. In a
>>> nutshell, what ever you did with the state in your running job, they only
>>> effect the content on the state backend locally. When checkpointing, flink
>>> takes a snapshot of the local state backend, and send it to the checkpoint
>>> target directory(in your case, the HDFS). The checkpoints on the HDFS looks
>>> like the periodic snapshot of the state backend of your job, they can be
>>> created or deleted but never be changed. Maybe Stefan(cc) could give you
>>> more professional information and plz correct me if I'm incorrect.
>>>
>>> Best, Sihua
>>> On 06/21/2018 14:40,Garvit Sharma
>>>  wrote:
>>>
>>> Hi,
>>>
>>> Consider a managed keyed state backed by HDFS with checkpointing
>>> enabled. Now, as the state grows the state data will be saved on HDFS.
>>>
>>> Now, let's say, we clear the state. Would the state data be removed from
>>> HDFS too?
>>>
>>> How does Flink manage to clear the state data from state backend on
>>> clearing the keyed state?
>>>
>>> --
>>>
>>> Garvit Sharma
>>> github.com/garvitlnmiit/
>>>
>>> No Body is a Scholar by birth, its only hard work and strong
>>> determination that makes him master.
>>>
>>>
>>
>> --
>>
>> Garvit Sharma
>> github.com/garvitlnmiit/
>>
>> No Body is a Scholar by birth, its only hard work and strong
>> determination that makes him master.
>>
>>
>
> --
>
> Garvit Sharma
> github.com/garvitlnmiit/
>
> No Body is a Scholar by birth, its only hard work and strong determination
> that makes him master.
>


-- 

Garvit Sharma
github.com/garvitlnmiit/

No Body is a Scholar by birth, its only hard work and strong determination
that makes him master.


Re: Cleaning of state snapshot in state backend(HDFS)

2018-06-21 Thread Garvit Sharma
I am maintaining state data for a key in ValueState. As per [0] I can
clear() state for that key.

[0]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/state.html

Please let me know.

Thanks,


On Thu, Jun 21, 2018 at 1:19 PM sihua zhou  wrote:

> Hi Garvit,
>
> Let's say you clearing the state at timestamp t1, then the checkpoints
> completed before t1 will still contains the data you cleared. But the
> future checkpoints won't contain the cleared data again. But I'm not sure
> what you meaning by the cleared state, you can only clear a key-value pair
> of the state currently, you can't cleared the whole state currently.
>
> Best, Sihua
>
> On 06/21/2018 15:41,Garvit Sharma
>  wrote:
>
> So, would it delete all the files in HDFS associated with the cleared
> state?
>
> On Thu, Jun 21, 2018 at 12:58 PM sihua zhou  wrote:
>
>> Hi Garvit,
>>
>> > Now, let's say, we clear the state. Would the state data be removed
>> from HDFS too?
>>
>> The state data would not be removed from HDFS immediately, if you clear
>> the state in your job. But after you clearing the state in your job, the
>> later completed checkpoint won't contain the state any more.
>>
>> > How does Flink manage to clear the state data from state backend on
>> clearing the keyed state?
>>
>> 1. you can use the {{tate.checkpoints.num-retained}} to set the number
>> of the completed checkpoint maintanced on HDFS.
>> 2. If you set {{
>> env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.
>> DELETE_ON_CANCELLATION)}} then the checkpoints on HDFS will be removed
>> once your job is finished(or cancled). And if you set {{
>> env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.
>>  RETAIN_ON_CANCELLATION)}} then the checkpoints will be remained.
>>
>> Please refer to
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/checkpoints.html
>>  to
>> find more information.
>>
>>
>> Additional, I'd like to give a bref info of the checkpoint on HDFS. In a
>> nutshell, what ever you did with the state in your running job, they only
>> effect the content on the state backend locally. When checkpointing, flink
>> takes a snapshot of the local state backend, and send it to the checkpoint
>> target directory(in your case, the HDFS). The checkpoints on the HDFS looks
>> like the periodic snapshot of the state backend of your job, they can be
>> created or deleted but never be changed. Maybe Stefan(cc) could give you
>> more professional information and plz correct me if I'm incorrect.
>>
>> Best, Sihua
>> On 06/21/2018 14:40,Garvit Sharma
>>  wrote:
>>
>> Hi,
>>
>> Consider a managed keyed state backed by HDFS with checkpointing enabled.
>> Now, as the state grows the state data will be saved on HDFS.
>>
>> Now, let's say, we clear the state. Would the state data be removed from
>> HDFS too?
>>
>> How does Flink manage to clear the state data from state backend on
>> clearing the keyed state?
>>
>> --
>>
>> Garvit Sharma
>> github.com/garvitlnmiit/
>>
>> No Body is a Scholar by birth, its only hard work and strong
>> determination that makes him master.
>>
>>
>
> --
>
> Garvit Sharma
> github.com/garvitlnmiit/
>
> No Body is a Scholar by birth, its only hard work and strong determination
> that makes him master.
>
>

-- 

Garvit Sharma
github.com/garvitlnmiit/

No Body is a Scholar by birth, its only hard work and strong determination
that makes him master.


Re: Cleaning of state snapshot in state backend(HDFS)

2018-06-21 Thread sihua zhou
Hi Garvit,


Let's say you clear the state at timestamp t1; then the checkpoints
completed before t1 will still contain the data you cleared, but the future
checkpoints won't contain the cleared data again. However, I'm not sure what you
mean by the cleared state: you can only clear a key-value pair of the state
currently, you can't clear the whole state currently.


Best, Sihua


On 06/21/2018 15:41,Garvit Sharma wrote:
So, would it delete all the files in HDFS associated with the cleared state?


On Thu, Jun 21, 2018 at 12:58 PM sihua zhou  wrote:

Hi Garvit,


> Now, let's say, we clear the state. Would the state data be removed from HDFS 
> too?


The state data would not be removed from HDFS immediately, if you clear the 
state in your job. But after you clearing the state in your job, the later 
completed checkpoint won't contain the state any more.


> How does Flink manage to clear the state data from state backend on clearing 
> the keyed state?


1. you can use the {{tate.checkpoints.num-retained}} to set the number of the 
completed checkpoint maintanced on HDFS.
2. If you set 
{{env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)}}
 then the checkpoints on HDFS will be removed once your job is finished(or 
cancled). And if you set 
{{env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.
 RETAIN_ON_CANCELLATION)}} then the checkpoints will be remained.


Please refer to 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/checkpoints.html
 to find more information.




Additional, I'd like to give a bref info of the checkpoint on HDFS. In a 
nutshell, what ever you did with the state in your running job, they only 
effect the content on the state backend locally. When checkpointing, flink 
takes a snapshot of the local state backend, and send it to the checkpoint 
target directory(in your case, the HDFS). The checkpoints on the HDFS looks 
like the periodic snapshot of the state backend of your job, they can be 
created or deleted but never be changed. Maybe Stefan(cc) could give you more 
professional information and plz correct me if I'm incorrect.


Best, Sihua
On 06/21/2018 14:40,Garvit Sharma wrote:
Hi,


Consider a managed keyed state backed by HDFS with checkpointing enabled. Now, 
as the state grows the state data will be saved on HDFS.


Now, let's say, we clear the state. Would the state data be removed from HDFS 
too?


How does Flink manage to clear the state data from state backend on clearing 
the keyed state?


--


Garvit Sharma
github.com/garvitlnmiit/

No Body is a Scholar by birth, its only hard work and strong determination that 
makes him master.





--


Garvit Sharma
github.com/garvitlnmiit/

No Body is a Scholar by birth, its only hard work and strong determination that 
makes him master.


Re: Cleaning of state snapshot in state backend(HDFS)

2018-06-21 Thread Garvit Sharma
So, would it delete all the files in HDFS associated with the cleared state?

On Thu, Jun 21, 2018 at 12:58 PM sihua zhou  wrote:

> Hi Garvit,
>
> > Now, let's say, we clear the state. Would the state data be removed from
> HDFS too?
>
> The state data would not be removed from HDFS immediately, if you clear
> the state in your job. But after you clearing the state in your job, the
> later completed checkpoint won't contain the state any more.
>
> > How does Flink manage to clear the state data from state backend on
> clearing the keyed state?
>
> 1. you can use the {{tate.checkpoints.num-retained}} to set the number of
> the completed checkpoint maintanced on HDFS.
> 2. If you set {{
> env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.
> DELETE_ON_CANCELLATION)}} then the checkpoints on HDFS will be removed
> once your job is finished(or cancled). And if you set {{
> env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.
>  RETAIN_ON_CANCELLATION)}} then the checkpoints will be remained.
>
> Please refer to
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/checkpoints.html
>  to
> find more information.
>
>
> Additional, I'd like to give a bref info of the checkpoint on HDFS. In a
> nutshell, what ever you did with the state in your running job, they only
> effect the content on the state backend locally. When checkpointing, flink
> takes a snapshot of the local state backend, and send it to the checkpoint
> target directory(in your case, the HDFS). The checkpoints on the HDFS looks
> like the periodic snapshot of the state backend of your job, they can be
> created or deleted but never be changed. Maybe Stefan(cc) could give you
> more professional information and plz correct me if I'm incorrect.
>
> Best, Sihua
> On 06/21/2018 14:40,Garvit Sharma
>  wrote:
>
> Hi,
>
> Consider a managed keyed state backed by HDFS with checkpointing enabled.
> Now, as the state grows the state data will be saved on HDFS.
>
> Now, let's say, we clear the state. Would the state data be removed from
> HDFS too?
>
> How does Flink manage to clear the state data from state backend on
> clearing the keyed state?
>
> --
>
> Garvit Sharma
> github.com/garvitlnmiit/
>
> No Body is a Scholar by birth, its only hard work and strong determination
> that makes him master.
>
>

-- 

Garvit Sharma
github.com/garvitlnmiit/

No Body is a Scholar by birth, its only hard work and strong determination
that makes him master.


Re: Questions regarding to Flink 1.5.0 REST API change

2018-06-21 Thread Chesnay Schepler

If you could open a JIRA this would be great.

On 21.06.2018 09:07, sihua zhou wrote:

Hi Yow,

I had a look at the related code, I think this seems like a bug. Flink 
use Checkpoint path's FileSystem to create the output stream for the 
Savepoint, but in your case the checkpoint & savepoint are not using 
the same file system. A workaround is to use the same file system for 
both checkpoint & savepoint.


Best, Sihua



On 06/21/2018 14:07,Siew Wai Yow 
 wrote:


Thanks Chesnay, the application will take value from
"state.savepoints.dir" as default if set target-directory to nul.
But then it trying to create the directory in local machine, which
caused the below error because it is a HDFS directory. The same
URL works in previous Flink 1.3.2. Is something break in Flink
1.5.0? Or anything we need to extra configure? Thank you.


Caused by: java.io.IOException: Mkdirs failed to create hdfs://192.168.56.150:8020/flink/savepoints/savepoint-82-fb6eca4f3dbb
    at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:271)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:121)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:348)
    ... 25 more



flink-conf.yaml:



state.backend: rocksdb
state.backend.fs.checkpointdir: file:///tmp/rocksdb_simple_example
state.backend.incremental: true
state.backend.async: true
state.checkpoints.num-retained: 5
state.savepoints.dir: hdfs://192.168.56.150:8020/flink/savepoints


-Yow




*From:* Chesnay Schepler mailto:ches...@apache.org>>
*Sent:* Wednesday, June 20, 2018 3:15 PM
*To:* user@flink.apache.org 
*Subject:* Re: Questions regarding to Flink 1.5.0 REST API change
I think you can set the target-directory to null. But I'm not sure
why this particular request requires this, other request allow
optional fields to simply be ommitted...

On 20.06.2018 06:12, Siew Wai Yow wrote:


Hi all,


Seems pass in target-directory is a must now for checkpoints REST
API, and the status will not response with save point directory
anymore. I can pass in but the information is redundant with the
same already defined in flink-config.yaml. May I know is there a
way to retrieve the save point directory from flink-config.yaml
in flink application? I am not able to get it from
env.getConfig(). Thank you.




*From:* Chesnay Schepler 

*Sent:* Tuesday, June 19, 2018 11:55 PM
*To:* user@flink.apache.org 
*Subject:* Re: Questions regarding to Flink 1.5.0 REST API change
1. PATCH to /jobs/:jobid, you can specify CANCEL/STOP with the
"mode" query parameter

2. POST to /jobs/:jobid/savepoints, with a json payload. Returns
a trigger id, used for 3).
{
  "target-directory" : {
    "type" : "string"
  },
  "cancel-job" : {
    "type" : "boolean"
  }
}
3. GET to /jobs/:jobid/savepoints/:triggerid
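
For example, a concrete request body for 2) could look like this (the HDFS path is just a made-up example):

{
  "target-directory" : "hdfs://namenode:8020/flink/savepoints",
  "cancel-job" : false
}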

On 19.06.2018 17:40, Esteban Serrano wrote:

For #1, you need to use a PATCH request to "*/jobs/:jobid*"

On Tue, Jun 19, 2018 at 11:35 AM Siew Wai Yow
mailto:wai_...@hotmail.com>> wrote:

Hi,


Regarding to Flink 1.5.0 REST API breaking change,

  * /The REST API to cancel a job was changed./
  * /The REST API to cancel a job with savepoint was changed./

I have few dump questions,


 1. Any replacement for cancellation ONLY without
save-point? Only found "*/jobs/:jobid/savepoints*".
 2. For "*/jobs/:jobid/savepoints*", how could I form the
URL with cancellation and with directory afterward?
 1. 
http://192.168.56.151:8081/jobs//savepoints/cancel-job/true??

 3. Any cancellation progress monitoring in 1.5 like
previous version 1.3/1.4?
 1. previous version:
/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId



Thank you.

Regards,
Yow










Re: Questions regarding to Flink 1.5.0 REST API change

2018-06-21 Thread sihua zhou
I just created a JIRA for this: https://issues.apache.org/jira/browse/FLINK-9633




On 06/21/2018 15:10,Chesnay Schepler wrote:
That's quite weird that it tries to us the local file-system. Maybe it derives 
the FS from state.backend.fs.checckpointdir, but uses it for savepoints.dir?

What happens if you set state.backend.fs.checkpointdir also to HDFS?


On 21.06.2018 08:07, Siew Wai Yow wrote:


Thanks Chesnay, the application will take value from "state.savepoints.dir" as 
default if set target-directory to nul. But then it trying to create the 
directory in local machine, which caused the below error because it is a HDFS 
directory. The same URL works in previous Flink 1.3.2. Is something break in 
Flink 1.5.0? Or anything we need to extra configure? Thank you.




Caused by: java.io.IOException: Mkdirs failed to create hdfs://192.168.56.150:8020/flink/savepoints/savepoint-82-fb6eca4f3dbb
    at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:271)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:121)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:348)
    ... 25 more








flink-conf.yaml:




state.backend: rocksdb
state.backend.fs.checkpointdir: file:///tmp/rocksdb_simple_example
state.backend.incremental: true
state.backend.async: true
state.checkpoints.num-retained: 5
state.savepoints.dir:  hdfs://192.168.56.150:8020/flink/savepoints





-Yow





From: Chesnay Schepler 
Sent: Wednesday, June 20, 2018 3:15 PM
To:user@flink.apache.org
Subject: Re: Questions regarding to Flink 1.5.0 REST API change
 
I think you can set the target-directory to null. But I'm not sure why this 
particular request requires this, other request allow optional fields to simply 
be ommitted...

On 20.06.2018 06:12, Siew Wai Yow wrote:


Hi all,




Seems pass in target-directory is a must now for checkpoints REST API, and the 
status will not response with save point directory anymore. I can pass in but 
the information is redundant with the same already defined in 
flink-config.yaml. May I know is there a way to retrieve the save point 
directory from flink-config.yaml in flink application? I am not able to get it 
from env.getConfig(). Thank you.





From: Chesnay Schepler 
Sent: Tuesday, June 19, 2018 11:55 PM
To: user@flink.apache.org
Subject: Re: Questions regarding to Flink 1.5.0 REST API change
 
1. PATCH to /jobs/:jobid, you can specify CANCEL/STOP with the "mode" query 
parameter

2. POST to /jobs/:jobid/savepoints, with a json payload. Returns a trigger id, 
used for 3).
{
"target-directory" : {
  "type" : "string"
},
"cancel-job" : {
  "type" : "boolean"
}
}

3. GET to /jobs/:jobid/savepoints/:triggerid

On 19.06.2018 17:40, Esteban Serrano wrote:

For #1, you need to use a PATCH request to "/jobs/:jobid"


On Tue, Jun 19, 2018 at 11:35 AM Siew Wai Yow  wrote:


Hi,




Regarding to Flink 1.5.0 REST API breaking change,

The REST API to cancel a job was changed.
The REST API to cancel a job with savepoint was changed.

I have few dump questions,




Any replacement for cancellation ONLY without save-point? Only found 
"/jobs/:jobid/savepoints".
For "/jobs/:jobid/savepoints", how could I form the URL with cancellation and 
with directory afterward?
http://192.168.56.151:8081/jobs//savepoints/cancel-job/true??
Any cancellation progress monitoring in 1.5 like previous version 1.3/1.4?
previous version: /jobs/:jobid/cancel-with-savepoint/in-progress/:requestId


Thank you.


Regards,
Yow














Re:Cleaning of state snapshot in state backend(HDFS)

2018-06-21 Thread sihua zhou
Hi Garvit,


> Now, let's say, we clear the state. Would the state data be removed from HDFS 
> too?


The state data would not be removed from HDFS immediately if you clear the
state in your job. But after you clear the state in your job, the later
completed checkpoints won't contain the state any more.


> How does Flink manage to clear the state data from state backend on clearing 
> the keyed state?


1. You can use {{state.checkpoints.num-retained}} to set the number of
completed checkpoints maintained on HDFS.
2. If you set
{{env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)}}
then the checkpoints on HDFS will be removed once your job is finished (or
cancelled). And if you set
{{env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)}}
then the checkpoints will be retained.


Please refer to 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/checkpoints.html
 to find more information.
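
As a rough illustration of point 2 (a sketch only; the checkpoint interval is chosen arbitrarily):

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointRetentionExample {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 60 seconds (arbitrary value for the example).
        env.enableCheckpointing(60_000);

        // Keep completed checkpoints on HDFS even after the job is cancelled;
        // with DELETE_ON_CANCELLATION they would be removed on cancel instead.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // state.checkpoints.num-retained (flink-conf.yaml) controls how many of
        // the completed checkpoints are kept around.

        // ... define sources/operators/sinks and call env.execute(...)
    }
}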




Additionally, I'd like to give a brief overview of checkpoints on HDFS. In a
nutshell, whatever you do with the state in your running job only affects the
content of the state backend locally. When checkpointing, Flink takes a snapshot
of the local state backend and sends it to the checkpoint target directory (in
your case, HDFS). The checkpoints on HDFS look like periodic snapshots of the
state backend of your job; they can be created or deleted but never changed.
Maybe Stefan (cc) could give you more professional information, and please
correct me if I'm incorrect.


Best, Sihua
On 06/21/2018 14:40,Garvit Sharma wrote:
Hi,


Consider a managed keyed state backed by HDFS with checkpointing enabled. Now, 
as the state grows the state data will be saved on HDFS.


Now, let's say, we clear the state. Would the state data be removed from HDFS 
too?


How does Flink manage to clear the state data from state backend on clearing 
the keyed state?


--


Garvit Sharma
github.com/garvitlnmiit/

No Body is a Scholar by birth, its only hard work and strong determination that 
makes him master.


Re: Strictly use TLSv1.2

2018-06-21 Thread Vinay Patil
Hi,

I have deployed Flink 1.3.2 and enabled the SSL settings. The SSL debug
logs show that Flink is using TLSv1.2. However, based on the security
scans, we have observed that it also allows TLSv1.0 and TLSv1.1.

In order to strictly use TLSv1.2 we have updated the following property of
java.security file:
jdk.tls.disabledAlgorithms=MD5, SSLv3, DSA, RSA keySize < 2048, TLSv1,
TLSv1.1

But it still allows TLSv1.1; I verified this by running the following command
from the master node:

openssl s_client -connect taskmanager1: -tls1

(here listening_address_port is part of
akka.ssl.tcp://flink@taskmanager1:port/user/taskmanager)

Now, when I hit the above command for the data port, it does not allow
TLSv1.1 and only allows TLSv1.2

Can you please let me know how I can enforce all the Flink ports to use
TLSv1.2?

Regards,
Vinay Patil


Re: Questions regarding to Flink 1.5.0 REST API change

2018-06-21 Thread sihua zhou
Hi Yow,


I had a look at the related code, and I think this is a bug. Flink uses the
checkpoint path's FileSystem to create the output stream for the savepoint, but
in your case the checkpoint & savepoint are not using the same file system. A
workaround is to use the same file system for both the checkpoint & savepoint.
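
For example, the relevant part of flink-conf.yaml could then look roughly like this (only the savepoint path is taken from the earlier mail; the checkpoint path here is a made-up example):

state.backend: rocksdb
state.backend.fs.checkpointdir: hdfs://192.168.56.150:8020/flink/checkpoints
state.savepoints.dir: hdfs://192.168.56.150:8020/flink/savepoints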


Best, Sihua






On 06/21/2018 14:07,Siew Wai Yow wrote:

Thanks Chesnay, the application will take value from "state.savepoints.dir" as 
default if set target-directory to nul. But then it trying to create the 
directory in local machine, which caused the below error because it is a HDFS 
directory. The same URL works in previous Flink 1.3.2. Is something break in 
Flink 1.5.0? Or anything we need to extra configure? Thank you.




Caused by: java.io.IOException: Mkdirs failed to create hdfs://192.168.56.150:8020/flink/savepoints/savepoint-82-fb6eca4f3dbb
    at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:271)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:121)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:348)
    ... 25 more








flink-conf.yaml:




state.backend: rocksdb
state.backend.fs.checkpointdir: file:///tmp/rocksdb_simple_example
state.backend.incremental: true
state.backend.async: true
state.checkpoints.num-retained: 5
state.savepoints.dir:  hdfs://192.168.56.150:8020/flink/savepoints





-Yow





From: Chesnay Schepler 
Sent: Wednesday, June 20, 2018 3:15 PM
To:user@flink.apache.org
Subject: Re: Questions regarding to Flink 1.5.0 REST API change
 
I think you can set the target-directory to null. But I'm not sure why this 
particular request requires this, other request allow optional fields to simply 
be ommitted...

On 20.06.2018 06:12, Siew Wai Yow wrote:


Hi all,




Seems pass in target-directory is a must now for checkpoints REST API, and the 
status will not response with save point directory anymore. I can pass in but 
the information is redundant with the same already defined in 
flink-config.yaml. May I know is there a way to retrieve the save point 
directory from flink-config.yaml in flink application? I am not able to get it 
from env.getConfig(). Thank you.





From: Chesnay Schepler 
Sent: Tuesday, June 19, 2018 11:55 PM
To: user@flink.apache.org
Subject: Re: Questions regarding to Flink 1.5.0 REST API change
 
1. PATCH to /jobs/:jobid, you can specify CANCEL/STOP with the "mode" query 
parameter

2. POST to /jobs/:jobid/savepoints, with a json payload. Returns a trigger id, 
used for 3).
{
"target-directory" : {
  "type" : "string"
},
"cancel-job" : {
  "type" : "boolean"
}
}

3. GET to /jobs/:jobid/savepoints/:triggerid

On 19.06.2018 17:40, Esteban Serrano wrote:

For #1, you need to use a PATCH request to "/jobs/:jobid"


On Tue, Jun 19, 2018 at 11:35 AM Siew Wai Yow  wrote:


Hi,




Regarding to Flink 1.5.0 REST API breaking change,

The REST API to cancel a job was changed.
The REST API to cancel a job with savepoint was changed.

I have few dump questions,




Any replacement for cancellation ONLY without save-point? Only found 
"/jobs/:jobid/savepoints".
For "/jobs/:jobid/savepoints", how could I form the URL with cancellation and 
with directory afterward?
http://192.168.56.151:8081/jobs//savepoints/cancel-job/true??
Any cancellation progress monitoring in 1.5 like previous version 1.3/1.4?
previous version: /jobs/:jobid/cancel-with-savepoint/in-progress/:requestId


Thank you.


Regards,
Yow











Re: Breakage in Flink CLI in 1.5.0

2018-06-21 Thread Till Rohrmann
Hi,

if the rest.address is different from the jobmanager.rpc.address, then you
should specify that in the flink-conf.yaml and Flink will connect to
rest.address. Only if rest.address is not specified, the system will fall
back to use the jobmanager.rpc.address. Currently, the rest server endpoint
runs in the same JVM as the cluster entrypoint and all JobMasters.
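
For illustration, a non-HA flink-conf.yaml could then separate the two roughly like this (the host names are made up):

jobmanager.rpc.address: flink-jobmanager-internal
jobmanager.rpc.port: 6123
rest.address: flink-jobmanager.example.com
rest.port: 8081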

Cheers,
Till

On Thu, Jun 21, 2018 at 8:46 AM Sampath Bhat 
wrote:

> Hello Till
>
> Thanks for clarification. But I've few questions based on your reply.
>
> In non-HA setups we need the jobmanager.rpc.address to derive the hostname
> of the rest server.
> why is there a dependency on jobmanager.rpc.address to get the hostname of the
> rest server? This holds good only for normal deployments such as on bare metal or
> a virtual machine, where the Flink cluster runs as another process on a machine.
> But if we try to deploy Flink on Kubernetes, then there could be a possibility
> where jobmanager.rpc.address and rest.address are different from each other.
>
> So if rest.address is not provided in flink-conf.yaml, then looking for
> jobmanager.rpc.address to derive the hostname of the rest server makes
> sense. But when the user has already provided the rest.address and Flink
> still looks into jobmanager.rpc.address for the hostname of the rest server,
> that is an unwanted dependency IMO.
>
> In HA setup the rpc.address is obtained from zookeeper so user need not
> worry about unnecessary properties while submitting job.
>
> On Wed, Jun 20, 2018 at 1:25 PM, Till Rohrmann 
> wrote:
>
> > It will, but it defaults to jobmanager.rpc.address if no rest.address has
> > been specified.
> >
> > On Wed, Jun 20, 2018 at 9:49 AM Chesnay Schepler 
> > wrote:
> >
> >> Shouldn't the non-HA case be covered by rest.address?
> >>
> >> On 20.06.2018 09:40, Till Rohrmann wrote:
> >>
> >> Hi Sampath,
> >>
> >> it is no longer possible to not start the rest server endpoint by
> setting
> >> rest.port to -1. If you do this, then the cluster won't start. The
> comment
> >> in the flink-conf.yaml holds only true for the legacy mode.
> >>
> >> In non-HA setups we need the jobmanager.rpc.address to derive the
> >> hostname of the rest server. The jobmanager.rpc.port is no longer needed
> >> for the client but only for the other cluster components (TMs). When
> using
> >> the HA mode, then every address will be retrieved from ZooKeeper.
> >>
> >> I hope this clarifies things.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Wed, Jun 20, 2018 at 9:24 AM Chesnay Schepler 
> >> wrote:
> >>
> >>> I was worried this might be the case.
> >>>
> >>> The rest.port handling was simply copied from the legacy web-server,
> >>> which explicitly allowed shutting it down.
> >>> It may (I'm not entirely sure) also not be necessary for all deployment
> >>> modes; for example if the job is baked into the job/taskmanager images.
> >>>
> >>> I'm not quite sure whether the rpc address is actually required for the
> >>> REST job submission, or only since we still rely partly on some legacy
> >>> code (ClusterClient). Maybe Till (cc) knows the answer to that.
> >>>
> >>> > Adding on to this point you made - " the rpc address is still
> >>> *required *due
> >>> > to some technical implementations; it may be that you can set this to
> >>> some
> >>> > arbitrary value however."
> >>> >
> >>> > For job submission to happen successfully we should give specific rpc
> >>> > address and not any arbitrary value. If any arbitrary value is given
> >>> the
> >>> > job submission fails with the following error -
> >>> > org.apache.flink.client.deployment.ClusterRetrieveException: Couldn't
> >>> > retrieve standalone cluster
> >>> >  at
> >>> > org.apache.flink.client.deployment.StandaloneClusterDescriptor.
> >>> retrieve(StandaloneClusterDescriptor.java:51)
> >>> >  at
> >>> > org.apache.flink.client.deployment.StandaloneClusterDescriptor.
> >>> retrieve(StandaloneClusterDescriptor.java:31)
> >>> >  at
> >>> > org.apache.flink.client.cli.CliFrontend.runProgram(
> >>> CliFrontend.java:249)
> >>> >  at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.
> >>> java:210)
> >>> >  at
> >>> > org.apache.flink.client.cli.CliFrontend.parseParameters(
> >>> CliFrontend.java:1020)
> >>> >  at
> >>> > org.apache.flink.client.cli.CliFrontend.lambda$main$9(
> >>> CliFrontend.java:1096)
> >>> >  at
> >>> > org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(
> >>> NoOpSecurityContext.java:30)
> >>> >  at
> >>> > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
> >>> > Caused by: java.net.UnknownHostException: flinktest-flink-
> >>> jobmanager1233445:
> >>> > Name or service not known
> >>> >   (Random name flinktest-flink-jobmanager1233445)
> >>> >  at java.net.Inet6AddressImpl.lookupAllHostAddr(Native
> Method)
> >>> >  at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.
> >>> java:928)
> >>> >  at
> >>> > java.net.InetAddress.getAddres

Re: Questions regarding to Flink 1.5.0 REST API change

2018-06-21 Thread Chesnay Schepler
That's quite weird that it tries to use the local file system. Maybe it
derives the FS from state.backend.fs.checkpointdir, but uses it for
savepoints.dir?


What happens if you set state.backend.fs.checkpointdir also to HDFS?


On 21.06.2018 08:07, Siew Wai Yow wrote:


Thanks Chesnay, the application will take value from 
"state.savepoints.dir" as default if set target-directory to nul. But 
then it trying to create the directory in local machine, which 
caused the below error because it is a HDFS directory. The same URL 
works in previous Flink 1.3.2. Is something break in Flink 1.5.0? Or 
anything we need to extra configure? Thank you.



Caused by: java.io.IOException: Mkdirs failed to create hdfs://192.168.56.150:8020/flink/savepoints/savepoint-82-fb6eca4f3dbb
    at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:271)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:121)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:348)
    ... 25 more



flink-conf.yaml:



state.backend: rocksdb
state.backend.fs.checkpointdir: file:///tmp/rocksdb_simple_example
state.backend.incremental: true
state.backend.async: true
state.checkpoints.num-retained: 5
state.savepoints.dir: hdfs://192.168.56.150:8020/flink/savepoints


-Yow




*From:* Chesnay Schepler 
*Sent:* Wednesday, June 20, 2018 3:15 PM
*To:* user@flink.apache.org
*Subject:* Re: Questions regarding to Flink 1.5.0 REST API change
I think you can set the target-directory to null. But I'm not sure why
this particular request requires this; other requests allow optional
fields to simply be omitted...


On 20.06.2018 06:12, Siew Wai Yow wrote:


Hi all,


Seems pass in target-directory is a must now for checkpoints REST 
API, and the status will not response with save point directory 
anymore. I can pass in but the information is redundant with the same 
already defined in flink-config.yaml. May I know is there a way to 
retrieve the save point directory from flink-config.yaml in flink 
application? I am not able to get it from env.getConfig(). Thank you.





*From:* Chesnay Schepler  
*Sent:* Tuesday, June 19, 2018 11:55 PM
*To:* user@flink.apache.org 
*Subject:* Re: Questions regarding to Flink 1.5.0 REST API change
1. PATCH to /jobs/:jobid, you can specify CANCEL/STOP with the "mode" 
query parameter


2. POST to /jobs/:jobid/savepoints, with a json payload. Returns a 
trigger id, used for 3).

{
  "target-directory" : {
    "type" : "string"
  },
  "cancel-job" : {
    "type" : "boolean"
  }
}

3. GET to /jobs/:jobid/savepoints/:triggerid

On 19.06.2018 17:40, Esteban Serrano wrote:

For #1, you need to use a PATCH request to "*/jobs/:jobid*"

On Tue, Jun 19, 2018 at 11:35 AM Siew Wai Yow > wrote:


Hi,


Regarding to Flink 1.5.0 REST API breaking change,

  * /The REST API to cancel a job was changed./
  * /The REST API to cancel a job with savepoint was changed./

I have few dump questions,


 1. Any replacement for cancellation ONLY without save-point?
Only found "*/jobs/:jobid/savepoints*".
 2. For "*/jobs/:jobid/savepoints*", how could I form the URL
with cancellation and with directory afterward?
 1. http://192.168.56.151:8081/jobs//savepoints/cancel-job/true??

 3. Any cancellation progress monitoring in 1.5 like previous
version 1.3/1.4?
 1. previous version:
/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId


Thank you.

Regards,
Yow