Re: Setting Flink Monitoring API Port on YARN Cluster

2018-09-06 Thread Gary Yao
Hi Austin,

The config options rest.port, jobmanager.web.port, etc. are intentionally
ignored on YARN. The port should be chosen randomly to avoid conflicts with
other containers [1]. I do not see a way to set a fixed port at the moment,
but there is a related ticket for that [2]. The Flink CLI determines the
hostname and port from the YARN ApplicationReport [3][4]; you can do the
same.
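A minimal sketch of doing what the Flink CLI does, using the Hadoop YarnClient API: fetch the ApplicationReport for the session's application ID and build the REST base URL from getHost() and getRpcPort(). It assumes yarn-site.xml is on the classpath and that the Flink application master reports its REST port as the AM RPC port, which is how the CLI reads it back [3][4]. FlinkYarnEndpoint and its methods are hypothetical names.

```java
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;

/** Hypothetical helper that locates a Flink YARN session's REST endpoint. */
final class FlinkYarnEndpoint {

    /** Formats the base URL of the REST endpoint from a host and port. */
    static String restBaseUrl(String host, int port) {
        return "http://" + host + ":" + port;
    }

    /** Asks the ResourceManager where the application master of the given app runs. */
    static String lookupRestBaseUrl(String applicationId) throws Exception {
        // YarnConfiguration picks up yarn-site.xml from the classpath (e.g. /etc/hadoop/conf).
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new YarnConfiguration());
        yarnClient.start();
        try {
            ApplicationId appId = ConverterUtils.toApplicationId(applicationId);
            ApplicationReport report = yarnClient.getApplicationReport(appId);
            // Assumption: the Flink AM registers its REST port as the AM RPC port.
            return restBaseUrl(report.getHost(), report.getRpcPort());
        } finally {
            yarnClient.stop();
        }
    }

    public static void main(String[] args) throws Exception {
        // args[0]: an application ID such as application_1536250520330_0007
        System.out.println(lookupRestBaseUrl(args[0]));
    }
}
```

The session's application ID is printed in the startup log, so a script could pass it to this helper instead of relying on a fixed port.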

Best,
Gary

[1]
https://github.com/apache/flink/blob/d036417985d3e2b1ca63909007db9710e842abf4/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java#L103

[2] https://issues.apache.org/jira/browse/FLINK-5758

[3]
https://github.com/apache/flink/blob/d036417985d3e2b1ca63909007db9710e842abf4/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L387

[4]
https://hadoop.apache.org/docs/r2.8.3/api/org/apache/hadoop/yarn/api/records/ApplicationReport.html#getRpcPort()

On Fri, Sep 7, 2018 at 12:33 AM, Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hi everyone,
>
> I'm running a YARN session on a cluster with one master and one core and
> would like to use the Monitoring API programmatically to submit jobs. I
> have found that the configuration variables are read but ignored when
> starting the session - it seems to choose a random port each run.
>
> Here's a snippet from the startup logs:
>
> 2018-09-06 21:44:38,763 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: env.yarn.conf.dir, /etc/hadoop/conf
> 2018-09-06 21:44:38,764 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: env.hadoop.conf.dir, /etc/hadoop/conf
> 2018-09-06 21:44:38,765 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: rest.port, 44477
> 2018-09-06 21:44:38,765 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.web.port, 44477
> 2018-09-06 21:44:38,765 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: high-availability.jobmanager.port, 44477
> 2018-09-06 21:44:38,775 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-hadoop.
> 2018-09-06 21:44:39,615 WARN  org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> 2018-09-06 21:44:39,799 INFO  org.apache.flink.runtime.security.modules.HadoopModule - Hadoop user set to hadoop (auth:SIMPLE)
> 2018-09-06 21:44:40,045 INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at ip-10-2-3-71.ec2.internal/10.2.3.71:8032
> 2018-09-06 21:44:40,312 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=4096, numberTaskManagers=1, slotsPerTaskManager=1}
> 2018-09-06 21:44:43,564 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1536250520330_0007
> 2018-09-06 21:44:43,802 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1536250520330_0007
> 2018-09-06 21:44:43,802 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated
> 2018-09-06 21:44:43,804 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED
> 2018-09-06 21:44:48,326 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully.
> 2018-09-06 21:44:48,326 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - The Flink YARN client has been started in detached mode. In order to stop Flink on YARN, use the following command or a YARN web interface to stop it:
> yarn application -kill application_1536250520330_0007
> Please also note that the temporary files of the YARN session in the home directory will not be removed.
> 2018-09-06 21:44:48,821 INFO  org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
> Flink JobManager is now running on ip-10-2-3-25.ec2.internal:38683 with leader id ----.
> JobManager Web Interface: http://ip-10-2-3-25.ec2.internal:38683
>
>
> I'm setting both the rest.port and jobmanager.web.port, but both are
> ignored. Has anyone seen this before?
>
> Thanks!
>


Flink 1.6 Job fails with IllegalStateException: Buffer pool is destroyed.

2018-09-06 Thread 杨力
Hi all,
I am encountering a weird problem when running Flink 1.6 in YARN per-job
clusters. The job fails about half an hour after it starts. The related logs
are attached as an image.

This piece of log comes from one of the TaskManagers. There are no other
related log lines and no ERROR-level logs. The job just runs for tens of
minutes without printing any logs and then suddenly throws this exception.

It is reproducible in my production environment, but not in my test
environment. The 'Buffer pool is destroyed' exception is always thrown while
emitting a latency marker.


[Attachment: image.png]


Re: Cannot set classpath which can be used before job is submitted to yarn.

2018-09-06 Thread bupt_ljy
Hi Hequn,

I can read it by using the full path, but I want it to be on the program's
classpath. For example, I use "bin/flink run -m yarn-cluster" to run the
program on Server1, and I have a conf file "config.conf" located in
"/server/conf" on Server1. I can't read this file with
ConfigFactory.load("config.conf") unless I change the "bin/flink" shell
script like this:


FLOW_CLASSPATH="/server/conf"
exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" \
  -classpath "`manglePathList "$FLOW_CLASSPATH:$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" \
  org.apache.flink.client.cli.CliFrontend "$@"
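Without patching bin/flink, one workaround in the spirit of Hequn's suggestion is to look the file up on the classpath first and fall back to a known directory on the client machine. This is a minimal standard-library sketch; ConfLocator and the fallback directory argument are hypothetical, and it does not change the classpath itself.

```java
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper: find a config file on the classpath, else in a fallback directory. */
final class ConfLocator {

    static URL locate(String name, String fallbackDir) throws IOException {
        // 1) Classpath lookup through the context class loader.
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        if (cl == null) {
            cl = ConfLocator.class.getClassLoader();
        }
        URL onClasspath = cl.getResource(name);
        if (onClasspath != null) {
            return onClasspath;
        }
        // 2) Fallback: an explicit directory on the client machine, e.g. /server/conf.
        Path onDisk = Paths.get(fallbackDir, name);
        if (Files.isReadable(onDisk)) {
            return onDisk.toUri().toURL();
        }
        throw new IOException(name + " not found on the classpath or in " + fallbackDir);
    }
}
```

The returned URL could then be handed to Typesafe Config with ConfigFactory.parseURL(url) instead of ConfigFactory.load("config.conf").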


Original Message
Sender: Hequn Cheng <chenghe...@gmail.com>
Recipient: bupt_ljy <bupt_...@163.com>
Cc: user <u...@flink.apache.org>
Date: Friday, Sep 7, 2018 09:29
Subject: Re: Cannot set classpath which can be used before job is submitted to yarn.


Hi bupt,


Not sure about the answer.
Do you mean that you can't read the file from the local FS? Have you tried
loading the file through its full path? Or maybe you chose the wrong classloader.


Best, Hequn


On Thu, Sep 6, 2018 at 11:01 PM bupt_ljy bupt_...@163.com wrote:

Hi all,
I'm using "bin/flink run -m yarn-cluster" to run my program on YARN. However,
it seems that I can't add my own files to the classpath before the job is
submitted to YARN. For example, I have a conf file located in my own conf
directory, and I need to load it to initialize some class instances before
the job is submitted to YARN.
I'd be grateful if anyone could give me some help.
Thanks.

Re: Behaviour of Process Window Function

2018-09-06 Thread vino yang
Hi Harshvardhan,

1) Yes. ProcessWindowFunction extends AbstractRichFunction, so through
getRuntimeContext() you can access the keyed state API.
2) ProcessWindowFunction gives you considerable flexibility: depending on your
business logic and how long you need to keep the cache, you can build on
processing time, event time, timers, its clear() method, or a custom
implementation.
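A minimal sketch of both points, assuming the incremental aggregation emits one Long per key and window: keyed state is read through the Context, with ctx.globalState() holding per-key state shared by all windows of that key and ctx.windowState() holding per-key, per-window state that is cleaned up in clear(). The class name, types, and counters are hypothetical.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/** Hypothetical window function over pre-aggregated Long values, keyed by String. */
class MyProcessFunction extends ProcessWindowFunction<Long, String, String, TimeWindow> {

    // Per key, shared by all windows of that key.
    private final ValueStateDescriptor<Long> windowsSeenDesc =
            new ValueStateDescriptor<>("windowsSeen", Long.class);

    // Per key and per window; cleaned up in clear().
    private final ValueStateDescriptor<Long> firingsDesc =
            new ValueStateDescriptor<>("firings", Long.class);

    /** Null-safe increment for counters kept in ValueState. */
    static long increment(Long previous) {
        return previous == null ? 1L : previous + 1L;
    }

    @Override
    public void process(String key, Context ctx, Iterable<Long> aggregates, Collector<String> out)
            throws Exception {
        ValueState<Long> windowsSeen = ctx.globalState().getState(windowsSeenDesc);
        ValueState<Long> firings = ctx.windowState().getState(firingsDesc);
        windowsSeen.update(increment(windowsSeen.value()));
        firings.update(increment(firings.value()));
        // After incremental aggregation there is exactly one element per window.
        long aggregate = aggregates.iterator().next();
        out.collect(key + " @ " + ctx.window().maxTimestamp() + ": " + aggregate);
    }

    @Override
    public void clear(Context ctx) throws Exception {
        // Called for this key when the window is purged (maxTimestamp + allowed lateness).
        ctx.windowState().getState(firingsDesc).clear();
    }
}
```

Wired up, this would presumably be `myStream.keyBy(...).window(...).aggregate(new MyIncrementalAggregation(), new MyProcessFunction())`. Because clear() runs once per key and window, it may not be a natural hook for resetting a cache shared by all keys.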

Thanks, vino.

On Thu, Sep 6, 2018 at 10:10 PM, Harshvardhan Agrawal wrote:

> Hello,
>
> We have a Flink pipeline where we are windowing our data after a keyBy.
> i.e.
> myStream.keyBy().window().process(MyIncrementalAggregation(),
> MyProcessFunction()).
>
> I have two questions about the above line of code:
> 1) Does the state in the process window function qualify as KeyedState or
> OperatorState? If operator state, can we access KeyedState from the Process
> Window function?
> 2) We also have certain reference information that we want to share across
> all keys in the process window function. We are currently storing all that
> info in a Guava cache. We want to be able to rehydrate the Guava cache at
> the beginning of each window by making an external REST call and clear the
> cache at the end of that respective window. How can we enforce this
> behaviour in Flink? Do I need to use a TimerService for this, where the
> callback fires at window.maxTimestamp(), or will just clearing the cache in
> the clear method do the trick?
>
> --
>
> *Regards,Harshvardhan Agrawal*
>


Re: After OutOfMemoryError State can not be readed

2018-09-06 Thread vino yang
Hi Edward,

From this log (Caused by: java.io.EOFException), it seems that the state
metadata file has been corrupted.
But I can't confirm it; maybe Stefan knows more details, so I'm pinging him for you.

Thanks, vino.

On Fri, Sep 7, 2018 at 1:22 AM, Edward Rojas wrote:

> Hello all,
>
> We are running Flink 1.5.3 on Kubernetes with RocksDB as the state backend.
> During some load testing we got an "OutOfMemoryError: native memory
> exhausted", causing the job to fail and be restarted.
>
> After the TaskManager was restarted, the job was recovered from a checkpoint,
> but there seems to be a problem when accessing the state. We got the error
> in the *onTimer* function, called from *onProcessingTime*.
>
> Is it possible that the OOM error caused a corrupted state to be
> checkpointed?
>
> We get exceptions like:
>
> TimerException{java.lang.RuntimeException: Error while retrieving data from RocksDB.}
> at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:522)
> at java.util.concurrent.FutureTask.run(FutureTask.java:277)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:191)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1160)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
> at java.lang.Thread.run(Thread.java:811)
> Caused by: java.lang.RuntimeException: Error while retrieving data from RocksDB.
> at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:89)
> at com.xxx.ProcessFunction.onTimer(ProcessFunction.java:279)
> at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:94)
> at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onProcessingTime(KeyedProcessOperator.java:78)
> at org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:266)
> at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
> ... 7 more
> Caused by: java.io.EOFException
> at java.io.DataInputStream.readFully(DataInputStream.java:208)
> at java.io.DataInputStream.readUTF(DataInputStream.java:618)
> at java.io.DataInputStream.readUTF(DataInputStream.java:573)
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:381)
> at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:87)
> ... 12 more
>
>
> Thanks in advance for any help
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Cannot set classpath which can be used before job is submitted to yarn.

2018-09-06 Thread Hequn Cheng
Hi bupt,

Not sure about the answer.
Do you mean that you can't read the file from the local FS? Have you tried
loading the file through its full path? Or maybe you chose the wrong classloader.

Best, Hequn

On Thu, Sep 6, 2018 at 11:01 PM bupt_ljy  wrote:

> Hi all,
>
> I'm using "bin/flink run -m yarn-cluster" to run my program on YARN.
> However, it seems that I can't add my own files to the classpath before
> the job is submitted to YARN. For example, I have a conf file located in
> my own conf directory, and I need to load it to initialize some class
> instances before the job is submitted to YARN.
>
> I'd be grateful if anyone could give me some help.
>
> Thanks.
>


How to customize schedule mode and result partition type?

2018-09-06 Thread 陈梓立
Hi all,

Here I would like to force a job to run in the LAZY_FROM_SOURCES schedule
mode with every ResultPartitionType set to BLOCKING.

But I cannot find options to configure that in StreamExecutionEnvironment,
so I am using the workaround below, which is quite tricky.

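// Imports assumed for Flink 1.6; InfiniteSourceFunction and testname are
// defined elsewhere in the author's test code.
import java.lang.reflect.Field;
import java.util.List;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.transformations.StreamTransformation;

final StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

env.addSource(new InfiniteSourceFunction())
   .setParallelism(2)
   .shuffle()
   .addSink(new DiscardingSink<>())
   .setParallelism(2);

// "transformations" is a protected field of StreamExecutionEnvironment,
// so read it via reflection.
Field field =
    StreamExecutionEnvironment.class.getDeclaredField("transformations");
field.setAccessible(true);
@SuppressWarnings("unchecked")
List<StreamTransformation<?>> transformations =
    (List<StreamTransformation<?>>) field.get(env);

// Generate the StreamGraph by hand so it can be modified before submission.
StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations);
streamGraph.getCustomConfiguration().setString(ScheduleMode.class.getName(),
    ScheduleMode.LAZY_FROM_SOURCES.toString());
streamGraph.setJobName(testname);
// Make the edge between stream nodes 1 and 3 (source -> sink) blocking.
streamGraph.getStreamEdges(1, 3)
   .get(0).setResultPartitionType(ResultPartitionType.BLOCKING);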


Best,
tison.


Re: Why FlinkKafkaConsumer doesn't subscribe to topics?

2018-09-06 Thread Renjie Liu
Hi, Julio:
Is checkpointing enabled in your job? The Flink Kafka connector only commits
offsets when checkpointing is enabled.
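A minimal sketch of a job with checkpointing enabled, so that FlinkKafkaConsumer010 commits the offsets from its completed checkpoints back to Kafka under the consumer's group.id, where external lag monitoring can read them. The topic, broker address, group id, and 60 s interval are placeholders, not values from this thread.

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

/** Hypothetical job; topic, brokers and group id are placeholders. */
final class KafkaOffsetCommitJob {

    static Properties consumerProperties(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Committed offsets are stored under this group id.
        props.setProperty("group.id", groupId);
        return props;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // With checkpointing enabled, offsets are committed when a checkpoint completes.
        env.enableCheckpointing(60_000L);

        FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
                "my-topic", new SimpleStringSchema(), consumerProperties("kafka:9092", "my-flink-job"));
        consumer.setCommitOffsetsOnCheckpoints(true); // already the default; shown for clarity

        env.addSource(consumer).print();
        env.execute("kafka-offset-commit-sketch");
    }
}
```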

On Tue, Sep 4, 2018 at 11:43 PM Tzu-Li (Gordon) Tai 
wrote:

> Hi Julio,
>
> As Renjie had already mentioned, to achieve exactly-once semantics with
> the Kafka consumer, Flink needs to have control over the Kafka partition to
> source subtask assignment.
>
> To add a bit more detail here, this is due to the fact that each subtask
> writes the current offsets of the partitions assigned to it into
> Flink-managed state, and this is coordinated with Flink’s checkpoints.
> If it were to use Kafka’s automatic consumer group assignments (i.e. when
> using the subscribe API), the consumer would have no control over when
> exactly partition subscriptions are reassigned across subtasks.
> If I understood correctly, what you were suggesting in your last reply was
> to simply use poll() to query the offset in the case that some partition
> was reassigned to another source subtask.
> This is problematic because there are no consistency guarantees between the
> committed offsets in Kafka and Flink’s checkpoints.
> Committing offsets is, and should only be, used as a means to expose
> consumer progress to the outside world beyond the Flink job.
>
> Hope this provides a bit more insight.
>
> Cheers,
> Gordon
>
>
> On 4 September 2018 at 2:25:38 PM, Julio Biason (julio.bia...@azion.com)
> wrote:
>
> Hi Renjie,
>
> 1. From what I could grasp from the Kafka docs, you can subscribe and still
> use poll() to capture a specific offset. But I only read the beginning of
> it and didn't go deep into it.
>
> 2. Currently, Flink 1.4.2, Kafka 0.10.1 and the FlinkKafkaConsumer010.
>
> On Tue, Sep 4, 2018 at 12:47 AM, Renjie Liu 
> wrote:
>
>> Hi, Julio:
>> 1. Flink doesn't use subscribe because it needs to control partition
>> assignment itself, which is important for implementing exactly once.
>> 2. Can you share the versions you are using, including Kafka, the Kafka
>> client, and Flink? We also use the Flink Kafka consumer and we can monitor
>> it correctly.
>>
>> On Tue, Sep 4, 2018 at 3:09 AM Julio Biason 
>> wrote:
>>
>>> Hey guys,
>>>
>>> We are trying to add external monitoring to our system, but we can only
>>> get the lag in kafka topics while the Flink job is running -- if, for some
>>> reason, the Flink job fails, we get no visibility on how big the lag is.
>>>
>>> (Besides that, the way Flink reports is not accurate and produces a lot
>>> of -Inf, which I already discussed before.)
>>>
>>> While looking at the problem, we noticed that the FlinkKafkaConsumer
>>> never uses `subscribe` to subscribe to the topics and that's why the values
>>> are never stored back into Kafka, even when the driver itself does
>>> `commitAsync`.
>>>
>>> Is there any reason for not subscribing to topics that I may have missed?
>>>
>>> --
>>> *Julio Biason*, Sofware Engineer
>>> *AZION*  |  Deliver. Accelerate. Protect.
>>> Office: +55 51 3083 8101   |  Mobile: +55 51
>>> *99907 0554*
>>>
>> --
>> Liu, Renjie
>> Software Engineer, MVAD
>>
>
>
>
> --
> *Julio Biason*, Sofware Engineer
> *AZION*  |  Deliver. Accelerate. Protect.
> Office: +55 51 3083 8101   |  Mobile: +55 51
> *99907 0554*
>
> --
Liu, Renjie
Software Engineer, MVAD


Re: Setting Flink Monitoring API Port on YARN Cluster

2018-09-06 Thread 陈梓立
Hi Austin,

`rest.port` is the current config option for "The port that the server
listens on / the client connects to." Its deprecated key is `web.port`,
which in turn has the deprecated key `jobmanager.web.port`, so it is enough
to configure `rest.port` only (at least in 1.6). However, in your case the
configuration should have worked.

Since Flink reads configuration from both flink-conf.yaml and the command
line, it would help if you showed us how you set it.

Best,
tison.


On Fri, Sep 7, 2018 at 6:33 AM, Austin Cawley-Edwards wrote:

> Hi everyone,
>
> I'm running a YARN session on a cluster with one master and one core and
> would like to use the Monitoring API programmatically to submit jobs. I
> have found that the configuration variables are read but ignored when
> starting the session - it seems to choose a random port each run.
>
> Here's a snippet from the startup logs:
>
> 2018-09-06 21:44:38,763 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: env.yarn.conf.dir, /etc/hadoop/conf
> 2018-09-06 21:44:38,764 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: env.hadoop.conf.dir, /etc/hadoop/conf
> 2018-09-06 21:44:38,765 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: rest.port, 44477
> 2018-09-06 21:44:38,765 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.web.port, 44477
> 2018-09-06 21:44:38,765 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: high-availability.jobmanager.port, 44477
> 2018-09-06 21:44:38,775 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-hadoop.
> 2018-09-06 21:44:39,615 WARN  org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> 2018-09-06 21:44:39,799 INFO  org.apache.flink.runtime.security.modules.HadoopModule - Hadoop user set to hadoop (auth:SIMPLE)
> 2018-09-06 21:44:40,045 INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at ip-10-2-3-71.ec2.internal/10.2.3.71:8032
> 2018-09-06 21:44:40,312 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=4096, numberTaskManagers=1, slotsPerTaskManager=1}
> 2018-09-06 21:44:43,564 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1536250520330_0007
> 2018-09-06 21:44:43,802 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1536250520330_0007
> 2018-09-06 21:44:43,802 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated
> 2018-09-06 21:44:43,804 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED
> 2018-09-06 21:44:48,326 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully.
> 2018-09-06 21:44:48,326 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - The Flink YARN client has been started in detached mode. In order to stop Flink on YARN, use the following command or a YARN web interface to stop it:
> yarn application -kill application_1536250520330_0007
> Please also note that the temporary files of the YARN session in the home directory will not be removed.
> 2018-09-06 21:44:48,821 INFO  org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
> Flink JobManager is now running on ip-10-2-3-25.ec2.internal:38683 with leader id ----.
> JobManager Web Interface: http://ip-10-2-3-25.ec2.internal:38683
>
>
> I'm setting both the rest.port and jobmanager.web.port, but both are
> ignored. Has anyone seen this before?
>
> Thanks!
>


Setting Flink Monitoring API Port on YARN Cluster

2018-09-06 Thread Austin Cawley-Edwards
Hi everyone,

I'm running a YARN session on a cluster with one master and one core and
would like to use the Monitoring API programmatically to submit jobs. I
have found that the configuration variables are read but ignored when
starting the session - it seems to choose a random port each run.

Here's a snippet from the startup logs:

2018-09-06 21:44:38,763 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: env.yarn.conf.dir, /etc/hadoop/conf
2018-09-06 21:44:38,764 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: env.hadoop.conf.dir, /etc/hadoop/conf
2018-09-06 21:44:38,765 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: rest.port, 44477
2018-09-06 21:44:38,765 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.web.port, 44477
2018-09-06 21:44:38,765 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: high-availability.jobmanager.port, 44477
2018-09-06 21:44:38,775 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-hadoop.
2018-09-06 21:44:39,615 WARN  org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2018-09-06 21:44:39,799 INFO  org.apache.flink.runtime.security.modules.HadoopModule - Hadoop user set to hadoop (auth:SIMPLE)
2018-09-06 21:44:40,045 INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at ip-10-2-3-71.ec2.internal/10.2.3.71:8032
2018-09-06 21:44:40,312 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=4096, numberTaskManagers=1, slotsPerTaskManager=1}
2018-09-06 21:44:43,564 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1536250520330_0007
2018-09-06 21:44:43,802 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1536250520330_0007
2018-09-06 21:44:43,802 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated
2018-09-06 21:44:43,804 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED
2018-09-06 21:44:48,326 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully.
2018-09-06 21:44:48,326 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - The Flink YARN client has been started in detached mode. In order to stop Flink on YARN, use the following command or a YARN web interface to stop it:
yarn application -kill application_1536250520330_0007
Please also note that the temporary files of the YARN session in the home directory will not be removed.
2018-09-06 21:44:48,821 INFO  org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
Flink JobManager is now running on ip-10-2-3-25.ec2.internal:38683 with leader id ----.
JobManager Web Interface: http://ip-10-2-3-25.ec2.internal:38683


I'm setting both the rest.port and jobmanager.web.port, but both are
ignored. Has anyone seen this before?

Thanks!


Re: REST: reading completed jobs' details

2018-09-06 Thread Miguel Coimbra
Exactly, that was the problem.
I didn't realize that the restructured cluster routes all client
communication through the REST port.

Thanks again.

Best,

On Thu, 6 Sep 2018 at 17:57, Chesnay Schepler  wrote:

> Did you by chance use the RemoteEnvironment and pass in 6123 as the port?
> If so, try using 8081 instead, which is the REST port.
>
> On 06.09.2018 18:24, Miguel Coimbra wrote:
>
> Hello Chesnay,
>
> Thanks for the information.
>
> Decided to move straight away to launching a standalone cluster.
> I'm now having another problem when trying to submit a job through my Java
> program after launching the standalone cluster.
>
> I configured the cluster (flink-1.6.0/conf/flink-conf.yaml) to use 2
> TaskManager instances and assigned port ranges for most Flink cluster
> entities (to avoid port collisions with more than 1 TaskManager):
>
> query.server.ports: 30000-35000
> query.proxy.ports: 35001-40000
> taskmanager.rpc.port: 45001-50000
> taskmanager.data.port: 50001-55000
> blob.server.port: 55001-60000
>
> I'm launching in Linux with:
>
> ./start-cluster.sh
>
> Starting cluster.
> Starting standalonesession daemon on host xxx.
> Starting taskexecutor daemon on host xxx.
> [INFO] 1 instance(s) of taskexecutor are already running on xxx.
> Starting taskexecutor daemon on host xxx.
>
>
> However, my Java program ends up hanging as soon as I perform an execute()
> call (for example by calling count() on a DataSet).
>
> Checking the JobManager log, I find the following exception whenever my
> Java program calls execute() over the ExecutionEnvironment (either using
> Maven on the terminal or from IntelliJ IDEA):
>
> WARN  akka.remote.transport.netty.NettyTransport-
> Remote connection to [/127.0.0.1:47774] failed with
> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
> Adjusted frame length exceeds 10485760: 1347375960 - discarded
>
> I checked that the problem is happening on a count(), so I don't think it
> has to do with the JobManager/TaskManagers trying to exchange
> excessively-big messages.
>
> While searching, I tried to make sure my program compiles with the same
> library versions as those in this cluster version of Flink.
>
>
> I downloaded the Apache Flink 1.6 binaries to launch the cluster:
>
>
>
> https://www.apache.org/dyn/closer.lua/flink/flink-1.6.0/flink-1.6.0-bin-scala_2.11.tgz
>
>
> I then checked the library versions used in the pom.xml of the 1.6.0
> branch of the Flink repository:
>
>
> https://github.com/apache/flink/blob/release-1.6/pom.xml
>
> On my project's pom.xml, I have the following:
>
> 
>UTF-8
>UTF-8
>1.8
>1.8
>1.6.0   1.7.7
>1.2.17
>2.11.12
>2.11
>2.4.20
>4.12
>5.0.0
>${junit.version}.1
>1.0.1
>1.9.1
>
>
> My project's dependency versions match those of the Flink 1.6 repository
> (for libraries such as akka).
> However, I'm having difficulty understanding what else may be causing this
> problem.
>
> Thanks for your attention.
>
> Best,
>
> On Wed, 5 Sep 2018 at 20:18, Chesnay Schepler  wrote:
>
>> No, the cluster isn't shared. For each job a separate cluster is spun up
>> when calling execute(), at the end of which it is shut down.
>>
>> For explicit creation and shutdown of a cluster, I would suggest executing
>> your jobs as a test that contains a MiniClusterResource.
>>
>> On 05.09.2018 20:59, Miguel Coimbra wrote:
>>
>> Thanks for the reply.
>>
>> However, I think my case differs because I am running a sequence of
>> independent Flink jobs on the same environment instance.
>> I only create the LocalExecutionEnvironment once.
>>
>> The web manager shows the job ID changing correctly every time a new job
>> is executed.
>>
>> Since it is the same execution environment (and therefore the same
>> cluster instance I imagine), those completed jobs should show as well, no?
>>
>> On Wed, 5 Sep 2018 at 18:40, Chesnay Schepler  wrote:
>>
>>> When you create an environment that way, the cluster is shut down
>>> once the job completes.
>>> The WebUI can _appear_ to still be working since all the files and data
>>> about the job are cached in the browser.
>>>
>>> On 05.09.2018 17:39, Miguel Coimbra wrote:
>>>
>>> Hello,
>>>
>>> I'm having difficulty reading the status (such as time taken for each
>>> dataflow operator in a job) of jobs that have completed.
>>>
>>> First, when I click on "Completed jobs" on the web interface (by default
>>> at 8081), no job shows up.
>>> I see jobs listed as "Running", and as soon as they finish I would
>>> expect them to appear in the "Completed jobs" section, but no luck.
>>>
>>> Consider that I am running locally (web UI is running, I checked and it
>>> is available via browser) on 8081.
>>> None of these links worked for checking jobs that have already finished,
>>> such as the job ID 618fac9da6ea458f5091a9c40e54cbcc that had been running:
>>>
>>> http://127.0.0.1:8081/jobs/618fac9da6ea458f5091a9c40e54cbcc
>>> http://127.

After OutOfMemoryError State can not be readed

2018-09-06 Thread Edward Rojas
Hello all,

We are running Flink 1.5.3 on Kubernetes with RocksDB as the state backend.
During some load testing we got an "OutOfMemoryError: native memory
exhausted", causing the job to fail and be restarted.

After the TaskManager was restarted, the job was recovered from a checkpoint,
but there seems to be a problem when accessing the state. We got the error
in the *onTimer* function, called from *onProcessingTime*.

Is it possible that the OOM error caused a corrupted state to be
checkpointed?

We get exceptions like:

TimerException{java.lang.RuntimeException: Error while retrieving data from RocksDB.}
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:522)
at java.util.concurrent.FutureTask.run(FutureTask.java:277)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:191)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1160)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.lang.Thread.run(Thread.java:811)
Caused by: java.lang.RuntimeException: Error while retrieving data from RocksDB.
at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:89)
at com.xxx.ProcessFunction.onTimer(ProcessFunction.java:279)
at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:94)
at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onProcessingTime(KeyedProcessOperator.java:78)
at org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:266)
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
... 7 more
Caused by: java.io.EOFException
at java.io.DataInputStream.readFully(DataInputStream.java:208)
at java.io.DataInputStream.readUTF(DataInputStream.java:618)
at java.io.DataInputStream.readUTF(DataInputStream.java:573)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:381)
at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:87)
... 12 more


Thanks in advance for any help






Re: REST: reading completed jobs' details

2018-09-06 Thread Chesnay Schepler
Did you by chance use the RemoteEnvironment and pass in 6123 as the 
port? If so, try using 8081 instead, which is the REST port.
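A quick back-of-the-envelope check is consistent with this diagnosis. Assuming the Akka transport frames messages with a 4-byte big-endian length prefix, and that the "adjusted" length in the log is that prefix plus the 4 bytes of the field itself (an assumption about the decoder configuration, not something stated in this thread), the bogus length 1347375960 from the JobManager log decodes to the ASCII text `POST`, i.e. the start of an HTTP request arriving on the Akka RPC port. The class and method names below are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: recovers the four bytes that a length-prefixed frame decoder
// misread as a message length.
class FrameLengthDecoder {
    // Assumption: the logged "adjusted" length = raw 4-byte length field + 4 bytes
    // for the length field itself.
    static final int LENGTH_FIELD_BYTES = 4;

    static String decodeLengthPrefix(long adjustedFrameLength) {
        int rawLength = (int) (adjustedFrameLength - LENGTH_FIELD_BYTES);
        // ByteBuffer writes big-endian (network byte order) by default.
        byte[] prefix = ByteBuffer.allocate(LENGTH_FIELD_BYTES).putInt(rawLength).array();
        return new String(prefix, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        // Value copied from the TooLongFrameException in the JobManager log.
        System.out.println(decodeLengthPrefix(1347375960L)); // prints POST
    }
}
```

Under that assumption, an HTTP client (such as a REST-based submission) talking to 6123, the JobManager RPC port, instead of 8081 is one plausible way to end up with an oversized "frame" like this.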


On 06.09.2018 18:24, Miguel Coimbra wrote:

Hello Chesnay,

Thanks for the information.

I decided to move straight to launching a standalone cluster.
I'm now having another problem when trying to submit a job through my 
Java program after launching the standalone cluster.


I configured the cluster (flink-1.6.0/conf/flink-conf.yaml) to use 2 
TaskManager instances and assigned port ranges for most Flink cluster 
entities (to avoid port collisions with more than 1 TaskManager):


query.server.ports: 30000-35000
query.proxy.ports: 35001-40000
taskmanager.rpc.port: 45001-50000
taskmanager.data.port: 50001-55000
blob.server.port: 55001-60000

I'm launching in Linux with:

./start-cluster.sh

Starting cluster.
Starting standalonesession daemon on host xxx.
Starting taskexecutor daemon on host xxx.
[INFO] 1 instance(s) of taskexecutor are already running on xxx.
Starting taskexecutor daemon on host xxx.


However, my Java program ends up hanging as soon as I perform 
an execute() call (for example by calling count() on a DataSet).


Checking the JobManager log, I find the following exception whenever 
my Java program calls execute() over the ExecutionEnvironment (either 
using Maven on the terminal or from IntelliJ IDEA):


WARN akka.remote.transport.netty.NettyTransport - Remote connection to [/127.0.0.1:47774] failed with org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 10485760: 1347375960 - discarded


I verified that the problem happens on a simple count(), so I don't think 
it is caused by the JobManager/TaskManagers exchanging excessively 
large messages.


While searching, I tried to make sure my program compiles with the 
same library versions as those in this cluster version of Flink.



I downloaded the Apache Flink 1.6 binaries to launch the cluster:


https://www.apache.org/dyn/closer.lua/flink/flink-1.6.0/flink-1.6.0-bin-scala_2.11.tgz


I then checked the library versions used in the pom.xml of the 1.6.0 
branch of the Flink repository:



https://github.com/apache/flink/blob/release-1.6/pom.xml

On my project's pom.xml, I have the following:


UTF-8
UTF-8
1.8
1.8
1.6.0
1.7.7
1.2.17
2.11.12
2.11
2.4.20
4.12
5.0.0
${junit.version}.1
1.0.1
1.9.1


My project's dependency versions match those of the Flink 1.6 
repository (for libraries such as akka).
However, I'm having difficulty understanding what else may be causing 
this problem.


Thanks for your attention.

Best,

On Wed, 5 Sep 2018 at 20:18, Chesnay Schepler wrote:


No, the cluster isn't shared. For each job a separate cluster is
spun up when calling execute(), at the end of which it is shut down.

For explicit creation and shutdown of a cluster, I would suggest
executing your jobs as a test that contains a MiniClusterResource.

On 05.09.2018 20:59, Miguel Coimbra wrote:

Thanks for the reply.

However, I think my case differs because I am running a sequence
of independent Flink jobs on the same environment instance.
I only create the LocalExecutionEnvironment once.

The web manager shows the job ID changing correctly every time a
new job is executed.

Since it is the same execution environment (and therefore the
same cluster instance I imagine), those completed jobs should
show as well, no?

On Wed, 5 Sep 2018 at 18:40, Chesnay Schepler <ches...@apache.org> wrote:

When you create an environment that way, the cluster is shut
down once the job completes.
The WebUI can _appear_ to still be working, since all the files
and data about the job are cached in the browser.

On 05.09.2018 17:39, Miguel Coimbra wrote:

Hello,

I'm having difficulty reading the status (such as time taken
for each dataflow operator in a job) of jobs that have
completed.

First, when I click on "Completed jobs" on the web interface
(by default at 8081), no job shows up.
I see jobs listed as "Running", and as soon as they finish I
would expect them to appear in the "Completed jobs" section,
but no luck.

Note that I am running locally on port 8081 (the web UI is
running; I checked that it is reachable from a browser).
None of these links worked for checking jobs that have
already finished, such as the job ID
618fac9da6ea458f5091a9c40e54cbcc that had been running:

http://127.0.0.1:8081/jobs/618fac9da6ea458f5091a9c40e54cbcc
http://127.0.0.1:8081/completed-jobs/618fac9da6ea458f5091a9c40e54cbcc

I'm running with a LocalExecutionEnvironment created with the
method:
ExecutionEnvironment.createLoc

Re: REST: reading completed jobs' details

2018-09-06 Thread Miguel Coimbra
Hello Chesnay,

Thanks for the information.

I decided to move straight to launching a standalone cluster.
I'm now having another problem when trying to submit a job through my Java
program after launching the standalone cluster.

I configured the cluster (flink-1.6.0/conf/flink-conf.yaml) to use 2
TaskManager instances and assigned port ranges for most Flink cluster
entities (to avoid port collisions with more than 1 TaskManager):

query.server.ports: 30000-35000
query.proxy.ports: 35001-40000
taskmanager.rpc.port: 45001-50000
taskmanager.data.port: 50001-55000
blob.server.port: 55001-60000

I'm launching in Linux with:

./start-cluster.sh

Starting cluster.
Starting standalonesession daemon on host xxx.
Starting taskexecutor daemon on host xxx.
[INFO] 1 instance(s) of taskexecutor are already running on xxx.
Starting taskexecutor daemon on host xxx.


However, my Java program ends up hanging as soon as I perform an execute()
call (for example by calling count() on a DataSet).

Checking the JobManager log, I find the following exception whenever my
Java program calls execute() over the ExecutionEnvironment (either using
Maven on the terminal or from IntelliJ IDEA):

WARN  akka.remote.transport.netty.NettyTransport - Remote connection to [/127.0.0.1:47774] failed with org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 10485760: 1347375960 - discarded

I verified that the problem happens on a simple count(), so I don't think it
is caused by the JobManager/TaskManagers exchanging excessively large
messages.

While searching, I tried to make sure my program compiles with the same
library versions as those in this cluster version of Flink.


I downloaded the Apache Flink 1.6 binaries to launch the cluster:


https://www.apache.org/dyn/closer.lua/flink/flink-1.6.0/flink-1.6.0-bin-scala_2.11.tgz


I then checked the library versions used in the pom.xml of the 1.6.0 branch
of the Flink repository:


https://github.com/apache/flink/blob/release-1.6/pom.xml

On my project's pom.xml, I have the following:


   UTF-8
   UTF-8
   1.8
   1.8
   1.6.0
   1.7.7
   1.2.17
   2.11.12
   2.11
   2.4.20
   4.12
   5.0.0
   ${junit.version}.1
   1.0.1
   1.9.1



My project's dependency versions match those of the Flink 1.6 repository
(for libraries such as akka).
However, I'm having difficulty understanding what else may be causing this
problem.

Thanks for your attention.

Best,

On Wed, 5 Sep 2018 at 20:18, Chesnay Schepler  wrote:

> No, the cluster isn't shared. For each job a separate cluster is spun up
> when calling execute(), at the end of which it is shut down.
>
> For explicit creation and shutdown of a cluster, I would suggest
> executing your jobs as a test that contains a MiniClusterResource.
>
> On 05.09.2018 20:59, Miguel Coimbra wrote:
>
> Thanks for the reply.
>
> However, I think my case differs because I am running a sequence of
> independent Flink jobs on the same environment instance.
> I only create the LocalExecutionEnvironment once.
>
> The web manager shows the job ID changing correctly every time a new job
> is executed.
>
> Since it is the same execution environment (and therefore the same cluster
> instance I imagine), those completed jobs should show as well, no?
>
> On Wed, 5 Sep 2018 at 18:40, Chesnay Schepler  wrote:
>
>> When you create an environment that way, the cluster is shut down
>> once the job completes.
>> The WebUI can _appear_ to still be working, since all the files and data
>> about the job are cached in the browser.
>>
>> On 05.09.2018 17:39, Miguel Coimbra wrote:
>>
>> Hello,
>>
>> I'm having difficulty reading the status (such as time taken for each
>> dataflow operator in a job) of jobs that have completed.
>>
>> First, when I click on "Completed jobs" on the web interface (by default
>> at 8081), no job shows up.
>> I see jobs listed as "Running", and as soon as they finish I would
>> expect them to appear in the "Completed jobs" section, but no luck.
>>
>> Note that I am running locally on port 8081 (the web UI is running; I
>> checked that it is reachable from a browser).
>> None of these links worked for checking jobs that have already finished,
>> such as the job ID 618fac9da6ea458f5091a9c40e54cbcc that had been running:
>>
>> http://127.0.0.1:8081/jobs/618fac9da6ea458f5091a9c40e54cbcc
>> http://127.0.0.1:8081/completed-jobs/618fac9da6ea458f5091a9c40e54cbcc
>>
>> I'm running with a LocalExecutionEnvironment created with the method:
>>
>> ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
>>
>> I hope anyone may be able to help.
>>
>> Best,
>>
>>
>>
>>
>


Re: Job goes down after 1/145 TMs is lost (NoResourceAvailableException)

2018-09-06 Thread Till Rohrmann
Hi Subramanya,

if the container is still running and the TM simply cannot connect to the
JobManager, then the ResourceManager does not see a problem. The RM thinks
in terms of containers, and as long as n containers are running, it won't
start new ones. That's why the TM should exit in order to
terminate the container.

Have you tried using a newer Flink version? Lately we have reworked a good
part of Flink's distributed architecture and added resource elasticity
(starting with Flink 1.5).

Cheers,
Till
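Till's explanation can be illustrated with a toy model. The class below is hypothetical and is not Flink's YarnFlinkResourceManager; it only captures the idea that the ResourceManager counts running containers, so a TaskManager that is still running but cannot reach the JobManager never triggers a replacement request, whereas a TM that exits (for example via taskmanager.exit-on-fatal-akka-error, as in the quoted message) frees its container and lets a new one be requested. The target of 145 containers is taken from the thread's subject line.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model (hypothetical, not Flink code): the ResourceManager reasons only about how
// many containers are running, not about whether their TaskManagers are healthy.
class ContainerCountingResourceManager {
    enum TmState { CONNECTED, DISCONNECTED_BUT_RUNNING, EXITED }

    private final int targetContainers;

    ContainerCountingResourceManager(int targetContainers) {
        this.targetContainers = targetContainers;
    }

    /** Returns how many replacement containers this model would request. */
    int containersToRequest(List<TmState> containers) {
        int running = 0;
        for (TmState state : containers) {
            // A TM that lost its JobManager connection still occupies a running container.
            if (state != TmState.EXITED) {
                running++;
            }
        }
        return Math.max(0, targetContainers - running);
    }

    public static void main(String[] args) {
        ContainerCountingResourceManager rm = new ContainerCountingResourceManager(145);
        List<TmState> tms = new ArrayList<>(Collections.nCopies(145, TmState.CONNECTED));
        tms.set(144, TmState.DISCONNECTED_BUT_RUNNING);
        System.out.println(rm.containersToRequest(tms)); // 0: the hung TM is not replaced
        tms.set(144, TmState.EXITED);
        System.out.println(rm.containersToRequest(tms)); // 1: exiting frees the container
    }
}
```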

On Thu, Sep 6, 2018 at 4:58 AM Subramanya Suresh 
wrote:

> Hi,
> With some additional research,
>
> *Before the flag*
> I realized for failed containers (that exited for a specific  we still
> were Requesting new TM container and launching TM). But for the "Detected
> unreachable: [akka.tcp://fl...@blahabc.sfdc.net:123]" issue I do not see
> the container marked as failed and a subsequent request for TM.
>
> *After taskmanager.exit-on-fatal-akka-error: true*
> Since we turned on taskmanager.exit-on-fatal-akka-error: true, I no longer
> see any "unreachable: [akka.tcp://fl...@blahabc.sfdc.net:123]" messages. I am
> not sure whether that is a coincidence or a direct effect of this change.
>
> *Our Issue:*
> I realized we are still exiting the application, i.e. failing when
> containers are lost. The reason is that before
> org.apache.flink.yarn.YarnFlinkResourceManager is able to acquire a new
> TM container, launch the TM and have it reported as started, the
> org.apache.flink.runtime.jobmanager.scheduler
> throws a NoResourceAvailableException that causes a failure. In our case we
> had a fixed-delay restart strategy with 5 attempts, and we are running out of
> them because of this. I am looking to solve this with a FailureRateRestartStrategy
> over a 2-minute interval (10 second restart delay, >12 failures), which lets the
> TM come back (that takes about 50 seconds).
>
> *Flink Bug*
> But I cannot help but wonder why there is no interaction between the
> ResourceManager and the JobManager, i.e. why does the JobManager continue
> processing despite not having the required TMs?
>
> Logs to substantiate what I said above (only relevant).
>
> 2018-09-03 06:17:13,932 INFO
> org.apache.flink.yarn.YarnFlinkResourceManager- Container
> container_e28_1535135887442_1381_01_97 completed successfully with
> diagnostics: Container released by application
>
>
> 2018-09-03 06:34:19,214 WARN  akka.remote.ReliableDeliverySupervisor
>   - Association with remote system [akka.tcp://
> fl...@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is now
> gated for [5000] ms. Reason: [Disassociated]
> 2018-09-03 06:34:19,214 INFO
> org.apache.flink.yarn.YarnFlinkResourceManager- Container
> container_e27_1535135887442_1381_01_000102 failed. Exit status: -102
> 2018-09-03 06:34:19,215 INFO
> org.apache.flink.yarn.YarnFlinkResourceManager- Diagnostics
> for container container_e27_1535135887442_1381_01_000102 in state COMPLETE
> : exitStatus=-102 diagnostics=Container preempted by scheduler
> 2018-09-03 06:34:19,215 INFO
> org.apache.flink.yarn.YarnFlinkResourceManager- Total
> number of failed containers so far: 1
> 2018-09-03 06:34:19,215 INFO
> org.apache.flink.yarn.YarnFlinkResourceManager- Requesting
> new TaskManager container with 2 megabytes memory. Pending requests: 1
> 2018-09-03 06:34:19,216 INFO  org.apache.flink.yarn.YarnJobManager
>   - Task manager akka.tcp://
> fl...@hello-world9-27-crz.ops.sfdc.net:46219/user/taskmanager terminated.
> 2018-09-03 06:34:19,218 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
> streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from
> state RUNNING to FAILING.
> java.lang.Exception: TaskManager was lost/killed:
> container_e27_1535135887442_1381_01_000102 @
> hello-world9-27-crz.ops.sfdc.net (dataPort=40423)
> 2018-09-03 06:34:19,466 INFO
> org.apache.flink.runtime.instance.InstanceManager -
> Unregistered task manager hello-world9-27-crz.ops.sfdc.net/11.11.35.220.
> Number of registered task managers 144. Number of available slots 720
>
> 2018-09-03 06:34:24,717 INFO
> org.apache.flink.yarn.YarnFlinkResourceManager- Received
> new container: container_e28_1535135887442_1381_01_000147 - Remaining
> pending container requests: 0
> 2018-09-03 06:34:24,717 INFO
> org.apache.flink.yarn.YarnFlinkResourceManager- Launching
> TaskManager in container ContainerInLaunch @ 1535956464717: Container:
> [ContainerId: container_e28_1535135887442_1381_01_000147, NodeId:
> hello-world9-27-crz.ops.sfdc.net:8041, NodeHttpAddress:
> hello-world9-27-crz.ops.sfdc.net:8042, Resource:  vCores:5>, Priority: 0, Token: Token { kind: ContainerToken, service:
> 11.11.35.220:8041 }, ] on host hello-world9-27-crz.ops.sfdc.net
> 2018-09-03 06:34:29,256 WARN  akka.remote.ReliableDeliverySupervisor
>   - Associat
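The restart budget Subramanya describes can be sketched with a simplified sliding-window model. This is not Flink's FailureRateRestartStrategy (the class below is hypothetical); it assumes one job failure per restart delay until the replacement TaskManager registers, and uses the numbers from the message above: at most 12 failures per 2-minute window, a 10-second restart delay and roughly 50 seconds for the TM to come back.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified sliding-window model of a failure-rate restart policy
// (hypothetical class, not Flink's FailureRateRestartStrategy).
class FailureRateBudget {
    private final int maxFailuresPerInterval;
    private final long intervalMillis;
    private final Deque<Long> recentFailures = new ArrayDeque<>();

    FailureRateBudget(int maxFailuresPerInterval, long intervalMillis) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMillis = intervalMillis;
    }

    /** Records a failure at nowMillis and reports whether the job may restart again. */
    boolean recordFailure(long nowMillis) {
        recentFailures.addLast(nowMillis);
        // Forget failures that are older than the sliding window.
        while (nowMillis - recentFailures.peekFirst() > intervalMillis) {
            recentFailures.removeFirst();
        }
        return recentFailures.size() <= maxFailuresPerInterval;
    }

    /** Assumes one failure per restart delay until the replacement TM is back. */
    static boolean survivesOutage(FailureRateBudget budget, long restartDelayMillis, long tmRecoveryMillis) {
        for (long t = 0; t < tmRecoveryMillis; t += restartDelayMillis) {
            if (!budget.recordFailure(t)) {
                return false; // budget exhausted before the TM came back
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Assumed numbers from the thread: 12 failures per 2 min, 10 s delay, ~50 s TM recovery.
        System.out.println(survivesOutage(new FailureRateBudget(12, 120_000L), 10_000L, 50_000L)); // true
        // A much tighter budget (3 failures per window) runs out during the same outage.
        System.out.println(survivesOutage(new FailureRateBudget(3, 120_000L), 10_000L, 50_000L)); // false
    }
}
```

With those assumptions, five failures fall inside the window and the job keeps restarting; the second call shows how a tighter budget would be exhausted before the TM returns.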

Cannot set classpath which can be used before job is submitted to yarn.

2018-09-06 Thread bupt_ljy
Hi all,
 I’m using “bin/flink run -m yarn-cluster” to run my program on YARN. However, 
it seems that I can’t add my own files to the classpath before the job is 
submitted to YARN. For example, I have a conf file located in my own 
conf directory, and I need to load it from that directory to initialize 
some class instances before the job is submitted to YARN.
 I’d be grateful if anyone could give me some help.
 Thanks.

Re: Ask about running multiple jars for different stream jobs

2018-09-06 Thread Rad Rad
Thanks very much. Now it works fine.




Re: Flink failure recovery tooks very long time

2018-09-06 Thread Yun Tang
Hi Kien

You could try to kill one TM container with the 'yarn container -signal 
<container-id> FORCEFUL_SHUTDOWN' command, and then watch the first checkpoint 
after the job fails over. You could view the checkpoint details[1] to see whether 
there is an outlier operator or subtask that takes an extremely long time to 
finish the checkpoint, and then check the TaskManager log of that subtask for 
anything unusual. If the sync and async durations are all short but the 
end-to-end duration is very long, it might be because the checkpoint alignment 
time is too long, which inflates the overall end-to-end duration.

Best
Yun Tang

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/checkpoint_monitoring.html#checkpoint-details
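Yun's rule of thumb can be written down as a small helper for numbers copied by hand from the checkpoint details page. The class, its fields and the sample values are all hypothetical (the values are illustrative, not taken from this thread); it assumes that whatever part of a subtask's end-to-end duration is not spent in the sync and async phases is mostly time spent waiting for and aligning barriers.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper applying a rule of thumb to per-subtask checkpoint numbers.
class CheckpointBreakdown {
    static final class SubtaskStats {
        final int subtask;
        final long syncMs;
        final long asyncMs;
        final long alignmentMs;
        final long endToEndMs;

        SubtaskStats(int subtask, long syncMs, long asyncMs, long alignmentMs, long endToEndMs) {
            this.subtask = subtask;
            this.syncMs = syncMs;
            this.asyncMs = asyncMs;
            this.alignmentMs = alignmentMs;
            this.endToEndMs = endToEndMs;
        }
    }

    /** Finds the slowest subtask and reports which part dominates its end-to-end time. */
    static String diagnose(List<SubtaskStats> stats) {
        SubtaskStats slowest = stats.stream()
                .max(Comparator.comparingLong((SubtaskStats s) -> s.endToEndMs))
                .orElseThrow(IllegalArgumentException::new);
        long snapshotMs = slowest.syncMs + slowest.asyncMs;
        // Remainder outside the sync/async phases (e.g. waiting for barriers, alignment).
        long waitingMs = slowest.endToEndMs - snapshotMs;
        if (waitingMs > snapshotMs) {
            return "subtask " + slowest.subtask + ": barrier wait/alignment (alignment "
                    + slowest.alignmentMs + " ms)";
        }
        return "subtask " + slowest.subtask + ": snapshot (sync " + slowest.syncMs
                + " ms, async " + slowest.asyncMs + " ms)";
    }

    public static void main(String[] args) {
        // Illustrative numbers only; they are not taken from the thread.
        List<SubtaskStats> stats = Arrays.asList(
                new SubtaskStats(0, 40, 900, 50, 1_200),
                new SubtaskStats(1, 35, 850, 1_700_000, 1_750_000));
        // Prints: subtask 1: barrier wait/alignment (alignment 1700000 ms)
        System.out.println(diagnose(stats));
    }
}
```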

From: trung kien 
Sent: Thursday, September 6, 2018 22:31
To: vino yang
Cc: myas...@live.com; user
Subject: Re: Flink failure recovery tooks very long time

And here is a snapshot of my checkpoint metrics under normal conditions.

On Thu, Sep 6, 2018 at 9:21 AM trung kien <kient...@gmail.com> wrote:
Hi Yun,

Yes, the job's status changes to RUNNING quite quickly after the failure (~1 min).

As soon as the status changes to RUNNING, the first checkpoint is kicked off and it 
takes 30 minutes. I need exactly-once because I maintain some aggregation 
metrics. Do you know what the difference is between the first checkpoint and the 
checkpoints after it? (Those are fairly quick.)

Here are the sizes of my checkpoints (I configured Flink to retain the 5 latest checkpoints):
449M  chk-1626
775M  chk-1627
486M  chk-1628
7.8G  chk-1629
7.5G  chk-1630

I don't know why the sizes differ so much.
The checkpoint metrics look fine apart from the spike on the first checkpoint.

@Vino: Yes, I can try switching to DEBUG to see whether I get any more information.


On Thu, Sep 6, 2018 at 7:09 AM vino yang <yanghua1...@gmail.com> wrote:
Hi trung,

Can you provide more information to help pinpoint the problem? For example, the 
size of the state produced by each checkpoint and more log output; you can try 
switching the log level to DEBUG.

Thanks, vino.

Yun Tang <myas...@live.com> wrote on Thu, Sep 6, 2018 at 7:42 PM:
Hi Kien

From your description, your job had already started executing a checkpoint 
after the failover, which means it was in RUNNING status. From my point of 
view, the actual recovery time is the time the job spends going through 
RESTARTING->CREATED->RUNNING[1].
Your problem sounds more like the long time needed for the first checkpoint to 
complete after the failover. AFAIK, it's probably because your job is heavily 
backpressured after the failover and the checkpoint mode is exactly-once, so some 
operators need to receive the checkpoint barriers from all of their inputs before 
they can trigger the checkpoint. You can watch the checkpoint alignment time 
metrics to verify the root cause, and if you do not need exactly-once guarantees, 
you can change the checkpoint mode to at-least-once[2].

Best
Yun Tang


[1] 
https://ci.apache.org/projects/flink/flink-docs-master/internals/job_scheduling.html#jobmanager-data-structures
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/internals/stream_checkpointing.html#exactly-once-vs-at-least-once



From: trung kien <kient...@gmail.com>
Sent: Thursday, September 6, 2018 18:50
To: user@flink.apache.org
Subject: Flink failure recovery tooks very long time

Hi all,

I am trying to test failure recovery of a Flink job when a JM or TM goes down.
Our goal is for the job to restart automatically and return to normal in any case.

However, what I am seeing is very strange, and I hope someone here can help me 
understand it.

When a JM or TM goes down, I see the job being restarted, but as soon as it 
restarts it works on a checkpoint that usually takes 30+ minutes to finish 
(normally a checkpoint takes only 1-2 minutes). As soon as the checkpoint 
finishes, the job is back to normal.

I'm using 1.4.2, but I see the same thing on 1.6.0 as well.

Could anyone please help explain this behavior? We really want to reduce the 
recovery time but doesn't seem to 

Re: Flink failure recovery tooks very long time

2018-09-06 Thread trung kien
And here is a snapshot of my checkpoint metrics under normal conditions.


On Thu, Sep 6, 2018 at 9:21 AM trung kien  wrote:

> Hi Yun,
>
> Yes, the job's status changes to RUNNING quite quickly after the failure (~1
> min).
>
> As soon as the status changes to RUNNING, the first checkpoint is kicked off
> and it takes 30 minutes. I need exactly-once because I maintain some
> aggregation metrics. Do you know what the difference is between the first
> checkpoint and the checkpoints after it? (Those are fairly quick.)
>
> Here are the sizes of my checkpoints (I configured Flink to retain the 5
> latest checkpoints):
> 449M  chk-1626
> 775M  chk-1627
> 486M  chk-1628
> 7.8G  chk-1629
> 7.5G  chk-1630
>
> I don't know why the sizes differ so much.
> The checkpoint metrics look fine apart from the spike on the first
> checkpoint.
>
> @Vino: Yes, I can try switching to DEBUG to see whether I get any more
> information.
>
>
> On Thu, Sep 6, 2018 at 7:09 AM vino yang  wrote:
>
>> Hi trung,
>>
>> Can you provide more information to help pinpoint the problem? For example,
>> the size of the state produced by each checkpoint and more log output; you
>> can try switching the log level to DEBUG.
>>
>> Thanks, vino.
>>
>> Yun Tang wrote on Thu, Sep 6, 2018 at 7:42 PM:
>>
>>> Hi Kien
>>>
>>> From your description, your job had already started executing a
>>> checkpoint after the failover, which means it was in RUNNING status.
>>> From my point of view, the actual recovery time is the time the job
>>> spends going through RESTARTING->CREATED->RUNNING[1].
>>> Your problem sounds more like the long time needed for the first
>>> checkpoint to complete after the failover. AFAIK, it's probably because
>>> your job is heavily backpressured after the failover and the checkpoint
>>> mode is exactly-once, so some operators need to receive the checkpoint
>>> barriers from all of their inputs before they can trigger the checkpoint.
>>> You can watch the checkpoint alignment time metrics to verify the root
>>> cause, and if you do not need exactly-once guarantees, you can change the
>>> checkpoint mode to at-least-once[2].
>>>
>>> Best
>>> Yun Tang
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/internals/job_scheduling.html#jobmanager-data-structures
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/internals/stream_checkpointing.html#exactly-once-vs-at-least-once
>>>
>>>
>>> --
>>> *From:* trung kien 
>>> *Sent:* Thursday, September 6, 2018 18:50
>>> *To:* user@flink.apache.org
>>> *Subject:* Flink failure recovery tooks very long time
>>>
>>> Hi all,
>>>
>>> I am trying to test failure recovery of a Flink job when a JM or TM goes
>>> down.
>>> Our goal is for the job to restart automatically and return to normal in
>>> any case.
>>>
>>> However, what I am seeing is very strange, and I hope someone here can
>>> help me understand it.
>>>
>>> When a JM or TM goes down, I see the job being restarted, but as soon as
>>> it restarts it works on a checkpoint that usually takes 30+ minutes to
>>> finish (normally a checkpoint takes only 1-2 minutes). As soon as the
>>> checkpoint finishes, the job is back to normal.
>>>
>>> I'm using 1.4.2, but I see the same thing on 1.6.0 as well.
>>>
>>> Could anyone please help explain this behavior? We really want to
>>> reduce the recovery time but can't find any document that describes the
>>> recovery process in detail.
>>>
>>> Any help is really appreciated.
>>>
>>>
>>> --
>>> Thanks
>>> Kien
>>>
>> --
> Thanks
> Kien
>
-- 
Thanks
Kien


Re: Flink failure recovery tooks very long time

2018-09-06 Thread trung kien
Hi Yun,

Yes, the job's status changes to RUNNING quite quickly after the failure (~1 min).

As soon as the status changes to RUNNING, the first checkpoint is kicked off
and it takes 30 minutes. I need exactly-once because I maintain some
aggregation metrics. Do you know what the difference is between the first
checkpoint and the checkpoints after it? (Those are fairly quick.)

Here are the sizes of my checkpoints (I configured Flink to retain the 5 latest checkpoints):
449M  chk-1626
775M  chk-1627
486M  chk-1628
7.8G  chk-1629
7.5G  chk-1630

I don't know why the sizes differ so much.
The checkpoint metrics look fine apart from the spike on the first
checkpoint.

@Vino: Yes, I can try switching to DEBUG to see whether I get any more information.


On Thu, Sep 6, 2018 at 7:09 AM vino yang  wrote:

> Hi trung,
>
> Can you provide more information to help pinpoint the problem? For example,
> the size of the state produced by each checkpoint and more log output; you
> can try switching the log level to DEBUG.
>
> Thanks, vino.
>
> Yun Tang wrote on Thu, Sep 6, 2018 at 7:42 PM:
>
>> Hi Kien
>>
>> From your description, your job had already started executing a checkpoint
>> after the failover, which means it was in RUNNING status. From my
>> point of view, the actual recovery time is the time the job spends going
>> through RESTARTING->CREATED->RUNNING[1].
>> Your problem sounds more like the long time needed for the first
>> checkpoint to complete after the failover. AFAIK, it's probably because
>> your job is heavily backpressured after the failover and the checkpoint
>> mode is exactly-once, so some operators need to receive the checkpoint
>> barriers from all of their inputs before they can trigger the checkpoint.
>> You can watch the checkpoint alignment time metrics to verify the root
>> cause, and if you do not need exactly-once guarantees, you can change the
>> checkpoint mode to at-least-once[2].
>>
>> Best
>> Yun Tang
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/internals/job_scheduling.html#jobmanager-data-structures
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-stable/internals/stream_checkpointing.html#exactly-once-vs-at-least-once
>>
>>
>> --
>> *From:* trung kien 
>> *Sent:* Thursday, September 6, 2018 18:50
>> *To:* user@flink.apache.org
>> *Subject:* Flink failure recovery tooks very long time
>>
>> Hi all,
>>
>> I am trying to test failure recovery of a Flink job when a JM or TM goes
>> down.
>> Our goal is for the job to restart automatically and return to normal in
>> any case.
>>
>> However, what I am seeing is very strange, and I hope someone here can help
>> me understand it.
>>
>> When a JM or TM goes down, I see the job being restarted, but as soon as
>> it restarts it works on a checkpoint that usually takes 30+ minutes to
>> finish (normally a checkpoint takes only 1-2 minutes). As soon as the
>> checkpoint finishes, the job is back to normal.
>>
>> I'm using 1.4.2, but I see the same thing on 1.6.0 as well.
>>
>> Could anyone please help explain this behavior? We really want to
>> reduce the recovery time but can't find any document that describes the
>> recovery process in detail.
>>
>> Any help is really appreciated.
>>
>>
>> --
>> Thanks
>> Kien
>>
> --
Thanks
Kien


Behaviour of Process Window Function

2018-09-06 Thread Harshvardhan Agrawal
Hello,

We have a Flink pipeline where we are windowing our data after a keyBy, i.e.
myStream.keyBy().window().process(MyIncrementalAggregation(),
MyProcessFunction()).

I have two questions about the above line of code:
1) Does the state in the process window function qualify as KeyedState or
OperatorState? If it is operator state, can we access KeyedState from the
process window function?
2) We also have certain reference information that we want to share across
all keys in the process window function. We are currently storing all that
info in a Guava cache. We want to be able to rehydrate the Guava cache at
the beginning of each window by making an external REST call, and clear the
cache at the end of that window. How can we enforce this
behaviour in Flink? Do I need to use a timer service for this, with the
callback at window.maxTimestamp(), or will just clearing the cache in the
clear() method do the trick?

-- 

*Regards,*
*Harshvardhan Agrawal*


Re: Missing Calcite SQL functions in table API

2018-09-06 Thread françois lacombe
Thanks Fabian,

I didn't notice select() wasn't SQL compliant.
sqlQuery works fine, it's all right :)


All the best

François

2018-09-05 12:30 GMT+02:00 Fabian Hueske:

> Hi
>
> You are using SQL syntax in a Table API query. You have to stick to Table
> API syntax or use SQL as
>
> tEnv.sqlQuery("SELECT col1,CONCAT('field1:',col2,',field2:',CAST(col3 AS string)) FROM csvTable")
>
> The Flink documentation lists all supported functions for Table API [1]
> and SQL [2].
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/tableApi.html
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/sql.html
>
> 2018-09-05 12:22 GMT+02:00 françois lacombe:
>
>> Hi all,
>>
>> I'm trying to use the CONVERT or CAST functions from the Calcite docs to
>> query a table with the Table API.
>> https://calcite.apache.org/docs/reference.html
>>
>> csv_table.select("col1,CONCAT('field1:',col2,',field2:',CAST(col3 AS string))");
>> col3 is actually declared as int in the CSV schema, and CONCAT doesn't like
>> it.
>>
>> An exception is thrown: "Undefined function: CAST"
>>
>> The docs mention that the SQL implementation is based on Calcite; is there
>> a list of available functions, please?
>> Am I missing some useful dependency?
>>
>>
>> Thanks in advance
>>
>> François
>>
>>
>


Re: Flink failure recovery tooks very long time

2018-09-06 Thread vino yang
Hi trung,

Can you provide more information to help pinpoint the problem? For example,
the size of the state produced by each checkpoint and more log output; you
can try switching the log level to DEBUG.

Thanks, vino.

Yun Tang wrote on Thu, Sep 6, 2018 at 7:42 PM:

> Hi Kien
>
> From your description, your job had already started executing a checkpoint
> after the failover, which means it was in RUNNING status. From my
> point of view, the actual recovery time is the time the job spends going
> through RESTARTING->CREATED->RUNNING[1].
> Your problem sounds more like the long time needed for the first
> checkpoint to complete after the failover. AFAIK, it's probably because
> your job is heavily backpressured after the failover and the checkpoint
> mode is exactly-once, so some operators need to receive the checkpoint
> barriers from all of their inputs before they can trigger the checkpoint.
> You can watch the checkpoint alignment time metrics to verify the root
> cause, and if you do not need exactly-once guarantees, you can change the
> checkpoint mode to at-least-once[2].
>
> Best
> Yun Tang
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/internals/job_scheduling.html#jobmanager-data-structures
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/internals/stream_checkpointing.html#exactly-once-vs-at-least-once
>
>
> --
> *From:* trung kien 
> *Sent:* Thursday, September 6, 2018 18:50
> *To:* user@flink.apache.org
> *Subject:* Flink failure recovery tooks very long time
>
> Hi all,
>
> I am trying to test failure recovery of a Flink job when a JM or TM goes
> down.
> Our goal is for the job to restart automatically and return to a normal
> state in every case.
>
> However, what I am seeing is very strange, and I hope someone here can help
> me understand it.
>
> When a JM or TM went down, I saw the job being restarted, but as soon as it
> restarted it started working on a checkpoint, which usually took 30+ minutes
> to finish (in the normal case, a checkpoint only takes 1-2 minutes). As
> soon as the checkpoint finished, the job was back to normal.
>
> I'm using 1.4.2, but I'm seeing similar behavior on 1.6.0 as well.
>
> Could anyone please help explain this behavior? We really want to
> reduce the recovery time, but I can't seem to find any documentation that
> describes the recovery process in detail.
>
> Any help is really appreciated.
>
>
> --
> Thanks
> Kien
>


Re: Flink failure recovery tooks very long time

2018-09-06 Thread Yun Tang
Hi Kien

From your description, your job had already started executing a checkpoint after
the job failover, which means your job was in RUNNING status. From my point of
view, the actual recovery time should be the time the job spends going through
the statuses RESTARTING->CREATED->RUNNING[1].
Your trouble sounds more like the long time needed for the first checkpoint to
complete after the job failover. AFAIK, this is probably because your job is
heavily back-pressured after the failover and the checkpoint mode is
exactly-once, so some operators must receive the checkpoint barrier from all of
their inputs before they can trigger the checkpoint. You can watch the
checkpoint alignment time metrics to verify the root cause, and if you do not
need exactly-once guarantees, you can change the checkpoint mode to
at-least-once[2].
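To illustrate the alignment effect described above, here is a minimal, self-contained Java sketch. It is plain Java, not Flink's actual implementation, and the class, method and sentinel names are hypothetical. It models one operator with two input channels where the barrier on one channel arrives late, as it would under backpressure. In exactly-once mode the channel that already delivered its barrier is blocked until the other barrier arrives; in at-least-once mode records keep flowing, so records from after the barrier end up in the snapshot and would be replayed (processed twice) after a restore.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Toy model of checkpoint barrier handling in one operator with several inputs.
// Not Flink's implementation; all names here are hypothetical.
public class BarrierAlignmentSketch {

    enum Mode { EXACTLY_ONCE, AT_LEAST_ONCE }

    // Sentinel event marking the checkpoint barrier; any other string is a record.
    static final String BARRIER = "|barrier|";

    /**
     * Polls the input channels round-robin and returns the records whose effects
     * are part of the snapshot, i.e. those processed before the barriers from all
     * channels have arrived.
     */
    static List<String> recordsInSnapshot(List<List<String>> channels, Mode mode) {
        List<Deque<String>> inputs = new ArrayList<>();
        for (List<String> channel : channels) {
            inputs.add(new ArrayDeque<>(channel));
        }
        boolean[] barrierSeen = new boolean[inputs.size()];
        int barriers = 0;
        List<String> processed = new ArrayList<>();

        while (true) {
            boolean progressed = false;
            for (int i = 0; i < inputs.size(); i++) {
                // Exactly-once alignment: a channel that already delivered its barrier
                // is blocked; its later records stay buffered until alignment ends.
                if (mode == Mode.EXACTLY_ONCE && barrierSeen[i]) {
                    continue;
                }
                String event = inputs.get(i).poll();
                if (event == null) {
                    continue;
                }
                progressed = true;
                if (event.equals(BARRIER)) {
                    if (!barrierSeen[i]) {
                        barrierSeen[i] = true;
                        barriers++;
                    }
                    if (barriers == inputs.size()) {
                        return processed; // all barriers in: the snapshot is taken here
                    }
                } else {
                    processed.add(event);
                }
            }
            if (!progressed) {
                throw new IllegalStateException("some channel never delivered a barrier");
            }
        }
    }

    public static void main(String[] args) {
        List<List<String>> channels = Arrays.asList(
                Arrays.asList("a1", BARRIER, "a2", "a3"),   // fast channel
                Arrays.asList("b1", "b2", "b3", BARRIER));  // slow (back-pressured) channel
        System.out.println(recordsInSnapshot(channels, Mode.EXACTLY_ONCE));
        System.out.println(recordsInSnapshot(channels, Mode.AT_LEAST_ONCE));
    }
}
```

With these inputs, EXACTLY_ONCE returns [a1, b1, b2, b3] while AT_LEAST_ONCE returns [a1, b1, b2, a2, b3, a3]: a2 and a3 come after the barrier on the fast channel but are already reflected in the snapshot. The longer the slow channel's barrier takes, the longer the fast channel stays blocked in exactly-once mode, which is what the alignment time metric shows. In a real job the mode is chosen when enabling checkpointing, e.g. env.enableCheckpointing(60000, CheckpointingMode.AT_LEAST_ONCE) on a StreamExecutionEnvironment; the 60000 ms interval is only an example value.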

Best
Yun Tang


[1] 
https://ci.apache.org/projects/flink/flink-docs-master/internals/job_scheduling.html#jobmanager-data-structures
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/internals/stream_checkpointing.html#exactly-once-vs-at-least-once

From: trung kien 
Sent: Thursday, September 6, 2018 18:50
To: user@flink.apache.org
Subject: Flink failure recovery tooks very long time

Hi all,

I am trying to test failure recovery of a Flink job when a JM or TM goes down.
Our goal is for the job to restart automatically and return to a normal state in every case.

However, what I am seeing is very strange, and I hope someone here can help me 
understand it.

When a JM or TM went down, I saw the job being restarted, but as soon as it 
restarted it started working on a checkpoint, which usually took 30+ minutes to finish 
(in the normal case, a checkpoint only takes 1-2 minutes). As soon as the 
checkpoint finished, the job was back to normal.

I'm using 1.4.2, but I'm seeing similar behavior on 1.6.0 as well.

Could anyone please help explain this behavior? We really want to reduce the 
recovery time, but I can't seem to find any documentation that describes the 
recovery process in detail.

Any help is really appreciated.


--
Thanks
Kien


Flink failure recovery tooks very long time

2018-09-06 Thread trung kien
Hi all,

I am trying to test failure recovery of a Flink job when a JM or TM goes
down.
Our goal is for the job to restart automatically and return to a normal state
in every case.

However, what I am seeing is very strange, and I hope someone here can help me
understand it.

When a JM or TM went down, I saw the job being restarted, but as soon as it
restarted it started working on a checkpoint, which usually took 30+ minutes to
finish (in the normal case, a checkpoint only takes 1-2 minutes). As soon as
the checkpoint finished, the job was back to normal.

I'm using 1.4.2, but I'm seeing similar behavior on 1.6.0 as well.

Could anyone please help explain this behavior? We really want to reduce
the recovery time, but I can't seem to find any documentation that describes
the recovery process in detail.

Any help is really appreciated.


-- 
Thanks
Kien


Re: RocksDB Number of Keys Metric

2018-09-06 Thread Andrey Zagrebin
Hi,

AFAIK, this option is not exposed in the current state of the source code.
I can see it being useful, and it is technically possible using:
db.getLongProperty(stateColumnFamilyHandle, "rocksdb.estimate-num-keys");

That said, a couple of things come to mind that should be taken into account for this feature:

- we could expose it per state object for both heap and RocksDB backends, e.g. as
  State.estimatekeyNum(), but it would only report the keys in the key groups of
  the operator subtask where it is called.

- another way might be to expose it in the Web UI, where we could even sum it up
  across all operator subtasks.

- for every state type except map state, "rocksdb.estimate-num-keys" would show
  the total number of state keys, but for map state it would show the total
  number of user map keys across all state keys, so it might be misleading, and
  it is technically not easy to align its semantics for map state with those of
  the other state types.

We can discuss here on the mailing list how many users would be interested in this
feature, and then, if the community decides to contribute it, a JIRA ticket can be
created to track it.

Cheers,
Andrey

> On 4 Sep 2018, at 05:49, vino yang  wrote:
> 
> Hi Ahmed,
> 
> If you feel that this metric is necessary, you can create an issue in JIRA;
> that way the relevant people are more likely to see it.
> If you need an answer to this question, maybe it is more effective to ping
> @Andrey?
> 
> Thanks, vino.
> 
> On Sun, Sep 2, 2018 at 2:31 AM, Ahmed <aalobaidi...@gmail.com> wrote:
> Is there a clean way of exposing a metric for the number of keys (even
> if it is an estimate using 'rocksdb.estimate-num-keys') in a RocksDB state
> store? RocksDBValueState is not public to user code.
> 
> Best,
> Ahmed
> 



Re: Increased Size of Incremental Checkpoint

2018-09-06 Thread Stefan Richter
Hi,

you should expect the size to vary for some checkpoints, even if the 
change rate is constant. Some checkpoints will upload compacted replacements 
for previous checkpoints to prevent the checkpoint history from growing 
without bound. Whenever that
happens, the checkpoint does some "extra work" by re-uploading compacted/merged 
versions of previous deltas.
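A toy simulation can make this concrete. The sketch below is plain Java with no RocksDB or Flink dependency, and its compaction rule (merge all freshly flushed files once a hypothetical compactionTrigger count is reached, assuming no overwrites) is a deliberately simplified assumption, not RocksDB's real compaction policy. Every checkpoint adds the same amount of new state, yet whenever a compaction has produced a new merged file, that file has never been uploaded and must be shipped in full, so the uploaded size spikes periodically.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of incremental checkpoint sizes with a simplified compaction rule.
// Not RocksDB's or Flink's actual algorithm; names are hypothetical.
public class IncrementalCheckpointSizes {

    /** An immutable "SST file" with a unique id and a size. */
    static final class SstFile {
        final int id;
        final long bytes;

        SstFile(int id, long bytes) {
            this.id = id;
            this.bytes = bytes;
        }
    }

    /**
     * Returns the bytes uploaded by each incremental checkpoint when every
     * checkpoint interval adds deltaBytes of new state.
     */
    static List<Long> simulate(int checkpoints, long deltaBytes, int compactionTrigger) {
        List<SstFile> flushed = new ArrayList<>();   // small files not yet compacted
        List<SstFile> compacted = new ArrayList<>(); // outputs of earlier compactions
        Set<Integer> uploaded = new HashSet<>();     // ids already in checkpoint storage
        List<Long> sizes = new ArrayList<>();
        int nextId = 0;

        for (int cp = 0; cp < checkpoints; cp++) {
            // The new state since the last checkpoint is flushed into a new file.
            flushed.add(new SstFile(nextId++, deltaBytes));

            // Simplified compaction: once enough small files exist, merge them into
            // one new, larger file (no overwrites assumed, so sizes simply add up).
            if (flushed.size() >= compactionTrigger) {
                long total = 0;
                for (SstFile f : flushed) {
                    total += f.bytes;
                }
                compacted.add(new SstFile(nextId++, total));
                flushed.clear();
            }

            // An incremental checkpoint uploads only files that were never uploaded;
            // files it already shipped are just referenced again.
            List<SstFile> live = new ArrayList<>(compacted);
            live.addAll(flushed);
            long bytes = 0;
            for (SstFile f : live) {
                if (uploaded.add(f.id)) {
                    bytes += f.bytes;
                }
            }
            sizes.add(bytes);
        }
        return sizes;
    }

    public static void main(String[] args) {
        // Constant change rate: 10 units of new state per checkpoint.
        System.out.println(simulate(8, 10, 4)); // [10, 10, 10, 40, 10, 10, 10, 40]
    }
}
```

The 40-unit checkpoints re-upload data that earlier checkpoints had already shipped as smaller deltas, which is the "extra work" mentioned above; in exchange, the old small files are no longer needed by later checkpoints, so the history does not grow without bound.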

Best,
Stefan 

> On 5 Sep 2018, at 22:29, burgesschen  wrote:
> 
> Hi guys,
> I enabled incremental Flink checkpoints for my Flink job. I have the job read
> messages at a stable rate. For each message, the Flink job stores something
> in the keyed state. My question is: since the state size increases by the
> same amount every minute, shouldn't the incremental checkpoint size also
> remain relatively constant? Why is it increasing, as shown in the picture?
> Thank you!
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/