How to access JobManager and TaskManager

2018-01-30 Thread xiatao123
In the web UI, I can see this information under the JobManager. How can I access
the variable job_env in my main method?

Job Manager
Configuration
env.hadoop.conf.dir /etc/hadoop/conf
env.yarn.conf.dir   /etc/hadoop/conf
high-availability.cluster-id    application_1517362137681_0001
job_env stage
jobmanager.rpc.address  ip-172-32-37-243.us-west-2.compute.internal
jobmanager.rpc.port 46253
jobmanager.web.port 0
taskmanager.numberOfTaskSlots   4

When the Task Manager starts, I also noticed that the same setting "job_env" is
loaded by GlobalConfiguration.

2018-01-31 01:34:54,970 INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: env.yarn.conf.dir, /etc/hadoop/conf
2018-01-31 01:34:54,976 INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: taskmanager.maxRegistrationDuration, 5 minutes
2018-01-31 01:34:54,979 INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: high-availability.cluster-id, application_1517362137681_0001
2018-01-31 01:34:54,979 INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: env.hadoop.conf.dir, /etc/hadoop/conf
2018-01-31 01:34:54,979 INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: taskmanager.numberOfTaskSlots, 4
2018-01-31 01:34:54,982 INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.rpc.address, ip-172-32-37-243.us-west-2.compute.internal
2018-01-31 01:34:54,982 INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: job_env, stage
2018-01-31 01:34:54,982 INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.web.port, 0
2018-01-31 01:34:54,983 INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.rpc.port, 46253

But when I try to access or print out all the variables in my main method:

import org.apache.flink.configuration.GlobalConfiguration
import scala.collection.JavaConverters._

val configs = GlobalConfiguration.loadConfiguration().toMap.asScala
for ((k, v) <- configs) println(s"Configs key: $k, value: $v")

I only get these three:
Configs key: env.hadoop.conf.dir, value: /etc/hadoop/conf
Configs key: taskmanager.numberOfTaskSlots, value: 4
Configs key: env.yarn.conf.dir, value: /etc/hadoop/conf

Can anyone help?





RE: Joining data in Streaming

2018-01-30 Thread Marchant, Hayden
Stefan,

So are we essentially saying that in this case, for now, I should stick to 
DataSet / Batch Table API?

Thanks,
Hayden

-Original Message-
From: Stefan Richter [mailto:s.rich...@data-artisans.com] 
Sent: Tuesday, January 30, 2018 4:18 PM
To: Marchant, Hayden [ICG-IT] 
Cc: user@flink.apache.org; Aljoscha Krettek 
Subject: Re: Joining data in Streaming

Hi,

as far as I know, this is not easily possible. What would be required is
something like a CoFlatMap function, where one input stream blocks until
the second stream is fully consumed to build up the state to join against.
Maybe Aljoscha (in CC) can comment on future plans to support this.

Best,
Stefan

> On 30.01.2018 at 12:42, Marchant, Hayden wrote:
> 
> We have a use case where we have 2 data sets - One reasonable large data set 
> (a few million entities), and a smaller set of data. We want to do a join 
> between these data sets. We will be doing this join after both data sets are 
> available.  In the world of batch processing, this is pretty straightforward 
> - we'd load both data sets into an application and execute a join operator on 
> them through a common key.   Is it possible to do such a join using the 
> DataStream API? I would assume that I'd use the connect operator, though I'm 
> not sure exactly how I should do the join - do I need one 'smaller' set to be 
> completely loaded into state before I start flowing the large set? My concern 
> is that if I read both data sets from streaming sources, since I can't be 
> guaranteed of the order that the data is loaded, I may lose lots of potential 
> joined entities since their pairs might not have been read yet. 
> 
> 
> Thanks,
> Hayden Marchant
> 
> 



Re: Joining data in Streaming

2018-01-30 Thread Xingcan Cui
Hi Hayden,

Performing a full-history join on two streams is not natively supported
yet.

As a workaround, you may implement a CoProcessFunction and cache the
records from both sides in state until the stream with less data has been
fully cached. Then you can safely clear the cache for the "larger
stream", which should by then have produced complete results, and perform a
nested-loop join (i.e., whenever a new record arrives, join it against the fully
cached set).
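
A rough, untested sketch of that idea, assuming both inputs are keyed by the join
key; the tuple types and the isSmallSideComplete placeholder below are illustrative
only, not a tested implementation:

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Small side: (key, label); large side: (key, value); output: (key, label, value).
class FullHistoryJoin
    extends CoProcessFunction[(String, String), (String, Long), (String, String, Long)] {

  // keyed state, so each cache only holds the records for the current join key
  private var smallCache: ListState[(String, String)] = _
  private var largeCache: ListState[(String, Long)] = _

  override def open(parameters: Configuration): Unit = {
    smallCache = getRuntimeContext.getListState(
      new ListStateDescriptor("small-cache", classOf[(String, String)]))
    largeCache = getRuntimeContext.getListState(
      new ListStateDescriptor("large-cache", classOf[(String, Long)]))
  }

  override def processElement1(s: (String, String),
      ctx: CoProcessFunction[(String, String), (String, Long), (String, String, Long)]#Context,
      out: Collector[(String, String, Long)]): Unit = {
    smallCache.add(s) // buffer the smaller side until it has been fully consumed
  }

  override def processElement2(l: (String, Long),
      ctx: CoProcessFunction[(String, String), (String, Long), (String, String, Long)]#Context,
      out: Collector[(String, String, Long)]): Unit = {
    if (isSmallSideComplete()) {
      // nested-loop join: the new large-side record against the fully cached small side
      val cached = smallCache.get()
      if (cached != null) {
        for (s <- cached.asScala) out.collect((l._1, s._2, l._2))
      }
    } else {
      largeCache.add(l) // small side not complete yet, keep buffering this side too
    }
  }

  // Placeholder only: in practice this could be driven by an end-of-input marker
  // record or a known record count on the smaller stream. Once the small side is
  // complete, the records buffered in largeCache would also need to be joined and
  // the cache cleared - omitted here for brevity.
  private def isSmallSideComplete(): Boolean = false
}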

Hope this helps.

Best,
Xingcan

On Tue, Jan 30, 2018 at 7:42 PM, Marchant, Hayden 
wrote:

> We have a use case where we have 2 data sets - One reasonable large data
> set (a few million entities), and a smaller set of data. We want to do a
> join between these data sets. We will be doing this join after both data
> sets are available.  In the world of batch processing, this is pretty
> straightforward - we'd load both data sets into an application and execute
> a join operator on them through a common key.   Is it possible to do such a
> join using the DataStream API? I would assume that I'd use the connect
> operator, though I'm not sure exactly how I should do the join - do I need
> one 'smaller' set to be completely loaded into state before I start flowing
> the large set? My concern is that if I read both data sets from streaming
> sources, since I can't be guaranteed of the order that the data is loaded,
> I may lose lots of potential joined entities since their pairs might not
> have been read yet.
>
>
> Thanks,
> Hayden Marchant
>
>
>


Re: Task Manager detached under load

2018-01-30 Thread Cliff Resnick
I've seen a similar issue while running successive Flink SQL batches on
1.4. In my case, the Job Manager would fail with the log output about
unreachability (with an additional statement about something going
"horribly wrong"). Under workload pressure, I reverted to 1.3.2 where
everything works perfectly, but we will try again soon on 1.4. When we do I
will post the actual log output.

This was on YARN in AWS, with akka.ask.timeout = 60s.

On Wed, Jan 24, 2018 at 9:57 PM, Ashish Pokharel 
wrote:

> I haven't gotten much further with this. It doesn't look GC related -
> at least the GC counters were not that atrocious. However, my main concern is:
> once the load subsides, why aren't the TM and JM connecting again? That doesn't
> look normal. I could definitely tell the JM was listening on the port, and from
> the logs it does appear the TM is trying to message the JM that it is still alive.
>
> Thanks, Ashish
>
> On Jan 23, 2018, at 12:31 PM, Lasse Nedergaard 
> wrote:
>
> Hi.
>
> Did you find a reason for the detaching ?
> I sometimes see the same on our system running Flink 1.4 on DC/OS. I have
> enabled taskmanager.debug.memory.startLogThread for debugging.
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>
> Den 20. jan. 2018 kl. 12.57 skrev Kien Truong :
>
> Hi,
>
> You should enable and check your garbage collection log.
>
> We've encountered case where Task Manager disassociated due to long GC
> pause.
>
>
> Regards,
>
> Kien
> On 1/20/2018 1:27 AM, ashish pok wrote:
>
> Hi All,
>
> We have hit some load-related issues and were wondering if anyone has some
> suggestions. We are noticing task managers and job managers being detached
> from each other under load and never really syncing up again. As a result,
> the Flink session shows 0 slots available for processing. Even though apps are
> configured to restart, it isn't really helping as there are no slots
> available to run the apps.
>
>
> Here are excerpts from the logs that seemed relevant. (I am trimming out the rest
> of the logs for brevity.)
>
> *Job Manager:*
> 2018-01-19 12:38:00,423 INFO  org.apache.flink.runtime.jobmanager.JobManager
>   -  Starting JobManager (Version: 1.4.0, Rev:3a9d9f2,
> Date:06.12.2017 @ 11:08:40 UTC)
>
> 2018-01-19 12:38:00,792 INFO  org.apache.flink.runtime.jobmanager.JobManager
>   -  Maximum heap size: 16384 MiBytes
> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager
>   -  Hadoop version: 2.6.5
> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager
>   -  JVM Options:
> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager
>   - -Xms16384m
> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager
>   - -Xmx16384m
> 2018-01-19 12:38:00,795 INFO  org.apache.flink.runtime.jobmanager.JobManager
>   - -XX:+UseG1GC
>
> 2018-01-19 12:38:00,908 INFO  
> org.apache.flink.configuration.GlobalConfiguration
>   - Loading configuration property: jobmanager.rpc.port, 6123
> 2018-01-19 12:38:00,908 INFO  
> org.apache.flink.configuration.GlobalConfiguration
>   - Loading configuration property: jobmanager.heap.mb, 16384
>
>
> 2018-01-19 12:53:34,671 WARN  akka.remote.RemoteWatcher
>  - Detected unreachable: [akka.tcp://flink@:37840
> ]
> 2018-01-19 12:53:34,676 INFO  org.apache.flink.runtime.jobmanager.JobManager
>   - Task manager akka.tcp://flink@:
> 37840/user/taskmanager terminated.
>
> -- So once Flink session boots up, we are hitting it with pretty heavy
> load, which typically results in the WARN above
>
> *Task Manager:*
> 2018-01-19 12:38:01,002 INFO  org.apache.flink.runtime.taskmanager.TaskManager
> -  Starting TaskManager (Version: 1.4.0, Rev:3a9d9f2,
> Date:06.12.2017 @ 11:08:40 UTC)
> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager
> -  Hadoop version: 2.6.5
> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager
> -  JVM Options:
> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager
> - -Xms16384M
> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager
> - -Xmx16384M
> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager
> - -XX:MaxDirectMemorySize=8388607T
> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager
> - -XX:+UseG1GC
>
> 2018-01-19 12:38:01,392 INFO  
> org.apache.flink.configuration.GlobalConfiguration
>   - Loading configuration property: jobmanager.rpc.port, 6123
> 2018-01-19 12:38:01,392 INFO  
> org.apache.flink.configuration.GlobalConfiguration
>   - Loading configuration property: jobmanager.heap.mb, 16384
>
>
> 2018-01-19 12:54:48,626 WARN  

Re: Sync and Async checkpoint time

2018-01-30 Thread Stefan Richter
Hi,

this looks like the timer service is the culprit for this problem. Timers are 
currently not stored in the state backend, but in a separate on-heap data 
structure that does not support copy-on-write or async snapshots in general. 
Therefore, writing the timers for a snapshot is always synchronous and this 
explanation would also match your observation that the problem mainly affects 
window operators, which make heavy use of timers.

Best,
Stefan

> On 30.01.2018 at 18:17, Sofer, Tovi wrote:
> 
> Hi group,
>  
> In our project we are using an asynchronous FsStateBackend, and we are trying
> to move to distributed storage – currently S3.
> When using this storage we are experiencing high backpressure and
> high latency compared to local storage.
> We are trying to understand the reason, since the checkpoint is asynchronous
> and so shouldn't have such a high effect.
>  
> We looked at the checkpoint history in the web UI and at details from the log.
> · From the web UI it seems that the sync checkpoint duration is much higher than
> the async duration. (Again, this is only when using S3, not when using local
> storage.)
> This happens especially in window operators (tumbling windows) such as below.
> · But from the log the sync time seems very short…
>  
> Do you have any idea why the async write to the FsStateBackend has such a
> high effect on the stream performance?
>  
> Checkpoint config:
> env.enableCheckpointing(6);
> env.setStateBackend(new FsStateBackend(checkpointDirURI, true));
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2);
>  
>  
> · Checkpoint info from console:
> 
>  
> · Checkpoint info from log:
> 2018-01-30 07:33:36,416 INFO  
> org.apache.flink.runtime.state.DefaultOperatorStateBackend - 
> [pool-42-thread-1] DefaultOperatorStateBackend snapshot (File Stream Factory 
> @ 
> s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc,
>  asynchronous part) in thread Thread[pool-42-thread-1,5,Flink Task Threads] 
> took 12139 ms.
> 2018-01-30 07:33:36,418 DEBUG 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
> [flink-akka.actor.default-dispatcher-83] Received acknowledge message for 
> checkpoint 52 from task 19ae368e935f177b513577256505ff37 of job 
> 747c4cef2841d2ab090d9ed97e0357cc.
> 2018-01-30 07:33:36,676 INFO  
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - 
> [pool-35-thread-1] Heap backend snapshot (File Stream Factory @ 
> s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc,
>  asynchronous part) in thread Thread[pool-35-thread-1,5,Flink Task Threads] 
> took 12396 ms.
> 2018-01-30 07:33:36,677 DEBUG 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
> [flink-akka.actor.default-dispatcher-79] Received acknowledge message for 
> checkpoint 52 from task 19bf0464a13f3c9bd559b7559d166de2 of job 
> 747c4cef2841d2ab090d9ed97e0357cc.
> 2018-01-30 07:33:37,347 INFO  
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - 
> [pool-17-thread-1] Heap backend snapshot (File Stream Factory @ 
> s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc,
>  asynchronous part) in thread Thread[pool-17-thread-1,5,Flink Task Threads] 
> took 13067 ms.
> 2018-01-30 07:33:37,349 DEBUG 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
> [flink-akka.actor.default-dispatcher-79] Received acknowledge message for 
> checkpoint 52 from task 97e169eefde0c57f52a874dee5a0b5a2 of job 
> 747c4cef2841d2ab090d9ed97e0357cc.
> 2018-01-30 07:33:37,418 INFO  
> org.apache.flink.runtime.state.DefaultOperatorStateBackend - 
> [pool-29-thread-1] DefaultOperatorStateBackend snapshot (File Stream Factory 
> @ 
> s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc,
>  asynchronous part) in thread Thread[pool-29-thread-1,5,Flink Task Threads] 
> took 13143 ms.
> 2018-01-30 07:33:37,420 DEBUG 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
> [flink-akka.actor.default-dispatcher-79] Received acknowledge message for 
> checkpoint 52 from task 19b990204530184a97fbaad373dfaf11 of job 
> 747c4cef2841d2ab090d9ed97e0357cc.
> 2018-01-30 07:33:37,508 INFO  
> org.apache.flink.runtime.state.DefaultOperatorStateBackend - 
> [pool-33-thread-1] DefaultOperatorStateBackend snapshot (File Stream Factory 
> @ 
> s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc,
>  asynchronous part) in thread Thread[pool-33-thread-1,5,Flink Task Threads] 
> took 13234 ms.
> 2018-01-30 07:33:37,509 DEBUG 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
> [flink-akka.actor.default-dispatcher-79] Received acknowledge message for 
> checkpoint 52 from task 5012eda50495ae33ca38e2478b3f8e0d of job 
> 747c4cef2841d2ab090d9ed97e0357cc.
> 2018-01-30 07:33:37,589 INFO  
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - 
> 

Sync and Async checkpoint time

2018-01-30 Thread Sofer, Tovi
Hi group,

In our project we are using an asynchronous FsStateBackend, and we are trying to
move to distributed storage - currently S3.
When using this storage we are experiencing high backpressure and
high latency compared to local storage.
We are trying to understand the reason, since the checkpoint is asynchronous
and so shouldn't have such a high effect.

We looked at the checkpoint history in the web UI and at details from the log.

* From the web UI it seems that the sync checkpoint duration is much higher than
the async duration. (Again, this is only when using S3, not when using local
storage.)
This happens especially in window operators (tumbling windows) such as below.

* But from the log the sync time seems very short...


Do you have any idea why the async write to the FsStateBackend has such a high
effect on the stream performance?

Checkpoint config:

env.enableCheckpointing(6);
env.setStateBackend(new FsStateBackend(checkpointDirURI, true));

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2);



* Checkpoint info from console:
[screenshot of the checkpoint details from the web UI - not preserved in the archive]


* Checkpoint info from log:
2018-01-30 07:33:36,416 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-42-thread-1] 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
asynchronous part) in thread Thread[pool-42-thread-1,5,Flink Task Threads] took 
12139 ms.
2018-01-30 07:33:36,418 DEBUG 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
[flink-akka.actor.default-dispatcher-83] Received acknowledge message for 
checkpoint 52 from task 19ae368e935f177b513577256505ff37 of job 
747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:36,676 INFO  
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - [pool-35-thread-1] 
Heap backend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
asynchronous part) in thread Thread[pool-35-thread-1,5,Flink Task Threads] took 
12396 ms.
2018-01-30 07:33:36,677 DEBUG 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
[flink-akka.actor.default-dispatcher-79] Received acknowledge message for 
checkpoint 52 from task 19bf0464a13f3c9bd559b7559d166de2 of job 
747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:37,347 INFO  
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - [pool-17-thread-1] 
Heap backend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
asynchronous part) in thread Thread[pool-17-thread-1,5,Flink Task Threads] took 
13067 ms.
2018-01-30 07:33:37,349 DEBUG 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
[flink-akka.actor.default-dispatcher-79] Received acknowledge message for 
checkpoint 52 from task 97e169eefde0c57f52a874dee5a0b5a2 of job 
747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:37,418 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-29-thread-1] 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
asynchronous part) in thread Thread[pool-29-thread-1,5,Flink Task Threads] took 
13143 ms.
2018-01-30 07:33:37,420 DEBUG 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
[flink-akka.actor.default-dispatcher-79] Received acknowledge message for 
checkpoint 52 from task 19b990204530184a97fbaad373dfaf11 of job 
747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:37,508 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-33-thread-1] 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
asynchronous part) in thread Thread[pool-33-thread-1,5,Flink Task Threads] took 
13234 ms.
2018-01-30 07:33:37,509 DEBUG 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
[flink-akka.actor.default-dispatcher-79] Received acknowledge message for 
checkpoint 52 from task 5012eda50495ae33ca38e2478b3f8e0d of job 
747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:37,589 INFO  
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - 
[ParsedOrdersDelayWindow (2/4)] Heap backend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
synchronous part) in thread Thread[ParsedOrdersDelayWindow (2/4),5,Flink Task 
Threads] took 1 ms.
2018-01-30 07:33:37,678 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-49-thread-1] 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
asynchronous part) in thread Thread[pool-49-thread-1,5,Flink Task Threads] took 
13403 ms.
2018-01-30 07:33:37,680 DEBUG 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 

Re: ElasticsearchSink in Flink 1.4.0 with Elasticsearch 5.2+

2018-01-30 Thread Christophe Jolif
Thanks Chesnay. So if I read it correctly, it shouldn't be too long (at least
less time than between regular 1.x releases).

On Mon, Jan 29, 2018 at 4:24 PM, Chesnay Schepler 
wrote:

> As of right now there is no specific date, see also
> https://flink.apache.org/news/2017/11/22/release-1.4-and-1.5-timeline.html
> .
>
>
> On 29.01.2018 13:41, Christophe Jolif wrote:
>
> Thanks a lot. Is there any timeline for 1.5 by the way?
>
> --
> Christophe
>
> On Mon, Jan 29, 2018 at 11:36 AM, Tzu-Li (Gordon) Tai  > wrote:
>
>> Hi Christophe,
>>
>> Thanks a lot for the contribution! I’ll add reviewing the PR to my
>> backlog.
>> I would like / will try to take a look at the PR by the end of this week,
>> after some 1.4.1 blockers which I’m still busy with.
>>
>> Cheers,
>> Gordon
>>
>>
>> On 29 January 2018 at 9:25:27 AM, Fabian Hueske (fhue...@gmail.com)
>> wrote:
>>
>> Hi Christophe,
>>
>> great! Thanks for your contribution.
>> I'm quite busy right now, but I agree that we should have support for ES
>> 5.3 and Es 6.x for the next minor release 1.5.
>>
>> Best,
>> Fabian
>>
>>
>> 2018-01-26 23:09 GMT+01:00 Christophe Jolif :
>>
>>> Ok, I got it "done". I have a PR for ES 5.3 (FLINK-7386), just rebasing
>>> the original one that was never merged (#4675), and added ES 6.x through
>>> RestHighLevelClient on top (FLINK-8101).  This is:
>>> https://github.com/apache/flink/pull/5374. And believe it or not, but
>>> someone else submitted a PR for those two as well today! See:
>>> https://github.com/apache/flink/pull/5372. So it looks like there is some
>>> traction to get it done? It would really be good if a committer could look
>>> at those PRs and let us know which one is closer to being merged, so we focus
>>> on it instead of duplicating work ;)
>>>
>>> Thanks,
>>> --
>>> Christophe
>>>
>>> On Fri, Jan 26, 2018 at 1:46 PM, Christophe Jolif 
>>> wrote:
>>>
 Fabian,

 Unfortunately I need more than that :) But this PR is definitely a
 first step.

 My real need is Elasticsearch 6.x support through the RestHighLevelClient.
 FYI, Elastic has deprecated the TransportClient that the Flink connector
 leverages, and it will be removed in Elasticsearch 8 (presumably ~1.5 years
 from now at their current release pace). Also, the TransportClient does not
 work with hosted versions of Elasticsearch like Compose.io. So I think it
 makes a lot of sense to start introducing a sink based on the
 RestHighLevelClient. I'll be looking at creating a PR for that.

 Thanks,

 --
 Christophe

 On Fri, Jan 26, 2018 at 10:11 AM, Fabian Hueske 
 wrote:

> Great, thank you!
> Hopefully, this pushes the PR forward.
>
> Thanks, Fabian
>
> 2018-01-25 22:30 GMT+01:00 Christophe Jolif :
>
>> Hi Fabian,
>>
>> FYI I rebased the branch and tested it and it worked OK on a sample.
>>
>> --
>> Christophe
>>
>> On Mon, Jan 22, 2018 at 2:53 PM, Fabian Hueske 
>> wrote:
>>
>>> Hi Adrian,
>>>
>>> thanks for raising this issue again.
>>> I agree, we should add support for newer ES versions.
>>> I've added 1.5.0 as target release for FLINK-7386 and bumped the
>>> priority up.
>>>
>>> In the meantime, you can try Flavio's approach (he responded to the
>>> mail thread you linked) and fork and fix the connector.
>>> You could also try the PR for FLINK-7386 [1] and comment on the pull
>>> request whether it works for you or not.
>>>
>>> Best, Fabian
>>>
>>> [1] https://github.com/apache/flink/pull/4675
>>>
>>>
>>> 2018-01-22 13:54 GMT+01:00 Adrian Vasiliu :
>>>
 Hello,

 With a local run of Flink 1.4.0, ElasticsearchSink fails for me with
 a local run of Elasticsearch 5.6.4 and 5.2.1, while the same code
 (with adjusted versions of dependencies) works fine with Elasticsearch 
 2.x
 (tried 2.4.6).
 I get:
 java.lang.NoSuchMethodError: org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elasticsearch/action/ActionRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor;

 (env: Mac OSX 10.13.2, oracle jdk 1.8.0_112)

 Now, this looks similar to the issue referred in
 http://apache-flink-user-mailing-list-archive.2336050.n4.nab
 ble.com/Elasticsearch-Sink-Error-td15246.html
 which points to
 "Flink Elasticsearch 5 connector is not compatible with
 Elasticsearch 5.2+ client"
 https://issues.apache.org/jira/browse/FLINK-7386

 Side-remark: when trying with Elasticsearch 5.6.4 via a docker
 container, for some reason the error I get is different: 
 "RuntimeException:
 Client is not connected to any Elasticsearch nodes!" 

Re: Send ACK when all records of file are processed

2018-01-30 Thread Piotr Nowojski
When reading from input files, at the EOF event the readers will send
Watermark(Long.MAX_VALUE) on all of their output edges, and those watermarks will
be propagated accordingly. So your ACK operator will get
Watermark(Long.MAX_VALUE) only when it has received it from ALL of its input edges.
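
A rough sketch of what such an ACK operator could look like (my own illustration
using a custom one-input operator; the class name and the "ACK" record are
placeholders). It could be attached to the stream via DataStream#transform:

import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, OneInputStreamOperator}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord

// Forwards records unchanged and emits a single "ACK" record once the final
// watermark (Long.MAX_VALUE) has been received from all inputs, i.e. at end of input.
class AckOnEndOfInput extends AbstractStreamOperator[String]
    with OneInputStreamOperator[String, String] {

  override def processElement(element: StreamRecord[String]): Unit = {
    output.collect(element)
  }

  override def processWatermark(mark: Watermark): Unit = {
    super.processWatermark(mark)
    if (mark.getTimestamp == Long.MaxValue) {
      // all upstream file readers have reached EOF, so it is safe to send the ACK
      output.collect(new StreamRecord[String]("ACK"))
    }
  }
}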

When reading from Kafka, you do not have an EOF event, so it would not be
possible to use this Watermark(Long.MAX_VALUE). In that case you would need to
emit some dummy EOF record, containing meta information like the filename,
with its event time set to a value greater than the original event
read from Kafka that contained the filename to process. You would have to pass
this EOF dummy record to your EOF operator. There you would need to create
some kind of mapping

fileName -> event time marking EOF

And each time you process an EOF record, you add a new entry to this mapping. Then,
whenever you process a watermark, you can check for which fileNames that
watermark guarantees the file has been processed completely.
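
A hedged sketch of this, assuming the stream of EOF marker records is keyed by
fileName; the EofMarker/Ack types are illustrative, and the registered event-time
timer plays the role of the fileName -> EOF-time mapping (Flink removes it once
it fires):

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

case class EofMarker(fileName: String, eofEventTime: Long)
case class Ack(fileName: String)

// Applied on the stream of EOF marker records, keyed by fileName.
class AckOnEof extends ProcessFunction[EofMarker, Ack] {

  private var pendingFile: ValueState[String] = _

  override def open(parameters: Configuration): Unit = {
    pendingFile = getRuntimeContext.getState(
      new ValueStateDescriptor("pending-file", classOf[String]))
  }

  override def processElement(marker: EofMarker,
      ctx: ProcessFunction[EofMarker, Ack]#Context,
      out: Collector[Ack]): Unit = {
    // remember the file and ask to be called back once the watermark passes its EOF time
    pendingFile.update(marker.fileName)
    ctx.timerService().registerEventTimeTimer(marker.eofEventTime)
  }

  override def onTimer(timestamp: Long,
      ctx: ProcessFunction[EofMarker, Ack]#OnTimerContext,
      out: Collector[Ack]): Unit = {
    // the watermark has passed the EOF time, so the file has been processed completely
    out.collect(Ack(pendingFile.value()))
    pendingFile.clear()
  }
}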

However this is more complicated and you would have to handle things like:
- cleaning up the mapping (avoiding OutOfMemory)
- making sure that watermarks are generated without unnecessary latencies (when
reading from a file, EOF immediately emits Watermark(Long.MAX_VALUE), which might
not always be the case for Kafka:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission)

Piotrek

> On 30 Jan 2018, at 15:17, Vinay Patil  wrote:
> 
> Yeh, so this is the current implementation.
> 
> One question regarding the watermark: since the watermark is chosen as the minimum
> value over all input streams, only one input stream will have its watermark
> set to LONG.MAX_VALUE, which denotes the EOF processing, whereas the other
> streams will not have this value, is my understanding right? So in this
> case LONG.MAX_VALUE will always be a greater value than its input streams.
> Or will the LONG.MAX_VALUE watermark flow from each input stream?
> 
> 
> I was thinking of directly reading from Kafka as source in Flink in order to 
> remove the middle layer of independent Kafka Consumer which is triggering 
> Flink job.
> 
> So, the pipeline will be 1. readFrom Kafka -> take the File location -> read 
> using FileReaderOperator
> 
> But in this case how do I determine for which File I have received the 
> LONG.MAX_VALUE, it will get complicated.
> 
> 
> 
> Regards,
> Vinay Patil
> 
> On Tue, Jan 30, 2018 at 1:57 AM, Piotr Nowojski  > wrote:
> Thanks for the clarification :)
> 
> Since you have one job per ACK, you can just rely on
> Watermark(Long.MAX_VALUE) to mark the end of the processing.
> 
> A more complicated solution (compared to what I proposed before) would be
> needed if you had one long-living job (for example multiple weeks) that
> would need to produce multiple ACKs at different points in time.
> 
> Piotrek
> 
> 
>> On 29 Jan 2018, at 15:43, Vinay Patil > > wrote:
>> 
>> Sure, here is the complete design that we have :
>> 
>> File metadata (the NFS location of the file) is stored in Kafka, and we have a
>> Kafka consumer (not the Flink one) which will read from each partition and
>> trigger a Flink job on the cluster.
>> 
>> The Flink job will then read from a file and do the processing as I
>> mentioned earlier.
>> 
>> The requirement here is that we need to trigger an ACK if the validations for all
>> the records in a file are successful.
>> 
>> P.S. I know we are not using Kafka to its full potential and are just using
>> it for storing metadata :)
>> 
>> Regards,
>> Vinay Patil
>> 
>> On Thu, Jan 25, 2018 at 11:57 AM, Piotr Nowojski > > wrote:
>> Could you rephrase what is your concern? 
>> 
>> Thanks, Piotrek
>> 
>> 
>>> On 25 Jan 2018, at 18:54, Vinay Patil >> > wrote:
>>> 
>>> Hi,
>>> 
>>> No, to clarify I need to send the ack for each file when it gets processed 
>>> completely and there are multiple files that I am going to read from the 
>>> shared location.
>>> 
>>> Regards,
>>> Vinay Patil
>>> 
>>> On Thu, Jan 25, 2018 at 11:37 AM, Piotr Nowojski >> > wrote:
 
 Yes, makes sense. I just looked at the code of ContinuousFileReaderOperator;
 it does not emit any Watermark in the processWatermark function. It only
 does so in the close function, so we will get the max value when all
 records are read.
 
>>> 
>>> Yes.
>>> 
 What if I am reading multiple files from a shared location, in that case I 
 will 

Re: Joining data in Streaming

2018-01-30 Thread Stefan Richter
Hi,

as far as I know, this is not easily possible. What would be required is
something like a CoFlatMap function, where one input stream blocks until
the second stream is fully consumed to build up the state to join against.
Maybe Aljoscha (in CC) can comment on future plans to support this.

Best,
Stefan

> On 30.01.2018 at 12:42, Marchant, Hayden wrote:
> 
> We have a use case where we have 2 data sets - One reasonable large data set 
> (a few million entities), and a smaller set of data. We want to do a join 
> between these data sets. We will be doing this join after both data sets are 
> available.  In the world of batch processing, this is pretty straightforward 
> - we'd load both data sets into an application and execute a join operator on 
> them through a common key.   Is it possible to do such a join using the 
> DataStream API? I would assume that I'd use the connect operator, though I'm 
> not sure exactly how I should do the join - do I need one 'smaller' set to be 
> completely loaded into state before I start flowing the large set? My concern 
> is that if I read both data sets from streaming sources, since I can't be 
> guaranteed of the order that the data is loaded, I may lose lots of potential 
> joined entities since their pairs might not have been read yet. 
> 
> 
> Thanks,
> Hayden Marchant
> 
> 



Re: How back pressure is handled by source?

2018-01-30 Thread Stefan Richter
Hi,

backpressure comes into play when the source attempts to pass the
generated events to downstream operators. If the downstream operators build up
backpressure, passing data to them can block. You can think of this like a
bounded queue that is full in case of backpressure and blocks until capacity is
available again, so the source is slowed down because it has to wait
until it becomes unblocked before the loop comes back around to the "reading" part.
Maybe this is also helpful:
https://data-artisans.com/blog/how-flink-handles-backpressure
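
A purely conceptual sketch of that bounded-queue mental model (not Flink's actual
internals; the queue size and helper names are made up):

import java.util.concurrent.ArrayBlockingQueue

object BackpressureSketch {

  // stands in for the bounded buffers between the source and the downstream operator
  private val buffer = new ArrayBlockingQueue[String](16)

  private def readFromExternalSystem(): String = "record" // placeholder for e.g. a Kinesis read

  def sourceLoop(): Unit = {
    while (true) {
      val record = readFromExternalSystem()
      // put() blocks when the buffer is full, i.e. when the downstream operator is slow,
      // so the source's read loop is implicitly slowed down: that is backpressure
      buffer.put(record)
    }
  }

  def slowDownstreamLoop(): Unit = {
    while (true) {
      val record = buffer.take()
      Thread.sleep(100) // simulate expensive processing of `record`
    }
  }
}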

Best,
Stefan


> On 30.01.2018 at 14:36, Pawel Bartoszek wrote:
> 
> Hi,
> 
> I am interested in how back pressure is handled by sources in Flink, e.g. the
> Kinesis source. From what I understood, back pressure is a mechanism to slow
> down the rate at which records are read from the stream. However, in the Kinesis
> source code I can see that it is configured to read the same number of records
> (the default set by the AWS SDK Kinesis library).



How back pressure is handled by source?

2018-01-30 Thread Pawel Bartoszek
Hi,

I am interested in how back pressure is handled by sources in Flink, e.g. the
Kinesis source. From what I understood, back pressure is a mechanism to slow
down the rate at which records are read from the stream. However, in the
Kinesis source code I can see that it is configured to read the same number of
records (the default set by the AWS SDK Kinesis library).


Re: How to make savepoints more robust in the face of refactorings ?

2018-01-30 Thread Tzu-Li (Gordon) Tai
Ah, I see. Yes, it seems like serializers for Scala tuples are generated as
anonymous classes.
Since your window operator uses reducing state, the state type would be the 
same as the input type of the window (which in your case is a Scala 2-tuple).

In general, using Scala collections, case classes, and tuples (treated 
similarly to Scala case classes) will result in anonymous class serializers.

For now, for more robustness w.r.t. savepoint migrations, I would suggest
avoiding those types.
For example, if you use a Pojo as the window input type [1], Flink's
PojoSerializer will be used.
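
For illustration only, such a Pojo could look like the following (hypothetical
field names; the public no-arg constructor and bean-style accessors are what allow
the PojoSerializer to kick in instead of a generated anonymous-class serializer):

import scala.beans.BeanProperty

// Used as the window input type instead of a (String, Int) tuple.
class WordCount(@BeanProperty var word: String, @BeanProperty var count: Int) {
  def this() = this(null, 0) // public no-arg constructor required for POJO detection
}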

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/types_serialization.html#rules-for-pojo-types
On 30 January 2018 at 12:25:34 PM, jelmer (jkupe...@gmail.com) wrote:

I looked into it a little more. The anonymous-class serializer is being
created here

https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java#L1247

So far the only strategy for making it less likely to break is defining the
TypeInformation in a trait like so and mixing it into the operators:


trait Tuple2TypeInformation {
  implicit val tuple2TypeInformation: TypeInformation[(String, Int)] = 
createTypeInformation[(String, Int)]
}

Then the inner class that's generated will be something like
Tuple2TypeInformation$$anon$2$$anon$1 instead of
com.ecg.foo.Main$Operators$$anon$3$$anon$1, and as long as you don't rename this
Tuple2TypeInformation, everything will work... but it feels very
suboptimal.



On 29 January 2018 at 12:33, Tzu-Li (Gordon) Tai  wrote:
Hi,

In the Scala API, type serializers may be anonymous classes generated by Scala 
macros, and would therefore contain a reference to the wrapping class (i.e., 
your `Operators` class).
Since Flink currently serializes serializers into the savepoint to be used for
deserialization on restore, and since they must be present at restore
time, changing the `Operators` classname means the previous anonymous-class
serializer is no longer on the classpath, and the deserialization of the
written serializer therefore fails.
This is a limitation caused by how registering serializers for written state 
currently works in Flink.

Generally speaking, to overcome this, you would need to have the previous 
serializer class still around in the classpath when restoring, and can only be 
completely removed from user code once the migration is completed.

One thing that I'm not completely certain about yet is where in your
demonstrated code an anonymous-class serializer is generated for some type.
From what I see, there shouldn't be any anonymous-class serializers for the
code. Is the code you provided a "simplified" version of the actual code in
which you observed the restore error?

Cheers,
Gordon


On 28 January 2018 at 6:00:32 PM, jelmer (jkupe...@gmail.com) wrote:

Changing the class that operators are nested in can break compatibility with
existing savepoints. The following piece of code demonstrates this:

https://gist.github.com/jelmerk/e55abeb0876539975cd32ad0ced8141a

If I change Operators in this file to Operators2, I will not be able to recover
from a savepoint that was made when this class still had its old name.

The error in the Flink UI will be:

java.lang.IllegalStateException: Could not initialize keyed state backend.
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
at 
org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.<init>(RegisteredKeyedBackendStateMetaInfo.java:53)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1216)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
at 

Joining data in Streaming

2018-01-30 Thread Marchant, Hayden
We have a use case where we have 2 data sets - One reasonable large data set (a 
few million entities), and a smaller set of data. We want to do a join between 
these data sets. We will be doing this join after both data sets are available. 
 In the world of batch processing, this is pretty straightforward - we'd load 
both data sets into an application and execute a join operator on them through 
a common key.   Is it possible to do such a join using the DataStream API? I 
would assume that I'd use the connect operator, though I'm not sure exactly how 
I should do the join - do I need one 'smaller' set to be completely loaded into 
state before I start flowing the large set? My concern is that if I read both 
data sets from streaming sources, since I can't be guaranteed of the order that 
the data is loaded, I may lose lots of potential joined entities since their 
pairs might not have been read yet. 


Thanks,
Hayden Marchant




Re: How to make savepoints more robust in the face of refactorings ?

2018-01-30 Thread jelmer
I looked into it a little more. The anonymous-class serializer is being
created here

https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java#L1247

So far the only strategy for making it less likely to break is defining the
TypeInformation in a trait like so and mixing it into the operators:


trait Tuple2TypeInformation {
  implicit val tuple2TypeInformation: TypeInformation[(String, Int)] =
createTypeInformation[(String, Int)]
}

Then the inner class that's generated will be something like
Tuple2TypeInformation$$anon$2$$anon$1 instead of
com.ecg.foo.Main$Operators$$anon$3$$anon$1, and as long as you don't rename
this Tuple2TypeInformation, everything will work... but it feels very
suboptimal.



On 29 January 2018 at 12:33, Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> In the Scala API, type serializers may be anonymous classes generated by
> Scala macros, and would therefore contain a reference to the wrapping class
> (i.e., your `Operators` class).
> Since Flink currently serializes serializers into the savepoint to be used
> for deserialization on restore, and since they must be present at
> restore time, changing the `Operators` classname means the previous
> anonymous-class serializer is no longer on the classpath, and the
> deserialization of the written serializer therefore fails.
> This is a limitation caused by how registering serializers for written
> state currently works in Flink.
>
> Generally speaking, to overcome this, you would need to have the previous
> serializer class still around in the classpath when restoring, and can only
> be completely removed from user code once the migration is completed.
>
> One thing that I'm not completely certain about yet is where in your
> demonstrated code an anonymous-class serializer is generated for some type.
> From what I see, there shouldn't be any anonymous-class serializers for
> the code. Is the code you provided a "simplified" version of the actual
> code in which you observed the restore error?
>
> Cheers,
> Gordon
>
>
> On 28 January 2018 at 6:00:32 PM, jelmer (jkupe...@gmail.com) wrote:
>
> Changing the class that operators are nested in can break compatibility with
> existing savepoints. The following piece of code demonstrates this:
>
> https://gist.github.com/jelmerk/e55abeb0876539975cd32ad0ced8141a
>
> If I change Operators in this file to Operators2, I will not be able to
> recover from a savepoint that was made when this class still had its old
> name.
>
> The error in the Flink UI will be:
>
> java.lang.IllegalStateException: Could not initialize keyed state backend.
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.
> initKeyedState(AbstractStreamOperator.java:293)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.
> initializeState(AbstractStreamOperator.java:225)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> initializeOperators(StreamTask.java:692)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> initializeState(StreamTask.java:679)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
> at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
> at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.<init>(RegisteredKeyedBackendStateMetaInfo.java:53)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$
> RocksDBFullRestoreOperation.restoreKVStateMetaData(
> RocksDBKeyedStateBackend.java:1216)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$
> RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(
> RocksDBKeyedStateBackend.java:1153)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$
> RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.
> restore(RocksDBKeyedStateBackend.java:1034)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> createKeyedStateBackend(StreamTask.java:773)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.
> initKeyedState(AbstractStreamOperator.java:283)
> ... 6 more
>
> But the real reason is found in the task manager logs
>
>
> 2018-01-28 17:03:58,830 WARN  org.apache.flink.api.common.typeutils.
> TypeSerializerSerializationUtil  - Deserialization of serializer errored;
> replacing with null.
> java.io.IOException: Unloadable class for type serializer.
> at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$
> TypeSerializerSerializationProxy.read(TypeSerializerSerializationUti
> l.java:463)
> at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUti
> 

Re: Trigger Time vs. Latest Acknowledgement

2018-01-30 Thread Aljoscha Krettek
Hi,

I think a) doesn't hold because there is no synchronisation between the 
CheckpointCoordinator and the sources doing the reading. I think b) will hold 
but it's also not exact because of clock differences between different machines 
and whatnot.

Best,
Aljoscha

> On 29. Jan 2018, at 15:34, Juho Autio  wrote:
> 
> I'm triggering nightly savepoints at 23:59:00 with crontab on the flink 
> cluster.
> 
> For example last night's savepoint has this information:
> 
> Trigger Time: 23:59:14
> Latest Acknowledgement: 00:00:59
> 
> What are the min/max boundaries for the data contained by the savepoint? Can 
> I deduce from this either of the following:
> 
> a) the savepoint cannot contain any data that was produced after 23:59:14
> b) the savepoint cannot contain any data that was produced after 00:00:59
> 
> My use case is like this: if I restore the nightly savepoint, I want to be 
> sure that any data that was produced during the current day will be included 
> (+ some data from the previous day, that's ok). If the answer to above 
> question is that (a) is false, but (b) holds, that would mean that I would 
> need to trigger the savepoint early enough for it to complete before the 
> midnight.
> 
> Something from the docs that doesn't seem to answer my question:
> 
> > Trigger Time: The time when the checkpoint was triggered at the JobManager.
> > Latest Acknowledgement: The time when the latest acknowledged for any 
> > subtask was received at the JobManager (or n/a if no acknowledgement 
> > received yet).
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/checkpoint_monitoring.html#history-tab
>  
> 



Re: AKA and quarantine

2018-01-30 Thread Till Rohrmann
If you don't run Flink in standalone mode, then you can activate
taskmanager.exit-on-fatal-akka-error. However, keep in mind that at some
point you might run out of spare TMs to run your jobs unless you restart
them manually.
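
As a rough sketch, the flink-conf.yaml entries discussed in this thread would look
something like the following (the values are illustrative only, not recommendations):

# hypothetical flink-conf.yaml excerpt
taskmanager.exit-on-fatal-akka-error: true
akka.watch.heartbeat.interval: 10 s
# increase the pause to tolerate longer GC pauses / heavy load
akka.watch.heartbeat.pause: 120 s
# only increase this if you actually observe ask timeouts
akka.ask.timeout: 60 s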

Cheers,
Till

On Mon, Jan 29, 2018 at 6:41 PM, Vishal Santoshi 
wrote:

> >> If you enable taskmanager.exit-on-fatal-akka-error, then it will stop
> TMs which got quarantined. This will automatically restart TMs in case that
> you are running Flink on Yarn. Thus, I would recommend enabling this if
> possible
>
> We do not use YARN. This would end up restarting the jobs on the remaining
> TMs (if retry is configured), which may be OK if we have enough resources?
>
> >>The akka.ask.timeout parameter controls the timeout for remote
> messages. You should only increase this if you observe timeouts between the
> different Flink components. What you can change in order to account for
> heavy load or GC pauses is the heartbeat interval and pause
> via akka.watch.heartbeat.interval and akka.watch.heartbeat.pause. This
> will most likely mitigate the problem of death watch failures.
>
> Will do. I think GC pauses are ephemeral given the nature of a couple of
> our pipelines.
>
> Thank you for looking into the other 2.  The RocksDB options stuff is
> interesting and we have known about it, but we would dig deeper to make this
> per-pipeline. We do have a sense of how much state is better kept in the memtable
> to avoid an SSTable lookup.
>
> On Mon, Jan 29, 2018 at 12:13 PM, Till Rohrmann 
> wrote:
>
>> Hi Vishal,
>>
>> Akka usually quarantines remote ActorSystems in case of a system message
>> delivery failure or if the death watch was triggered. This can, for
>> example, happen if your machine is under heavy load or has a high GC
>> pressure and does not find enough time to respond to the heartbeats.
>>
>> - If you enable taskmanager.exit-on-fatal-akka-error, then it will stop
>> TMs which got quarantined. This will automatically restart TMs in case that
>> you are running Flink on Yarn. Thus, I would recommend enabling this if
>> possible
>> - The akka.ask.timeout parameter controls the timeout for remote
>> messages. You should only increase this if you observe timeouts between the
>> different Flink components. What you can change in order to account for
>> heavy load or GC pauses is the heartbeat interval and pause
>> via akka.watch.heartbeat.interval and akka.watch.heartbeat.pause. This
>> will most likely mitigate the problem of death watch failures.
>> - There is an effort to add resource specifications to Flink operators.
>> It is not yet fully implemented but you can take a look at ResourceSpec to
>> see what you can define for each operator. Once fully implemented, Flink
>> will then make sure that each operator gets a slot with enough resources.
>> - For RocksDB's resource consumption there aren't any Flink metrics yet.
>> If you want to learn more about its resource consumption, please take a
>> look here [1]. You can, though, configure the ColumnFamilyOptions by
>> implementing an OptionsFactory. That way you can configure the memtable
>> size which is allocated for each Flink state. (A sketch follows after the
>> links below.)
>>
>> [1] https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB
>> [2] https://github.com/facebook/rocksdb/wiki/Set-Up-Options
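
As an illustration only, such an OptionsFactory could look roughly like this
(the 64 MB write-buffer size is a made-up example value):

import org.apache.flink.contrib.streaming.state.OptionsFactory
import org.rocksdb.{ColumnFamilyOptions, DBOptions}

// Caps the memtable (write buffer) size of every column family, i.e. of every Flink state.
class MemtableLimitingOptionsFactory extends OptionsFactory {

  override def createDBOptions(currentOptions: DBOptions): DBOptions =
    currentOptions // leave the DB-wide options untouched

  override def createColumnOptions(currentOptions: ColumnFamilyOptions): ColumnFamilyOptions =
    currentOptions.setWriteBufferSize(64 * 1024 * 1024) // 64 MB memtable per state
}

// usage: rocksDbStateBackend.setOptions(new MemtableLimitingOptionsFactory)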
>>
>> Cheers,
>> Till
>>
>> On Mon, Jan 29, 2018 at 4:05 PM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Thank you.
>>>
>>> On Mon, Jan 29, 2018 at 3:17 AM, Fabian Hueske 
>>> wrote:
>>>
 Hi Vishal,

 sorry for the late response.
 Till (in CC) might be able to answer your Akka / coordination related
 questions.

 Best, Fabian

 2018-01-24 1:22 GMT+01:00 Vishal Santoshi :

> Any suggestions ?  I know these are very general issue but these are
> edge conditions that we want the community to give us general advise on ..
>
> On Sun, Jan 21, 2018 at 3:16 PM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> There have been a couple of instances where one of our TMs was
>> quarantined ( the cause is irrelevant to this discussion ).  And we had 
>> to
>> bounce the TM to bring back sanity to the cluster.  There have been
>> discussions around and am trying to distill them. My questions are
>>
>>
>> *  Based on https://issues.apache.org/jira/browse/FLINK-3347 is it
>> advisable to set the taskmanager.exit-on-fatal-akka-error  to true.
>> ?
>>
>> * Is the akka.ask.timeout relevant here ? We could increase the value
>> to greater than 10s but based on your experiences is it more of a  "mask
>> the issue" exercise or is 10s generally a low value that *should* be
>> increased ?
>>
>> * Is it possible or is there some effort being put into per job
>> memory/resource consumption for a multi job setup that is very normal 
>> with