Adaptive Watermarks Generator

2020-05-08 Thread 杨东晓
Hi, I noticed there is a paper describing an adaptive watermark generator
built on top of Apache Flink v1.6.2:
https://www.dfki.de/fileadmin/user_upload/import/10260_awad-adaptive-watermarks.pdf

This looks like a more precise generator with much less data dropped. Does
anybody know more details about this, or does the Flink community have any
plans for it?

Thanks!


Re: Cannot start native K8s

2020-05-08 Thread Dongwon Kim
Hi Yang,

Oops, I forgot to copy /etc/kube/admin.conf to $HOME/.kube/config so that
the current user account can access K8s.
Now that I have copied it, kubernetes-session.sh is working fine.
Thanks very much!

Best,
Dongwon

[flink@DAC-E04-W06 ~]$ kubernetes-session.sh
2020-05-09 12:43:49,961 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.rpc.address, DAC-E04-W06
2020-05-09 12:43:49,962 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.rpc.port, 6123
2020-05-09 12:43:49,962 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.heap.size, 1024m
2020-05-09 12:43:49,962 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.memory.process.size, 24g
2020-05-09 12:43:49,963 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.numberOfTaskSlots, 24
2020-05-09 12:43:49,963 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: parallelism.default, 1
2020-05-09 12:43:49,963 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: high-availability, zookeeper
2020-05-09 12:43:49,963 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: high-availability.zookeeper.path.root, /flink
2020-05-09 12:43:49,964 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: high-availability.storageDir, hdfs:///user/flink/ha/
2020-05-09 12:43:49,964 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: high-availability.zookeeper.quorum, DAC-E04-W06:2181
2020-05-09 12:43:49,965 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.execution.failover-strategy, region
2020-05-09 12:43:49,965 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: rest.port, 8082
2020-05-09 12:43:51,122 INFO
 org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils  - The
derived from fraction jvm overhead memory (2.400gb (2576980416 bytes)) is
greater than its max value 1024.000mb (1073741824 bytes), max value will be
used instead
2020-05-09 12:43:51,123 INFO
 org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils  - The
derived from fraction network memory (2.291gb (2459539902 bytes)) is
greater than its max value 1024.000mb (1073741824 bytes), max value will be
used instead
2020-05-09 12:43:51,131 INFO
 org.apache.flink.kubernetes.utils.KubernetesUtils - Kubernetes
deployment requires a fixed port. Configuration blob.server.port will be
set to 6124
2020-05-09 12:43:51,131 INFO
 org.apache.flink.kubernetes.utils.KubernetesUtils - Kubernetes
deployment requires a fixed port. Configuration taskmanager.rpc.port will
be set to 6122
2020-05-09 12:43:51,134 INFO
 org.apache.flink.kubernetes.utils.KubernetesUtils - Kubernetes
deployment requires a fixed port. Configuration
high-availability.jobmanager.port will be set to 6123
2020-05-09 12:43:52,167 INFO
 org.apache.flink.kubernetes.KubernetesClusterDescriptor   - Create
flink session cluster flink-cluster-4a82d41b-af15-4205-8a44-62351e270242
successfully, JobManager Web Interface: http://cluster-endpoint:31513


On Sat, May 9, 2020 at 12:29 PM Yang Wang  wrote:

> Hi Dongwon Kim,
>
> Thanks a lot for your information. I will dig into this issue.
>
> I think the "UnknownHostException" is caused by incorrectly setting the
> Kubernetes
> ApiServer address. Maybe you are using "kubernetes.default.svc". However,
> it
> could not be accessed outside of the Kubernetes cluster. You need to
> configure
> a correct ip/hostname for ApiServer address, which could be accessed in
> your
> local environment. You could use `kubectl auth can-i create pods` to verify
> whether the kube config is correct.
>
> BTW, currently we only find the flink on native K8s could not work on
> 8u252. For
> 8u242 and lower version, it works well.
>
>
> Best,
> Yang
>
> Dongwon Kim wrote on Sat, May 9, 2020 at 10:43 AM:
>
>> Hello Yang,
>>
>> I'm using K8s v1.18.2 installed by Kubeadm over a cluster of 5 nodes (not
>> a Minikube).
>> Previously, as you pointed out, openjdk version "1.8.0_252" was installed.
>> I bump up java version to openjdk 11.0.7 but got something different:
>>
>> [flink@DAC-E04-W06 bin]$ ./kubernetes-session.sh
>> 2020-05-09 11:39:36,737 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: jobmanager.rpc.address, DAC-E04-W06
>> 2020-05-09 11:39:36,739 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: jobmanager.r

Re: Cannot start native K8s

2020-05-08 Thread Yang Wang
Hi Dongwon Kim,

Thanks a lot for your information. I will dig into this issue.

I think the "UnknownHostException" is caused by incorrectly setting the
Kubernetes
ApiServer address. Maybe you are using "kubernetes.default.svc". However, it
could not be accessed outside of the Kubernetes cluster. You need to
configure
a correct ip/hostname for ApiServer address, which could be accessed in your
local environment. You could use `kubectl auth can-i create pods` to verify
whether the kube config is correct.

BTW, currently we only find the flink on native K8s could not work on
8u252. For
8u242 and lower version, it works well.


Best,
Yang

Dongwon Kim wrote on Sat, May 9, 2020 at 10:43 AM:

> Hello Yang,
>
> I'm using K8s v1.18.2 installed by Kubeadm over a cluster of 5 nodes (not
> a Minikube).
> Previously, as you pointed out, openjdk version "1.8.0_252" was installed.
> I bump up java version to openjdk 11.0.7 but got something different:
>
> [flink@DAC-E04-W06 bin]$ ./kubernetes-session.sh
> 2020-05-09 11:39:36,737 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.rpc.address, DAC-E04-W06
> 2020-05-09 11:39:36,739 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.rpc.port, 6123
> 2020-05-09 11:39:36,739 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.heap.size, 1024m
> 2020-05-09 11:39:36,739 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: taskmanager.memory.process.size, 24g
> 2020-05-09 11:39:36,739 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: taskmanager.numberOfTaskSlots, 24
> 2020-05-09 11:39:36,739 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: parallelism.default, 1
> 2020-05-09 11:39:36,740 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: high-availability, zookeeper
> 2020-05-09 11:39:36,740 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: high-availability.zookeeper.path.root, /flink
> 2020-05-09 11:39:36,740 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: high-availability.storageDir, hdfs:///user/flink/ha/
> 2020-05-09 11:39:36,740 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: high-availability.zookeeper.quorum, DAC-E04-W06:2181
> 2020-05-09 11:39:36,741 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.execution.failover-strategy, region
> 2020-05-09 11:39:36,741 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: rest.port, 8082
> 2020-05-09 11:39:36,817 WARN  io.fabric8.kubernetes.client.Config
>   - Error reading service account token from:
> [/var/run/secrets/kubernetes.io/serviceaccount/token]. Ignoring.
> 2020-05-09 11:39:36,823 WARN  io.fabric8.kubernetes.client.Config
>   - Error reading service account token from:
> [/var/run/secrets/kubernetes.io/serviceaccount/token]. Ignoring.
> 2020-05-09 11:39:37,080 WARN  io.fabric8.kubernetes.client.Config
>   - Error reading service account token from:
> [/var/run/secrets/kubernetes.io/serviceaccount/token]. Ignoring.
> 2020-05-09 11:39:37,082 WARN  io.fabric8.kubernetes.client.Config
>   - Error reading service account token from:
> [/var/run/secrets/kubernetes.io/serviceaccount/token]. Ignoring.
> 2020-05-09 11:39:37,334 ERROR
> org.apache.flink.kubernetes.cli.KubernetesSessionCli  - Error while
> running the Flink session.
> io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get]
>  for kind: [Service]  with name:
> [flink-cluster-6adb7c62-8940-4828-990c-a87379102d61]  in namespace:
> [default]  failed.
> at
> io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64)
> at
> io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:72)
> at
> io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:231)
> at
> io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:164)
> at
> org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.getService(Fabric8FlinkKubeClient.java:334)
> at
> org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.getInternalService(Fabric8FlinkKubeClient.java:246)
> at
> org.apache.flink.kubernetes.cli.KubernetesSessionCli.run(KubernetesSessionCli.java:104)
> at
> org.apache.flink.kubernetes.cli.KubernetesSessionCli.lambda$main$0(KubernetesSessionCli.java:185)
> at
> org.apa

Re: Cannot start native K8s

2020-05-08 Thread Dongwon Kim
Hello Yang,

I'm using K8s v1.18.2 installed by Kubeadm over a cluster of 5 nodes (not a
Minikube).
Previously, as you pointed out, openjdk version "1.8.0_252" was installed.
I bumped the Java version up to OpenJDK 11.0.7 but got something different:

[flink@DAC-E04-W06 bin]$ ./kubernetes-session.sh
2020-05-09 11:39:36,737 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.rpc.address, DAC-E04-W06
2020-05-09 11:39:36,739 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.rpc.port, 6123
2020-05-09 11:39:36,739 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.heap.size, 1024m
2020-05-09 11:39:36,739 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.memory.process.size, 24g
2020-05-09 11:39:36,739 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.numberOfTaskSlots, 24
2020-05-09 11:39:36,739 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: parallelism.default, 1
2020-05-09 11:39:36,740 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: high-availability, zookeeper
2020-05-09 11:39:36,740 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: high-availability.zookeeper.path.root, /flink
2020-05-09 11:39:36,740 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: high-availability.storageDir, hdfs:///user/flink/ha/
2020-05-09 11:39:36,740 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: high-availability.zookeeper.quorum, DAC-E04-W06:2181
2020-05-09 11:39:36,741 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.execution.failover-strategy, region
2020-05-09 11:39:36,741 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: rest.port, 8082
2020-05-09 11:39:36,817 WARN  io.fabric8.kubernetes.client.Config
- Error reading service account token from:
[/var/run/secrets/kubernetes.io/serviceaccount/token]. Ignoring.
2020-05-09 11:39:36,823 WARN  io.fabric8.kubernetes.client.Config
- Error reading service account token from:
[/var/run/secrets/kubernetes.io/serviceaccount/token]. Ignoring.
2020-05-09 11:39:37,080 WARN  io.fabric8.kubernetes.client.Config
- Error reading service account token from:
[/var/run/secrets/kubernetes.io/serviceaccount/token]. Ignoring.
2020-05-09 11:39:37,082 WARN  io.fabric8.kubernetes.client.Config
- Error reading service account token from:
[/var/run/secrets/kubernetes.io/serviceaccount/token]. Ignoring.
2020-05-09 11:39:37,334 ERROR
org.apache.flink.kubernetes.cli.KubernetesSessionCli  - Error while
running the Flink session.
io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get]
 for kind: [Service]  with name:
[flink-cluster-6adb7c62-8940-4828-990c-a87379102d61]  in namespace:
[default]  failed.
at
io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64)
at
io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:72)
at
io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:231)
at
io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:164)
at
org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.getService(Fabric8FlinkKubeClient.java:334)
at
org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.getInternalService(Fabric8FlinkKubeClient.java:246)
at
org.apache.flink.kubernetes.cli.KubernetesSessionCli.run(KubernetesSessionCli.java:104)
at
org.apache.flink.kubernetes.cli.KubernetesSessionCli.lambda$main$0(KubernetesSessionCli.java:185)
at
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at
org.apache.flink.kubernetes.cli.KubernetesSessionCli.main(KubernetesSessionCli.java:185)
Caused by: java.net.UnknownHostException: kubernetes.default.svc: Name or
service not known
at java.base/java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
at
java.base/java.net.InetAddress$PlatformNameService.lookupAllHostAddr(InetAddress.java:929)
at
java.base/java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1515)
at
java.base/java.net.InetAddress$NameServiceAddresses.get(InetAddress.java:848)
at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1505)
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1364)
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1298)
at org.apache.flink.kubernetes.shadde

Re: Cannot start native K8s

2020-05-08 Thread Yang Wang
Hi Dongwon Kim,

Are you running Flink on a Minikube or a real Kubernetes cluster? I just
could not reproduce it on a real Kubernetes cluster with Java 8u252. On
Minikube, I get the same exception as you.


Best,
Yang

Yang Wang wrote on Wed, May 6, 2020 at 9:29 AM:

> Hi Dongwon Kim,
>
> I think it is a known issue. The native Kubernetes integration does not
> work with JDK 8u252 due to an okhttp issue [1]. Currently, you can upgrade
> your JDK to a newer version as a workaround.
>
>
> [1]. https://issues.apache.org/jira/browse/FLINK-17416
>
> Dongwon Kim wrote on Wed, May 6, 2020 at 7:15 AM:
>
>> Hi,
>>
>> I'm using Flink-1.10 and tested everything [1] successfully.
>> While trying [2], I got the following message.
>> Can anyone help please?
>>
>> [root@DAC-E04-W06 bin]# ./kubernetes-session.sh
>>> 2020-05-06 08:10:49,411 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: jobmanager.rpc.address, DAC-E04-W06
>>> 2020-05-06 08:10:49,412 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: jobmanager.rpc.port, 6123
>>> 2020-05-06 08:10:49,412 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: jobmanager.heap.size, 1024m
>>> 2020-05-06 08:10:49,412 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: taskmanager.memory.process.size, 24g
>>> 2020-05-06 08:10:49,413 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: taskmanager.numberOfTaskSlots, 24
>>> 2020-05-06 08:10:49,413 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: parallelism.default, 1
>>> 2020-05-06 08:10:49,413 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: high-availability, zookeeper
>>> 2020-05-06 08:10:49,413 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: high-availability.zookeeper.path.root, /flink
>>> 2020-05-06 08:10:49,414 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: high-availability.storageDir, hdfs:///user/flink/ha/
>>> 2020-05-06 08:10:49,414 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: high-availability.zookeeper.quorum, DAC-E04-W06:2181
>>> 2020-05-06 08:10:49,414 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: jobmanager.execution.failover-strategy, region
>>> 2020-05-06 08:10:49,415 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: rest.port, 8082
>>> 2020-05-06 08:10:50,386 ERROR
>>> org.apache.flink.kubernetes.cli.KubernetesSessionCli  - Error while
>>> running the Flink session.
>>> io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get]
>>>  for kind: [Service]  with name:
>>> [flink-cluster-5c12bd50-a540-4614-96d0-549785a8bc62]  in namespace:
>>> [default]  failed.
>>> at
>>> io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64)
>>> at
>>> io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:72)
>>> at
>>> io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:231)
>>> at
>>> io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:164)
>>> at
>>> org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.getService(Fabric8FlinkKubeClient.java:334)
>>> at
>>> org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.getInternalService(Fabric8FlinkKubeClient.java:246)
>>> at
>>> org.apache.flink.kubernetes.cli.KubernetesSessionCli.run(KubernetesSessionCli.java:104)
>>> at
>>> org.apache.flink.kubernetes.cli.KubernetesSessionCli.lambda$main$0(KubernetesSessionCli.java:185)
>>> at
>>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>> at
>>> org.apache.flink.kubernetes.cli.KubernetesSessionCli.main(KubernetesSessionCli.java:185)
>>> Caused by: java.net.SocketException: Broken pipe (Write failed)
>>> at java.net.SocketOutputStream.socketWrite0(Native Method)
>>> at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
>>> at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
>>> at sun.security.ssl.OutputRecord.writeBuffer(OutputRecord.java:431)
>>> at sun.security.ssl.OutputRecord.write(OutputRecord.java:417)
>>> at
>>> sun.security.ssl.SSLSocketImpl.writeRecordInternal(SSLSocketImpl.java:894)
>>> at sun.security.ssl.SSLSocketImpl.writeRecord(SSLSocketImpl.java:865)
>>> at sun.security.ssl.AppOutputStream.write(AppOutputStream.java:123)
>>> at org.apache.flink.kubernetes.shadded.okio.Okio$1.w

Re: No Slots available exception in Apache Flink Job Manager while Scheduling

2020-05-08 Thread Xintong Song
Linking to the jira ticket, for the record.
https://issues.apache.org/jira/browse/FLINK-17560

Thank you~

Xintong Song



On Sat, May 9, 2020 at 2:14 AM Josson Paul  wrote:

> Set up
> --
> Flink verson 1.8.3
>
> Zookeeper HA cluster
>
> 1 ResourceManager/Dispatcher (Same Node)
> 1 TaskManager
> 4 pipelines running with various parallelism's
>
> Issue
> --
>
> Occasionally, when the Job Manager gets restarted, we notice that none of
> the pipelines get scheduled. The error reported by the Job
> Manager is 'not enough slots are available'. This should not be the case
> because the task manager was deployed with sufficient slots for the number of
> pipelines/parallelism we have.
>
> We further noticed that the slot report sent by the task manager contains
> slots filled with old CANCELLED job IDs. I am not sure why the task manager
> still holds the details of the old jobs. A thread dump on the task manager
> confirms that old pipelines are not running.
>
> It is not just one or two slot reports that are wrong. If the issue occurs,
> all the slot reports sent by the TM are wrong and contain old job IDs.
> This continues until I restart the TM.
>
> Also, I noticed that when we cancel a job, the leader/leaderlatch entries in
> ZooKeeper don't get cleared for that job. Is that expected?
>
> /leader/d8beed9c9261dcf191cc7fde46869b64/job_manager_lock
>
> I am aware of https://issues.apache.org/jira/browse/FLINK-12865, but this
> is not the issue happening in this case.
>
> --
> Thanks
> Josson
>


Re: Flink on Kubernetes unable to Recover from failure

2020-05-08 Thread Yun Tang
Hi Morgan

If "because the number of task slots has been reduced to zero", do you mean the 
total task slots reduced to 0? And how many registered task managers could you 
see when this happened (you could click to the "Task Managers" tab to view 
related information).

All containers running do not mean they're all registered to the job manager, I 
think you could refer to the JM and TM log to see whether the register 
connection is lost.

Best
Yun Tang

From: Robert Metzger 
Sent: Friday, May 8, 2020 22:33
To: Morgan Geldenhuys 
Cc: user 
Subject: Re: Flink on Kubernetes unable to Recover from failure

Hey Morgan,

Is it possible for you to provide us with the full logs of the JobManager and 
the affected TaskManager?
This might give us a hint why the number of task slots is zero.

Best,
Robert


On Tue, May 5, 2020 at 11:41 AM Morgan Geldenhuys 
mailto:morgan.geldenh...@tu-berlin.de>> wrote:

Community,

I am currently doing some fault tolerance testing for Flink (1.10) running on 
Kubernetes (1.18) and am encountering an error where after a running job 
experiences a failure, the job fails completely.

A Flink session cluster has been created according to the documentation 
contained here: 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html.
 The job is then uploaded and deployed via the web interface and everything 
runs smoothly. The job has a parallelism of 24 with 3 worker nodes as fail 
overs in reserve. Each worker is assigned 1 task slot each (total of 27).

The next step is to inject an error, for which I use the Pumba Chaos Testing 
tool (https://github.com/alexei-led/pumba) to pause a random worker process. 
This selection and pausing is done manually for the moment.

Looking at the error logs, Flink does detect the error after the timeout (The 
heartbeat timeout has been set to 20 seconds):

java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id 
768848f91ebdbccc8d518e910160414d  timed out.

After the failure has been detected, the system resets to the latest saved 
checkpoint and restarts. The system catches up nicely and resumes normal 
processing... however, after about 3 minutes, the following error occurs:

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Connection unexpectedly closed by remote task manager 
'/10.45.128.1:6121'. This might indicate that the 
remote task manager was lost.

The job fails, and is unable to restart because the number of task slots has 
been reduced to zero. Looking at the kubernetes cluster, all containers are 
running...

Has anyone else run into this error? What am I missing? The same thing happens 
when the containers are deleted.

Regards,
M.









Re: Correctly implementing of SourceFunction.run()

2020-05-08 Thread Senthil Kumar
OK, thank you. Much appreciated.

Yes, I don’t want the job to fail. The source has very little data that is 
being pumped into a Broadcast stream.

From: Robert Metzger 
Date: Friday, May 8, 2020 at 9:51 AM
To: Jingsong Li 
Cc: Senthil Kumar , "user@flink.apache.org" 

Subject: Re: Correctly implementing of SourceFunction.run()

Hey Kumar,

if you are swallowing any and all exceptions, your Flink job will not fail 
because of issues arising from your custom source. It might make sense to stop 
the source if you are catching an InterruptedException.

Throwing exceptions out of the run method basically signals the Flink framework 
that the source has failed, and thus the job will fail / go into recovery.
The way you are using the cancel() method + isRunning variable is correct for 
having a proper cancellation behavior of the source.



On Fri, May 8, 2020 at 3:31 AM Jingsong Li 
mailto:jingsongl...@gmail.com>> wrote:
Hi,

Some suggestions from my side:
- synchronized (checkpointLock) to some work and ctx.collect?
- Put Thread.sleep(interval) out of try catch? Maybe should not swallow 
interrupt exception (Like cancel the job).

Best,
Jingsong Lee

On Fri, May 8, 2020 at 2:52 AM Senthil Kumar 
mailto:senthi...@vmware.com>> wrote:
I am implementing a source function which periodically wakes up and consumes 
data from S3.


My current implementation is as follows.

Following: 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction

Is it safe to simply swallow any and all exceptions in the run method and just 
rely on this.isRunning variable to quit the run() method?

Cheers
Kumar

---


@Override
public void cancel() {
    this.isRunning = false;   // volatile state variable, initially set to true on the class
}

@Override
public void run(SourceFunction.SourceContext<OUT> ctx) {
    while (this.isRunning) {
        try {
            OUT out = /* Do some work */
            ctx.collect(out);
            Thread.sleep(this.sleepIntervalHours * 60 * 60 * 1000); // hours to milliseconds
        } catch (Throwable t) {
            // Simply swallow
        }
    }
}



--
Best, Jingsong Lee


No Slots available exception in Apache Flink Job Manager while Scheduling

2020-05-08 Thread Josson Paul
Set up
--
Flink verson 1.8.3

Zookeeper HA cluster

1 ResourceManager/Dispatcher (Same Node)
1 TaskManager
4 pipelines running with various parallelism's

Issue
--

Occasionally, when the Job Manager gets restarted, we notice that none of the
pipelines get scheduled. The error reported by the Job
Manager is 'not enough slots are available'. This should not be the case
because the task manager was deployed with sufficient slots for the number of
pipelines/parallelism we have.

We further noticed that the slot report sent by the task manager contains
slots filled with old CANCELLED job IDs. I am not sure why the task manager
still holds the details of the old jobs. A thread dump on the task manager
confirms that old pipelines are not running.

It is not just one or two slot reports that are wrong. If the issue occurs, all
the slot reports sent by the TM are wrong and contain old job IDs.
This continues until I restart the TM.

Also, I noticed that when we cancel a job, the leader/leaderlatch entries in
ZooKeeper don't get cleared for that job. Is that expected?

/leader/d8beed9c9261dcf191cc7fde46869b64/job_manager_lock

I am aware of https://issues.apache.org/jira/browse/FLINK-12865, but this
is not the issue happening in this case.

-- 
Thanks
Josson


Re: Correctly implementing of SourceFunction.run()

2020-05-08 Thread Robert Metzger
Hey Kumar,

if you are swallowing any and all exceptions, your Flink job will not fail
because of issues arising from your custom source. It might make sense to
stop the source if you are catching an InterruptedException.

Throwing exceptions out of the run method basically signals the Flink
framework that the source has failed, and thus the job will fail / go into
recovery.
The way you are using the cancel() method + isRunning variable is correct
for having a proper cancellation behavior of the source.
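
To make that concrete, here is a minimal sketch of a run() loop along the
lines suggested in this thread: emit under the checkpoint lock, stop on
InterruptedException, and keep using isRunning as the cancellation flag. The
fetchFromS3() call is a made-up placeholder for the actual S3 read, and the
field names are just the ones from Kumar's snippet.

@Override
public void run(SourceFunction.SourceContext<OUT> ctx) throws Exception {
    while (this.isRunning) {
        try {
            OUT out = fetchFromS3();                 // placeholder for the real S3 polling logic
            synchronized (ctx.getCheckpointLock()) { // keep emission atomic w.r.t. checkpoints
                ctx.collect(out);
            }
            Thread.sleep(this.sleepIntervalHours * 60L * 60L * 1000L); // hours to milliseconds
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();      // the task is being cancelled, leave the loop
            this.isRunning = false;
        } catch (Exception e) {
            // log and continue, if the source really must never fail the job
        }
    }
}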



On Fri, May 8, 2020 at 3:31 AM Jingsong Li  wrote:

> Hi,
>
> Some suggestions from my side:
> - synchronized (checkpointLock) to some work and ctx.collect?
> - Put Thread.sleep(interval) out of try catch? Maybe should not
> swallow interrupt exception (Like cancel the job).
>
> Best,
> Jingsong Lee
>
> On Fri, May 8, 2020 at 2:52 AM Senthil Kumar  wrote:
>
>> I am implementing a source function which periodically wakes up and
>> consumes data from S3.
>>
>>
>>
>> My current implementation is as follows.
>>
>> Following: 
>> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction
>>
>>
>>
>> Is it safe to simply swallow any and all exceptions in the run method and
>> just rely on this.isRunning variable to quit the run() method?
>>
>>
>>
>> Cheers
>>
>> Kumar
>>
>>
>>
>> ---
>>
>>
>>
>> @Override
>> public void cancel() {
>>     this.isRunning = false;   // volatile state variable, initially set to true on the class
>> }
>>
>> @Override
>> public void run(SourceFunction.SourceContext<OUT> ctx) {
>>     while (this.isRunning) {
>>         try {
>>             OUT out = /* Do some work */
>>             ctx.collect(out);
>>             Thread.sleep(this.sleepIntervalHours * 60 * 60 * 1000); // hours to milliseconds
>>         } catch (Throwable t) {
>>             // Simply swallow
>>         }
>>     }
>> }
>>
>>
>>
>
>
> --
> Best, Jingsong Lee
>


Re: MongoDB sink;

2020-05-08 Thread Robert Metzger
I'm also not aware of a MongoDB sink in Flink. The code provided by
Jingsong applies to the "SinkFunction" interface of Flink. That's a good
starting point to implement a custom Sink.
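
To make that a bit more concrete, here is a self-contained sketch of what such
a sink could look like, built around the invoke()/flush() fragments Jingsong
posted below. The class, constructor parameters and field names are made up
for illustration, and there is no checkpoint integration, so buffered documents
can be lost or duplicated on failure.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;
import org.bson.Document;

public class MongoDbSink extends RichSinkFunction<Row> {

    private final String uri;          // e.g. "mongodb://host:27017"
    private final String database;
    private final String collection;
    private final String[] fieldNames;
    private final int batchSize;

    private transient MongoClient client;
    private transient List<Document> batch;

    public MongoDbSink(String uri, String database, String collection,
                       String[] fieldNames, int batchSize) {
        this.uri = uri;
        this.database = database;
        this.collection = collection;
        this.fieldNames = fieldNames;
        this.batchSize = batchSize;
    }

    @Override
    public void open(Configuration parameters) {
        client = MongoClients.create(uri);
        batch = new ArrayList<>();
    }

    @Override
    public void invoke(Row value, Context context) {
        Map<String, Object> map = new HashMap<>();
        for (int i = 0; i < fieldNames.length; i++) {
            map.put(fieldNames[i], value.getField(i));
        }
        batch.add(new Document(map));
        if (batch.size() >= batchSize) {
            flush();
        }
    }

    private void flush() {
        if (batch.isEmpty()) {
            return;
        }
        client.getDatabase(database).getCollection(collection).insertMany(batch);
        batch.clear();
    }

    @Override
    public void close() {
        flush();                       // write out any remaining buffered documents
        if (client != null) {
            client.close();
        }
    }
}

For at-least-once behavior you would additionally implement CheckpointedFunction
and flush the buffered batch in snapshotState().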

On Wed, May 6, 2020 at 9:46 AM Jingsong Li  wrote:

> Hi,
>
> My impression is that MongoDB's API is not complicated. So you can
> implement a MongoDB sink. Something like:
>
> @Override
> public void invoke(Row value, Context context) throws Exception {
>     Map<String, Object> map = new HashMap<>();
>     for (int i = 0; i < fieldNames.length; i++) {
>         map.put(fieldNames[i], value.getField(i));
>     }
>     batch.add(new Document(map));
>     if (batch.size() >= conf.getBatchSize()) {
>         flush();
>     }
> }
>
> private void flush() {
>     if (batch.isEmpty()) {
>         return;
>     }
>     MongoDatabase mongoDatabase = client.getDatabase(conf.getDatabase());
>     MongoCollection<Document> mongoCollection =
>         mongoDatabase.getCollection(conf.getCollection());
>     mongoCollection.insertMany(batch);
>     batch.clear();
> }
>
> Best,
> Jingsong Lee
>
> On Wed, May 6, 2020 at 2:42 PM myflink <2644631...@qq.com> wrote:
>
>> my solution:
>> First, Flink sinks data to Kafka;
>> Second, MongoDB reads data from Kafka. Over.
>>
>>
>> -- Original Message --
>> From: "Aissa Elaffani";
>> Sent: Wednesday, May 6, 2020, 3:17 PM
>> To: "user";
>> Subject: MongoDB sink;
>>
>> Hello ,
>>  I want to sink my data to MongoDB but as far as I know there is no sink
>> connector to MongoDB.  How can I implement a MongoDB sink ? If there is any
>> other solutions, I hope you can share with me.
>>
>
>
> --
> Best, Jingsong Lee
>


Re: Flink on Kubernetes unable to Recover from failure

2020-05-08 Thread Robert Metzger
Hey Morgan,

Is it possible for you to provide us with the full logs of the JobManager
and the affected TaskManager?
This might give us a hint why the number of task slots is zero.

Best,
Robert


On Tue, May 5, 2020 at 11:41 AM Morgan Geldenhuys <
morgan.geldenh...@tu-berlin.de> wrote:

>
> Community,
>
> I am currently doing some fault tolerance testing for Flink (1.10) running
> on Kubernetes (1.18) and am encountering an error where after a running job
> experiences a failure, the job fails completely.
>
> A Flink session cluster has been created according to the documentation
> contained here:
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html.
> The job is then uploaded and deployed via the web interface and everything
> runs smoothly. The job has a parallelism of 24 with 3 worker nodes as fail
> overs in reserve. Each worker is assigned 1 task slot each (total of 27).
>
> The next step is to inject an error, for which I use the Pumba Chaos
> Testing tool (https://github.com/alexei-led/pumba) to pause a random
> worker process. This selection and pausing is done manually for the moment.
>
> Looking at the error logs, Flink does detect the error after the timeout
> (The heartbeat timeout has been set to 20 seconds):
>
> java.util.concurrent.TimeoutException: The heartbeat of TaskManager with
> id 768848f91ebdbccc8d518e910160414d  timed out.
>
> After the failure has been detected, the system resets to the latest saved
> checkpoint and restarts. The system catches up nicely and resumes normal
> processing... however, after about 3 minutes, the following error occurs:
>
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Connection unexpectedly closed by remote task manager '/10.45.128.1:6121'.
> This might indicate that the remote task manager was lost.
>
> The job fails, and is unable to restart because the number of task slots
> has been reduced to zero. Looking at the kubernetes cluster, all containers
> are running...
>
> Has anyone else run into this error? What am I missing? The same thing
> happens when the containers are deleted.
>
> Regards,
> M.
>
>
>
>
>
>
>
>


Re: flink-s3-fs-hadoop retry configuration

2020-05-08 Thread Robert Metzger
I validated my assumption. Putting

s3.connection.maximum: 123456

into the flink-conf.yaml file results in the following DEBUG log output:

2020-05-08 16:20:47,461 DEBUG
org.apache.flink.fs.s3hadoop.common.HadoopConfigLoader   [] - Adding
Flink config entry for s3.connection.maximum as fs.s3a.connection.maximum
to Hadoop config

I guess that is the recommended way of passing configuration into the S3
connectors of Flink.

You also asked how to detect retries: DEBUG-log level is helpful again. I
just tried connecting against an invalid port, and got these messages:

2020-05-08 16:26:37,671 DEBUG
org.apache.http.impl.conn.DefaultManagedHttpClientConnection [] -
http-outgoing-7: Shutdown connection
2020-05-08 16:26:37,671 DEBUG org.apache.http.impl.execchain.MainClientExec
   [] - Connection discarded
2020-05-08 16:26:37,671 DEBUG
org.apache.http.impl.conn.PoolingHttpClientConnectionManager [] -
Connection released: [id: 7][route: {}->http://127.0.0.1:9000][total kept
alive: 0; route allocated: 0 of 123456; total allocated: 0 of 123456]
2020-05-08 16:26:37,671 DEBUG com.amazonaws.request
   [] - Retrying Request: HEAD http://127.0.0.1:9000 /test/
Headers: (User-Agent: Hadoop 3.1.0, aws-sdk-java/1.11.271 Mac_OS_X/10.15.3
OpenJDK_64-Bit_Server_VM/25.252-b09 java/1.8.0_252 scala/2.11.12,
amz-sdk-invocation-id: 051f9877-1c22-00ed-ad26-8361bcf14b98, Content-Type:
application/octet-stream, )
2020-05-08 16:26:37,671 DEBUG com.amazonaws.http.AmazonHttpClient
   [] - Retriable error detected, will retry in 4226ms, attempt
number: 7


Maybe it makes sense to set the log level to DEBUG only for
"com.amazonaws.http.AmazonHttpClient".

How to configure the log level depends on the deployment method. Usually,
it's done by replacing the first INFO with DEBUG in conf/log4j.properties
("rootLogger.level = DEBUG").


Best,
Robert

On Fri, May 8, 2020 at 3:51 PM Robert Metzger  wrote:

> Hey Jeff,
>
> Which Flink version are you using?
> Have you tried configuring the S3 filesystem via Flink's  config yaml?
> Afaik all config parameters prefixed with "s3." are mirrored into the
> Hadoop file system connector.
>
>
> On Mon, May 4, 2020 at 8:45 PM Jeff Henrikson 
> wrote:
>
>>  > 2) How can I tell if flink-s3-fs-hadoop is actually managing to pick up
>>  > the hadoop configuration I have provided, as opposed to some separate
>>  > default configuration?
>>
>> I'm reading the docs and source of flink-fs-hadoop-shaded.  I see that
>> core-default-shaded.xml has fs.s3a.connection.maximum set to 15.  I have
>> around 20 different DataStreams being instantiated from S3, so if they
>> each require one connection to be healthy, then 15 is definitely not a
>> good value.
>>
>> However, I seem to be unable to override fs.s3a.connection.maximum using
>> my core-site.xml.  I am also unable to see the DEBUG level messages for
>> the shaded flink-fs-hadoop-shaded if I set log4j.rootLogger=DEBUG.
>>
>> So now I'm wondering:
>>
>>  1) Anybody know how to see DEBUG output for flink-fs-hadoop-shaded?
>>
>>  2) Am I going to end up rebuilding flink-fs-hadoop-shaded to
>>  override the config?
>>
>>
>> Thanks in advance,
>>
>>
>> Jeff Henrikson
>>
>>
>>
>>
>> https://github.com/apache/flink/tree/master/flink-filesystems/flink-fs-hadoop-shaded
>>
>>
>> https://github.com/apache/flink/blob/master/flink-filesystems/flink-fs-hadoop-shaded/src/main/resources/core-default-shaded.xml
>>
>> <property>
>>   <name>fs.s3a.connection.maximum</name>
>>   <value>15</value>
>>   <description>Controls the maximum number of simultaneous
>>   connections to S3.</description>
>> </property>
>>
>>
>>
>>
>> On 5/1/20 7:30 PM, Jeff Henrikson wrote:
>> > Hello Flink users,
>> >
>> > I could use help with three related questions:
>> >
>> > 1) How can I observe retries in the flink-s3-fs-hadoop connector?
>> >
>> > 2) How can I tell if flink-s3-fs-hadoop is actually managing to pick up
>> > the hadoop configuration I have provided, as opposed to some separate
>> > default configuration?  My job fails quickly when I read larger or more
>> > numerous objects from S3.  I conjecture the failure may be related to
>> > insufficient retries when S3 throttles.
>> >
>> > 3) What s3 fault recovery approach would you recommend?
>> >
>> > Background:
>> >
>> > I am having trouble with reliable operation of the flink-s3-fs-hadoop
>> > connector.   My application sources all its DataStream data from S3,
>> and
>> > appears to get frequently throttled by s3:
>> >
>> >  Caused by:
>> >  org.apache.flink.streaming.runtime.tasks.AsynchronousException:
>> >  Caught exception when processing split: [0]
>> >  s3://bucketname/1d/2020/04/15/00-00-00-customers.csv mod@
>> >  1586911084000 : 0 + 33554432
>> >  . . .
>> >  Caused by: java.io.InterruptedIOException: Failed to open
>> >  s3a://bucketname/1d/2020/04/15/00-00-00-customers.csv at 0 on
>> >  s3a://bucketname/1d/2020/04/15/00-00-00-customers.csv:
>> >  com.amazonaws.SdkClientException: Unable to execute HTTP r

Re: flink-s3-fs-hadoop retry configuration

2020-05-08 Thread Robert Metzger
Hey Jeff,

Which Flink version are you using?
Have you tried configuring the S3 filesystem via Flink's config yaml?
AFAIK, all config parameters prefixed with "s3." are mirrored into the
Hadoop file system connector.


On Mon, May 4, 2020 at 8:45 PM Jeff Henrikson  wrote:

>  > 2) How can I tell if flink-s3-fs-hadoop is actually managing to pick up
>  > the hadoop configuration I have provided, as opposed to some separate
>  > default configuration?
>
> I'm reading the docs and source of flink-fs-hadoop-shaded.  I see that
> core-default-shaded.xml has fs.s3a.connection.maximum set to 15.  I have
> around 20 different DataStreams being instantiated from S3, so if they
> each require one connection to be healthy, then 15 is definitely not a
> good value.
>
> However, I seem to be unable to override fs.s3a.connection.maximum using
> my core-site.xml.  I am also unable to see the DEBUG level messages for
> the shaded flink-fs-hadoop-shaded if I set log4j.rootLogger=DEBUG.
>
> So now I'm wondering:
>
>  1) Anybody know how to see DEBUG output for flink-fs-hadoop-shaded?
>
>  2) Am I going to end up rebuilding flink-fs-hadoop-shaded to
>  override the config?
>
>
> Thanks in advance,
>
>
> Jeff Henrikson
>
>
>
>
> https://github.com/apache/flink/tree/master/flink-filesystems/flink-fs-hadoop-shaded
>
>
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-fs-hadoop-shaded/src/main/resources/core-default-shaded.xml
>
> <property>
>   <name>fs.s3a.connection.maximum</name>
>   <value>15</value>
>   <description>Controls the maximum number of simultaneous
>   connections to S3.</description>
> </property>
>
>
>
>
> On 5/1/20 7:30 PM, Jeff Henrikson wrote:
> > Hello Flink users,
> >
> > I could use help with three related questions:
> >
> > 1) How can I observe retries in the flink-s3-fs-hadoop connector?
> >
> > 2) How can I tell if flink-s3-fs-hadoop is actually managing to pick up
> > the hadoop configuration I have provided, as opposed to some separate
> > default configuration?  My job fails quickly when I read larger or more
> > numerous objects from S3.  I conjecture the failure may be related to
> > insufficient retries when S3 throttles.
> >
> > 3) What s3 fault recovery approach would you recommend?
> >
> > Background:
> >
> > I am having trouble with reliable operation of the flink-s3-fs-hadoop
> > connector.   My application sources all its DataStream data from S3, and
> > appears to get frequently throttled by s3:
> >
> >  Caused by:
> >  org.apache.flink.streaming.runtime.tasks.AsynchronousException:
> >  Caught exception when processing split: [0]
> >  s3://bucketname/1d/2020/04/15/00-00-00-customers.csv mod@
> >  1586911084000 : 0 + 33554432
> >  . . .
> >  Caused by: java.io.InterruptedIOException: Failed to open
> >  s3a://bucketname/1d/2020/04/15/00-00-00-customers.csv at 0 on
> >  s3a://bucketname/1d/2020/04/15/00-00-00-customers.csv:
> >  com.amazonaws.SdkClientException: Unable to execute HTTP request:
> >  Timeout waiting for connection from pool
> >
> > The s3 throttling does not seem to trigger retries and so
> > causes the job to fail.  For troubleshooting purposes, the job stays up
> > for much longer if I reduce s3 inputs to my job by disabling
> functionality.
> >
> > I see in the documentation for hadoop-aws that there are properties
> > such as fs.s3.maxRetries fs.s3.sleepTimeSeconds for handling retries
> > within hadoop.
> >
> > After wrangling with some classpath troubles, I managed to get
> > flink-shaded-hadoop-2-uber-2.8.3-10.0.jar to parse a set of hadoop
> > configuration files {core/hdfs/mapred/yarn}-site.xml.  I can confirm
> > that the cluster parses the configuration by passing invalid xml and
> > seeing the cluster crash.
> >
> > The puzzle with which I am now faced is that the configuration for
> > retries and timeouts in core-site.xml seems to have no effect on the
> > application.
> >
> > I deploy in kubernetes with a custom docker image.  For now, I have
> > not enabled the zookeeper-based HA.
> >
> > See below for a frequent stacktrace that I interpret as likely to be
> > caused by s3 throttling.
> >
> > Thanks in advance for any help.
> >
> > Regards,
> >
> >
> > Jeff Henrikson
> >
> >
> >
> >  2020-04-30 19:35:24
> >  org.apache.flink.runtime.JobException: Recovery is suppressed by
> > NoRestartBackoffTimeStrategy
> >  at
> >
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
>
> >
> >  at
> >
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
>
> >
> >  at
> >
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
>
> >
> >  at
> >
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
>
> >
> >  at
> >
> org.apache.flink.runtime.scheduler.DefaultSchedule

Re: What's the best practice to determine whether a job has finished or not?

2020-05-08 Thread Kurt Young
+dev 

Best,
Kurt


On Fri, May 8, 2020 at 3:35 PM Caizhi Weng  wrote:

> Hi Jeff,
>
> Thanks for the response. However I'm using executeAsync so that I can run
> the job asynchronously and get a JobClient to monitor the job. JobListener
> only works for synchronous execute method. Is there other way to achieve
> this?
>
> Jeff Zhang wrote on Fri, May 8, 2020 at 3:29 PM:
>
>> I use JobListener#onJobExecuted to be notified that the flink job is
>> done.
>> It is pretty reliable for me, the only exception is the client process is
>> down.
>>
>> BTW, the reason you see ApplicationNotFound exception is that yarn app
>> is terminated which means the flink cluster is shutdown. While for
>> standalone mode, the flink cluster is always up.
>>
>>
>> Caizhi Weng wrote on Fri, May 8, 2020 at 2:47 PM:
>>
>>> Hi dear Flink community,
>>>
>>> I would like to determine whether a job has finished (no matter
>>> successfully or exceptionally) in my code.
>>>
>>> I used to think that JobClient#getJobStatus is a good idea, but I found
>>> that it behaves quite differently under different executing environments.
>>> For example, under a standalone session cluster it will return the FINISHED
>>> status for a finished job, while under a yarn per job cluster it will throw
>>> a ApplicationNotFound exception. I'm afraid that there might be other
>>> behaviors for other environments.
>>>
>>> So what's the best practice to determine whether a job has finished or
>>> not? Note that I'm not waiting for the job to finish. If the job hasn't
>>> finished I would like to know it and do something else.
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>


Re: java.lang.IllegalStateException: The RPC connection is already closed

2020-05-08 Thread Robert Metzger
Are you able to reproduce the issue reliably?
If so, it would be nice if you could test if the issue still happens with
the 1.10.1 release candidate:
https://dist.apache.org/repos/dist/dev/flink/flink-1.10.1-rc3/

On Mon, May 4, 2020 at 6:08 PM Manish G 
wrote:

> https://issues.apache.org/jira/browse/FLINK-16373
>
> On Mon, May 4, 2020 at 9:37 PM Manish G 
> wrote:
>
>> I found another similar issue:
>>
>>
>> On Mon, May 4, 2020 at 9:28 PM Steven Wu  wrote:
>>
>>> Manish, might be related to this bug, which is fixed in 1.10.1.
>>>
>>>
>>> https://issues.apache.org/jira/browse/FLINK-14316?focusedCommentId=16946580&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16946580
>>>
>>> On Mon, May 4, 2020 at 5:52 AM Manish G 
>>> wrote:
>>>
 Hi,

 I have set up flink and kafka locally. When I start my flink
 program(configured ot read messages from kafka topic), I get error as:

 2020-05-04 18:17:58.035  INFO 23516 --- [lt-dispatcher-2]
 o.a.f.r.taskexecutor.JobLeaderService: Successful registration at job
 manager akka://flink/user/jobmanager_1 for job
 4f1932f75aafb97028fdbf8cd165ee9d.
 2020-05-04 18:17:58.035  INFO 23516 --- [lt-dispatcher-4]
 o.a.f.r.taskexecutor.JobLeaderService: Successful registration at job
 manager akka://flink/user/jobmanager_1 for job
 4f1932f75aafb97028fdbf8cd165ee9d.
 2020-05-04 18:17:58.035  INFO 23516 --- [lt-dispatcher-4]
 o.a.f.runtime.taskexecutor.TaskExecutor  : Establish JobManager connection
 for job 4f1932f75aafb97028fdbf8cd165ee9d.
 2020-05-04 18:17:58.035  WARN 23516 --- [lt-dispatcher-5]
 o.a.f.r.h.n.e.EmbeddedLeaderService  : Error notifying leader listener
 about new leader

 java.lang.IllegalStateException: The RPC connection is already closed
 at
 org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
 ~[flink-core-1.7.1.jar:1.7.1]
 at
 org.apache.flink.runtime.registration.RegisteredRpcConnection.start(RegisteredRpcConnection.java:91)
 ~[flink-runtime_2.11-1.7.1.jar:1.7.1]

 What can be the root cause for this?

>>>


Re: Assertion failed: (last_ref), function ~ColumnFamilySet, file db/column_family.cc, line 1238

2020-05-08 Thread Seth Wiesman
Gordon is correct. Additionally, if you are using flink 1.10 you may be
running into a known bug that has been resolved in 1.10.1 which will be
released soon.

Seth

https://issues.apache.org/jira/browse/FLINK-16313


On Fri, May 8, 2020 at 5:19 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> The last time I saw this error, was that there was a mismatch in the used
> flink-state-processor-api version and other core Flink dependencies.
> Could you confirm that?
>
> Also, are you seeing this assertion error consistently, or only
> occasionally?
> cc'ing Seth, maybe he has other clues on the cause.
>
> Cheers,
> Gordon
>
> On Fri, May 8, 2020 at 3:06 PM luisfaamaral 
> wrote:
>
>> No one? :)
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: What is the RocksDB local directory in flink checkpointing?

2020-05-08 Thread Yun Tang
Hi LakeShen

You can refer to [1] and [2] to find out the temporary directory on YARN; the 
related log message would be
"Setting directories for temporary files to: " or "Overriding Fink's temporary 
file directories with those specified in the Flink config: "


[1] 
https://github.com/apache/flink/blob/0dda6fe9dff4f667b110cda39bfe9738ba615b24/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java#L103
[2] 
https://github.com/apache/flink/blob/0dda6fe9dff4f667b110cda39bfe9738ba615b24/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java#L478-L489

Best
Yun Tang

From: Till Rohrmann 
Sent: Wednesday, May 6, 2020 17:35
To: LakeShen 
Cc: dev ; user ; user-zh 

Subject: Re: What is the RocksDB local directory in flink checkpointing?

Hi LakeShen,

`state.backend.rocksdb.localdir` defines the directory in which RocksDB
will store its local files. Local files are RocksDB's SST and metadata
files for example. This directory does not need to be persisted. If the
config option is not configured, then it will use the node's temporary file
directory.

Cheers,
Till

On Wed, May 6, 2020 at 6:07 AM LakeShen  wrote:

> Hi community,
>
> Now I have a question about flink checkpoint local directory , our flink
> version is 1.6, job mode is
>
> flink on yarn per job . I saw the flink source code , and I find the flink
> checkpoint local directory is
>
> /tmp when you didn't config the "state.backend.rocksdb.localdir". But I go
> into the /tmp dir ,I
>
> couldn't find the flink checkpoint state local directory.
>
> What is the RocksDB local directory in flink checkpointing?  I am looking
> forward to your reply.
>
> Best,
> LakeShen
>


Re: Window processing in Stateful Functions

2020-05-08 Thread m@xi
Dear Igal,

Very insightful answer. Thanks.

Igal Shilman wrote
> An alternative approach would be to implement a tumbling window per vertex
> (a stateful function instance) by sending to itself a delayed message [2].
> When that specific delayed message arrives you would have to purge the
> oldest edges by examining the edges in state.

Indeed, the delayed asynchronous messages are a workaround for tumbling
window simulation in SF. I believe you assume that a message received by a
stateful function contains multiple edges, which can all be delayed by
a certain amount of time. Therefore, when a function receives a message, it
purges all of its existing edges and incorporates the new (delayed) ones.
Correct? Nevertheless, if you think about it, the delay is essentially the
window slide. Now, what about the window range?

Igal Shilman wrote
> Data stream windows are not yet supported in statefun, but it seems like
> the main motivation here is to purge old edges? If this is the case
> perhaps we need to integrate state TTL [1] into persisted
> values/persisted tables.

I was not aware of state TTL; it is very interesting and handy. Essentially,
the TTL can enforce the window range, i.e., attach to each tuple received
by a stateful function its lifespan/duration. So, the first TTL attribute
sets the range: StateTtlConfig.newBuilder(Time.seconds(windowRange)).
Therefore, by combining TTL and SF delayed messaging we can simulate
sliding window processing on a per-stateful-function basis. However,
TTL is a Flink construct and I am not sure if I got it correctly. You said:

Igal Shilman wrote
> If this is the case perhaps we need to integrate state TTL [1] into
> persisted values/persisted tables.

If this is the case, then I believe it would be great to integrate TTL into
Persisted Values/Tables.
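
For reference, the state TTL mentioned in [1] is configured in the Flink
DataStream API roughly as in the sketch below. This is against Flink's keyed
state API, not the StateFun SDK, and the 60-second range and the "edge-count"
state name are just placeholders.

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.seconds(60))                              // the "window range": lifespan of each entry
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // TTL restarts when the entry is written
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();

// Attach the TTL to a state descriptor; expired entries are then filtered
// out and eventually cleaned up by the state backend.
ValueStateDescriptor<Long> edgeCount =
        new ValueStateDescriptor<>("edge-count", Long.class);
edgeCount.enableTimeToLive(ttlConfig);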



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Assertion failed: (last_ref), function ~ColumnFamilySet, file db/column_family.cc, line 1238

2020-05-08 Thread Tzu-Li (Gordon) Tai
Hi,

The last time I saw this error, it was because of a mismatch between the
flink-state-processor-api version used and the other core Flink dependencies.
Could you confirm that?

Also, are you seeing this assertion error consistently, or only
occasionally?
cc'ing Seth, maybe he has other clues on the cause.

Cheers,
Gordon

On Fri, May 8, 2020 at 3:06 PM luisfaamaral 
wrote:

> No one? :)
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Shared Checkpoint Cleanup and S3 Lifecycle Policy

2020-05-08 Thread Congxian Qiu
Hi

Currently, it is hard to determine which files can be deleted safely in the
shared folder; the ground truth is in the checkpoint metafile. I've created
an issue [1] for such a feature.

[1] https://issues.apache.org/jira/browse/FLINK-17571
Best,
Congxian


Trystan wrote on Fri, May 8, 2020 at 1:05 PM:

> Aha, so incremental checkpointing *does* rely on infinitely-previous
> checkpoint state, regardless of the incremental retention number. The
> documentation wasn't entirely clear about this. One would assume that if
> you retain 3 checkpoints, anything older than the 3rd is irrelevant, but
> that's evidently not true. So it is never safe to delete any files in
> /shared, because we can't know which files belong to the current job (and
> may have lived on from checkpoint 1 even though we're on checkpoint 10 and
> only "retain" 3) and which ones have been abandoned altogether (due to a
> previous run of the job where we didn't restore state).
>
> This is really unfortunate - it can lead to a case where you accumulate a
> huge number of files in S3 and you can't know when ones to delete,
> especially if the job id remains the same (for job mode, they're all
> zeros). So this shared state lives on forever and there is no way to ever
> clean it up, at all. I am surprised that this hasn't been a problem for
> anyone else. Maybe I should just file a feature request for this, at least
> to find some solution for ways to clean up these directories.
>
> I appreciate your patience and help, thank you so much!
>
> Trystan
>
> On Thu, May 7, 2020 at 7:15 PM Congxian Qiu 
> wrote:
>
>> Hi
>>
>> Yes, there should only files used in checkpoint 8 and 9 and 10 in the
>> checkpoint file, but you can not delete the file which created older than 3
>> minutes(because checkpoint 8,9, 10 may reuse the file created in the
>> previous checkpoint, this is the how incremental checkpoint works[1])
>>
>> you can also check the directory of checkpoint files[2] for more
>> information, copied from the website here:
>> > The SHARED directory is for state that is possibly part of multiple
>> checkpoints, TASKOWNED is for state that must never be dropped by the
>> JobManager, and EXCLUSIVE is for state that belongs to one checkpoint
>> only.
>>
>> For the entropy injection, you can enable it as the documentation said,
>> it will replace the entropy_key with some random strings with the
>> specified length so that the files are not all in the same directory.
>>
>> [1]
>> https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/checkpoints.html#directory-structure
>> Best,
>> Congxian
>>
>>
>>> Trystan wrote on Thu, May 7, 2020 at 12:54 PM:
>>
>>> Thanks Congxian! To make sure I'm understanding correctly, if I retain 3
>>> incremental checkpoints (say every minute), and I've just completed
>>> checkpoint 10, then anything in shared is from checkpoint 8 and 9 only. So
>>> anything older than ~3 minutes can safely be deleted? The state from
>>> checkpoint 5 doesn't live on in the shared directory - at all?
>>>
>>> I ask because we have run into cases where we end up abandoning the
>>> state, and Flink does not clean up state from, say, a previous iteration of
>>> the job if you don't restore state. We need to remove these files
>>> automatically, but I want to be sure that I don't blow away older files in
>>> the shared dir from earlier, subsumed checkpoints - but you are saying that
>>> isn't possible, and that all subsumed checkpoints will have their /shared
>>> state rewritten or cleaned up as needed, correct?
>>>
>>> As for entropy, where would you suggest to use it? My understanding is
>>> that I don't control anything beyond the checkpoint directory, and since
>>> shared is in that directory I can't put entropy inside the shared directory
>>> itself (which is what I would need).
>>>
>>> Thanks,
>>> Trystan
>>>
>>> On Wed, May 6, 2020 at 7:31 PM Congxian Qiu 
>>> wrote:
>>>
 Hi
 For the rate limit, could you please try entropy injection[1].
 For checkpoint, Flink will handle the file lifecycle(it will delete the
 file if it will never be used in the future). The file in the checkpoint
 will be there if the corresponding checkpoint is still valid.

 [1]
 https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/filesystems/s3.html#entropy-injection-for-s3-file-systems
 Best,
 Congxian


 Trystan wrote on Thu, May 7, 2020 at 2:46 AM:

> Hello!
>
> Recently we ran into an issue when checkpointing to S3. Because S3
> ratelimits based on prefix, the /shared directory would get slammed and
> cause S3 throttling. There is no solution for this, because
> /job/checkpoint/:id/shared is all part of the prefix, and is limited to 
> 3,500
> PUT/COPY/POST/DELETE or 5,500 GET/HEAD requests per second per prefix.
>
> (source:
> https://docs.aws.amazon.com/AmazonS3/latest/dev/optimizing-perfor

Re: What's the best practice to determine whether a job has finished or not?

2020-05-08 Thread Caizhi Weng
Hi Jeff,

Thanks for the response. However, I'm using executeAsync so that I can run
the job asynchronously and get a JobClient to monitor the job. JobListener
only works for the synchronous execute method. Is there another way to
achieve this?
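
For reference, a minimal sketch of this executeAsync + JobClient polling
(placeholder pipeline and polling interval; as described above, on a YARN
per-job cluster the getJobStatus call may fail once the application has
terminated, and on older Flink versions JobStatus lives in a different
package):

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobStatusPoller {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).print();          // placeholder pipeline

        JobClient client = env.executeAsync("poll-status-demo");
        while (true) {
            JobStatus status = client.getJobStatus().get();
            if (status.isGloballyTerminalState()) { // FINISHED, FAILED or CANCELED
                System.out.println("Job ended in state: " + status);
                break;
            }
            Thread.sleep(500);                      // do other work here instead of sleeping
        }
    }
}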

Jeff Zhang wrote on Fri, May 8, 2020 at 3:29 PM:

> I use JobListener#onJobExecuted to be notified that the flink job is done.
> It is pretty reliable for me, the only exception is the client process is
> down.
>
> BTW, the reason you see ApplicationNotFound exception is that yarn app is
> terminated which means the flink cluster is shutdown. While for standalone
> mode, the flink cluster is always up.
>
>
> Caizhi Weng wrote on Fri, May 8, 2020 at 2:47 PM:
>
>> Hi dear Flink community,
>>
>> I would like to determine whether a job has finished (no matter
>> successfully or exceptionally) in my code.
>>
>> I used to think that JobClient#getJobStatus is a good idea, but I found
>> that it behaves quite differently under different executing environments.
>> For example, under a standalone session cluster it will return the FINISHED
>> status for a finished job, while under a yarn per job cluster it will throw
>> a ApplicationNotFound exception. I'm afraid that there might be other
>> behaviors for other environments.
>>
>> So what's the best practice to determine whether a job has finished or
>> not? Note that I'm not waiting for the job to finish. If the job hasn't
>> finished I would like to know it and do something else.
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: What's the best practice to determine whether a job has finished or not?

2020-05-08 Thread Jeff Zhang
I use JobListener#onJobExecuted to be notified that the Flink job is done.
It is pretty reliable for me; the only exception is when the client process
is down.

BTW, the reason you see the ApplicationNotFound exception is that the YARN
app is terminated, which means the Flink cluster is shut down, while in
standalone mode the Flink cluster is always up.
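
For reference, a minimal sketch of how such a listener is registered; the
listener has to be registered before calling execute(), and the logging is
just a placeholder:

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.registerJobListener(new JobListener() {
    @Override
    public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
        // called when submission succeeds (jobClient != null) or fails (throwable != null)
    }

    @Override
    public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
        if (throwable != null) {
            System.err.println("Job finished exceptionally: " + throwable);
        } else {
            System.out.println("Job finished, runtime ms: " + result.getNetRuntime());
        }
    }
});

// ... build the pipeline and call env.execute() as usual ...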


Caizhi Weng wrote on Fri, May 8, 2020 at 2:47 PM:

> Hi dear Flink community,
>
> I would like to determine whether a job has finished (no matter
> successfully or exceptionally) in my code.
>
> I used to think that JobClient#getJobStatus is a good idea, but I found
> that it behaves quite differently under different executing environments.
> For example, under a standalone session cluster it will return the FINISHED
> status for a finished job, while under a yarn per job cluster it will throw
> a ApplicationNotFound exception. I'm afraid that there might be other
> behaviors for other environments.
>
> So what's the best practice to determine whether a job has finished or
> not? Note that I'm not waiting for the job to finish. If the job hasn't
> finished I would like to know it and do something else.
>


-- 
Best Regards

Jeff Zhang


Re: Assertion failed: (last_ref), function ~ColumnFamilySet, file db/column_family.cc, line 1238

2020-05-08 Thread luisfaamaral
No one? :)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/