Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-08-05 Thread Xingbo Huang
Hi,

I found that the Spark community has also been working on redesigning the PySpark
documentation [1] recently. Maybe we can compare our document structure with
theirs.

[1] https://issues.apache.org/jira/browse/SPARK-31851
http://apache-spark-developers-list.1001551.n3.nabble.com/Need-some-help-and-contributions-in-PySpark-API-documentation-td29972.html

Best,
Xingbo

David Anderson wrote on Wed, Aug 5, 2020 at 3:17 AM:

> I'm delighted to see energy going into improving the documentation.
>
> With the current documentation, I get a lot of questions that I believe
> reflect two fundamental problems with what we currently provide:
>
> (1) We have a lot of contextual information in our heads about how Flink
> works, and we are able to use that knowledge to make reasonable inferences
> about how things (probably) work in cases we aren't so familiar with. For
> example, I get a lot of questions of the form "If I use <X> will
> I still have exactly once guarantees?" The answer is always yes, but they
> continue to have doubts because we have failed to clearly communicate this
> fundamental, underlying principle.
>
> This specific example about fault tolerance applies across all of the
> Flink docs, but the general idea can also be applied to the Table/SQL and
> PyFlink docs. The guiding principles underlying these APIs should be
> written down in one easy-to-find place.
>
> (2) The other kind of question I get a lot is "Can I do <X> with <Y>?"
> E.g., "Can I use the JDBC table sink from PyFlink?" These questions can be
> very difficult to answer because it is frequently the case that one has to
> reason about why a given feature doesn't seem to appear in the
> documentation. It could be that I'm looking in the wrong place, or it could
> be that someone forgot to document something, or it could be that it can in
> fact be done by applying a general mechanism in a specific way that I
> haven't thought of -- as in this case, where one can use a JDBC sink from
> Python if one thinks to use DDL.
>
> So I think it would be helpful to be explicit about both what is, and what
> is not, supported in PyFlink. And to have some very clear organizing
> principles in the documentation so that users can quickly learn where to
> look for specific facts.
>
> Regards,
> David
>
>
> On Tue, Aug 4, 2020 at 1:01 PM jincheng sun 
> wrote:
>
>> Hi Seth and David,
>>
>> I'm very happy to have your reply and suggestions. I would like to share
>> my thoughts here:
>>
>> The main motivation for refactoring the PyFlink docs is that we want
>> to make sure that Python users can find everything they need starting from
>> the PyFlink documentation main page. That is, the PyFlink documentation
>> should have a catalogue which includes all the functionality available in
>> PyFlink. However, this doesn't mean that we will make a copy of the content
>> of the documentation that lives in other places. It may be just a reference/link
>> to the other documentation if needed. For the documentation added under the
>> PyFlink main page, the principle is that it should only include Python
>> specific content, instead of making a copy of the Java content.
>>
>> >>  I'm concerned that this proposal duplicates a lot of content that
>> will quickly get out of sync. It feels like it is documenting PyFlink
>> separately from the rest of the project.
>>
>> Regarding the concerns about maintainability, as mentioned above, the
>> goal of this FLIP is to provide an intelligible entry point for the Python API,
>> and its content should only contain the information which is useful for
>> Python users. There are indeed many agenda items that duplicate the Java
>> documents in this FLIP, but that doesn't mean the content would be copied
>> from the Java documentation. That is, if the content of a document is the same as
>> the corresponding Java document, we will add a link to the Java document,
>> e.g. "Built-in functions" and "SQL". We only create a page for the
>> Python-only content, and then redirect to the Java document if there is
>> something shared with Java, e.g. "Connectors" and "Catalogs". If the
>> document is Python-only and already exists, we will move it from the old
>> Python documentation to the new one, e.g. "Configurations". If the
>> document is Python-only and does not exist yet, we will create a new page
>> for it, e.g. "DataTypes".
>>
>> The main reason we create a new page for Python Data Types is that it
>> only has a conceptual one-to-one correspondence with Java Data Types; the
>> actual document content would be very different from the Java DataTypes page.
>> Some detailed differences are as follows:
>>
>>
>>
>>   - The text in the Java Data Types document is written for JVM-based
>> language users, which is incomprehensible to users who only understand
>> Python.
>>
>>   - Currently the Python Data Types do not support the "bridgedTo"
>> method, DataTypes.RAW, DataTypes.NULL and User Defined Types.
>>
>>   - The section "Planner Compatibilit

Re: Behavior for flink job running on K8S failed after restart strategy exhausted

2020-08-05 Thread Yang Wang
Hi Eleanore,

Yes, I suggest using a K8s Job to replace the Deployment. It could be used to run
the jobmanager once and finish after a successful/failed completion.

However, using a Job still could not solve your problem completely. Just as
Till said, when a job exhausts the restart strategy, the jobmanager
pod will terminate with a non-zero exit code. This will cause K8s to
restart it again. Even though we could set the restartPolicy and
backoffLimit,
this is not a clean and correct way to go. We should terminate the
jobmanager process with a zero exit code in such a situation.

@Till Rohrmann  I just have one concern. Is it a
special case for the K8s deployment? For standalone/Yarn/Mesos, it seems that
terminating with
a non-zero exit code is harmless.


Best,
Yang

Eleanore Jin wrote on Tue, Aug 4, 2020 at 11:54 PM:

> Hi Yang & Till,
>
> Thanks for your prompt reply!
>
> Yang, regarding your question, I am actually not using k8s job, as I put
> my app.jar and its dependencies under flink's lib directory. I have 1 k8s
> deployment for job manager, and 1 k8s deployment for task manager, and 1
> k8s service for job manager.
>
> As you mentioned above, if flink job is marked as failed, it will cause
> the job manager pod to be restarted. Which is not the ideal behavior.
>
> Do you suggest that I should change the deployment strategy from using k8s
> deployment to k8s job? In case the flink program exit with non-zero code
> (e.g. exhausted number of configured restart), pod can be marked as
> complete hence not restarting the job again?
>
> Thanks a lot!
> Eleanore
>
> On Tue, Aug 4, 2020 at 2:49 AM Yang Wang  wrote:
>
>> @Till Rohrmann  In native mode, when a Flink
>> application terminates with FAILED state, all the resources will be cleaned
>> up.
>>
>> However, in standalone mode, I agree with you that we need to rethink the
>> exit code of Flink. When a job exhausts the restart
>> strategy, we should terminate the pod and do not restart again. After
>> googling, it seems that we could not specify the restartPolicy
>> based on exit code[1]. So maybe we need to return a zero exit code to
>> avoid restarting by K8s.
>>
>> [1].
>> https://stackoverflow.com/questions/48797297/is-it-possible-to-define-restartpolicy-based-on-container-exit-code
>>
>> Best,
>> Yang
>>
Till Rohrmann wrote on Tue, Aug 4, 2020 at 3:48 PM:
>>
>>> @Yang Wang  I believe that we should rethink the
>>> exit codes of Flink. In general you want K8s to restart a failed Flink
>>> process. Hence, an application which terminates in state FAILED should not
>>> return a non-zero exit code because it is a valid termination state.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Aug 4, 2020 at 8:55 AM Yang Wang  wrote:
>>>
 Hi Eleanore,

 I think you are using K8s resource "Job" to deploy the jobmanager.
 Please set .spec.template.spec.restartPolicy = "Never" and
 spec.backoffLimit = 0.
 Refer here[1] for more information.

 Then, when the jobmanager failed because of any reason, the K8s job
 will be marked failed. And K8s will not restart the job again.

 [1].
 https://kubernetes.io/docs/concepts/workloads/controllers/job/#job-termination-and-cleanup
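For reference, here is a minimal sketch of such a Job manifest. Only restartPolicy and backoffLimit come from the advice above; the name, image and args are illustrative assumptions that would need to be adapted to the actual setup.

```
# Minimal sketch of a K8s Job wrapping the Flink jobmanager (illustrative values).
apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  backoffLimit: 0                # do not retry the pod after a failure
  template:
    spec:
      restartPolicy: Never       # as suggested above: never restart the container
      containers:
        - name: jobmanager
          image: flink:1.8.2     # assumption: the same image used for the deployments
          args: ["jobmanager"]
```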


 Best,
 Yang

Eleanore Jin wrote on Tue, Aug 4, 2020 at 12:05 AM:

> Hi Till,
>
> Thanks for the reply!
>
> I manually deploy as per-job mode [1] and I am using Flink 1.8.2.
> Specifically, I build a custom docker image, which I copied the app jar
> (not uber jar) and all its dependencies under /flink/lib.
>
> So my question is more like, in this case, if the job is marked as
> FAILED, which causes k8s to restart the pod, this seems not help at all,
> what are the suggestions for such scenario?
>
> Thanks a lot!
> Eleanore
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/kubernetes.html#flink-job-cluster-on-kubernetes
>
> On Mon, Aug 3, 2020 at 2:13 AM Till Rohrmann 
> wrote:
>
>> Hi Eleanore,
>>
>> how are you deploying Flink exactly? Are you using the application
>> mode with native K8s support to deploy a cluster [1] or are you manually
>> deploying a per-job mode [2]?
>>
>> I believe the problem might be that we terminate the Flink process
>> with a non-zero exit code if the job reaches the ApplicationStatus.FAILED
>> [3].
>>
>> cc Yang Wang have you observed a similar behavior when running Flink
>> in per-job mode on K8s?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#flink-kubernetes-application
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#job-cluster-resource-definitions
>> [3]
>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java#L32
>>
>> On Fri, Jul 31, 2020 at 6:26 PM Ele

Re: Behavior for flink job running on K8S failed after restart strategy exhausted

2020-08-05 Thread Till Rohrmann
Yes, for the other deployments it is not a problem. A reason why people
preferred non-zero exit codes in case of FAILED jobs is that this is easier
to monitor than having to take a look at the actual job result. Moreover,
in the YARN web UI the application shows as failed if I am not mistaken.
However, from a framework's perspective, a FAILED job does not mean that
Flink has failed and, hence, the return code could still be 0 in my opinion.

Cheers,
Till

On Wed, Aug 5, 2020 at 9:30 AM Yang Wang  wrote:

> Hi Eleanore,
>
> Yes, I suggest to use Job to replace Deployment. It could be used to run
> jobmanager one time and finish after a successful/failed completion.
>
> However, using Job still could not solve your problem completely. Just as
> Till said, When a job exhausts the restart strategy, the jobmanager
> pod will terminate with non-zero exit code. It will cause the K8s
> restarting it again. Even though we could set the restartPolicy and
> backoffLimit,
> this is not a clean and correct way to go. We should terminate the
> jobmanager process with zero exit code in such situation.
>
> @Till Rohrmann  I just have one concern. Is it a
> special case for K8s deployment? For standalone/Yarn/Mesos, it seems that
> terminating with
> non-zero exit code is harmless.
>
>
> Best,
> Yang
>
> Eleanore Jin wrote on Tue, Aug 4, 2020 at 11:54 PM:
>
>> Hi Yang & Till,
>>
>> Thanks for your prompt reply!
>>
>> Yang, regarding your question, I am actually not using k8s job, as I put
>> my app.jar and its dependencies under flink's lib directory. I have 1 k8s
>> deployment for job manager, and 1 k8s deployment for task manager, and 1
>> k8s service for job manager.
>>
>> As you mentioned above, if flink job is marked as failed, it will cause
>> the job manager pod to be restarted. Which is not the ideal behavior.
>>
>> Do you suggest that I should change the deployment strategy from using
>> k8s deployment to k8s job? In case the flink program exit with non-zero
>> code (e.g. exhausted number of configured restart), pod can be marked as
>> complete hence not restarting the job again?
>>
>> Thanks a lot!
>> Eleanore
>>
>> On Tue, Aug 4, 2020 at 2:49 AM Yang Wang  wrote:
>>
>>> @Till Rohrmann  In native mode, when a Flink
>>> application terminates with FAILED state, all the resources will be cleaned
>>> up.
>>>
>>> However, in standalone mode, I agree with you that we need to rethink
>>> the exit code of Flink. When a job exhausts the restart
>>> strategy, we should terminate the pod and do not restart again. After
>>> googling, it seems that we could not specify the restartPolicy
>>> based on exit code[1]. So maybe we need to return a zero exit code to
>>> avoid restarting by K8s.
>>>
>>> [1].
>>> https://stackoverflow.com/questions/48797297/is-it-possible-to-define-restartpolicy-based-on-container-exit-code
>>>
>>> Best,
>>> Yang
>>>
 Till Rohrmann wrote on Tue, Aug 4, 2020 at 3:48 PM:
>>>
 @Yang Wang  I believe that we should
 rethink the exit codes of Flink. In general you want K8s to restart a
 failed Flink process. Hence, an application which terminates in state
 FAILED should not return a non-zero exit code because it is a valid
 termination state.

 Cheers,
 Till

 On Tue, Aug 4, 2020 at 8:55 AM Yang Wang  wrote:

> Hi Eleanore,
>
> I think you are using K8s resource "Job" to deploy the jobmanager.
> Please set .spec.template.spec.restartPolicy = "Never" and
> spec.backoffLimit = 0.
> Refer here[1] for more information.
>
> Then, when the jobmanager failed because of any reason, the K8s job
> will be marked failed. And K8s will not restart the job again.
>
> [1].
> https://kubernetes.io/docs/concepts/workloads/controllers/job/#job-termination-and-cleanup
>
>
> Best,
> Yang
>
> Eleanore Jin wrote on Tue, Aug 4, 2020 at 12:05 AM:
>
>> Hi Till,
>>
>> Thanks for the reply!
>>
>> I manually deploy as per-job mode [1] and I am using Flink 1.8.2.
>> Specifically, I build a custom docker image, which I copied the app jar
>> (not uber jar) and all its dependencies under /flink/lib.
>>
>> So my question is more like, in this case, if the job is marked as
>> FAILED, which causes k8s to restart the pod, this seems not help at all,
>> what are the suggestions for such scenario?
>>
>> Thanks a lot!
>> Eleanore
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/kubernetes.html#flink-job-cluster-on-kubernetes
>>
>> On Mon, Aug 3, 2020 at 2:13 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Eleanore,
>>>
>>> how are you deploying Flink exactly? Are you using the application
>>> mode with native K8s support to deploy a cluster [1] or are you manually
>>> deploying a per-job mode [2]?
>>>
>>> I believe the problem might be that we terminate the Flink process
>>> with a non-zero exit code if the job reaches the 

The bytecode of the class does not match the source code

2020-08-05 Thread 魏子涵
Hi, everyone:
  I found that the 【org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl】
class in 【flink-runtime_2.11-1.11.1.jar】 does not match the source code. Is it a
problem we need to fix (if it is, what should we do)? Or should we just let it go?

Re: Kafka transaction error lead to data loss under end to end exact-once

2020-08-05 Thread Khachatryan Roman
Hi Lu,

AFAIK, it's not going to be fixed. As you mentioned in the first email,
Kafka should be configured so that its transaction timeout is larger than
your max checkpoint duration.

However, you should not only change transaction.timeout.ms in the producer but
also transaction.max.timeout.ms on your brokers.
Please refer to
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#caveats
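A minimal sketch of the producer side (Flink 1.9-style universal Kafka connector; exact constructor variants differ between Flink versions). The topic, bootstrap servers and the 15-minute value are illustrative assumptions, and transaction.timeout.ms must stay at or below transaction.max.timeout.ms on the brokers:

```
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class ExactlyOnceProducerSketch {
    public static FlinkKafkaProducer<String> createProducer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");
        // Large enough to cover checkpoint interval + downtime + restart time,
        // but not larger than transaction.max.timeout.ms on the brokers.
        props.setProperty("transaction.timeout.ms", String.valueOf(15 * 60 * 1000));

        return new FlinkKafkaProducer<>(
                "output-topic",                                          // assumed topic name
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                props,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }
}
```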

Regards,
Roman


On Wed, Aug 5, 2020 at 12:24 AM Lu Niu  wrote:

> Hi, Khachatryan
>
> Thank you for the reply. Is that a problem that can be fixed? If so, is
> the fix on roadmap? Thanks!
>
> Best
> Lu
>
> On Tue, Aug 4, 2020 at 1:24 PM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi Lu,
>>
>> Yes, this error indicates data loss (unless there were no records in the
>> transactions).
>>
>> Regards,
>> Roman
>>
>>
>> On Mon, Aug 3, 2020 at 9:14 PM Lu Niu  wrote:
>>
>>> Hi,
>>>
>>> We are using end-to-end exactly-once Flink + Kafka and
>>> encountered the below exception, which usually came after checkpoint failures:
>>> ```
>>> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
>>>
>>> 2020-07-28 16:27:51,633 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job xxx (f08fc4b1edceb3705e2cb134a8ece73d) switched from state RUNNING to FAILING.
>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1219)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:295)
>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:842)
>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1214)
>>>     ... 5 more
>>> ```
>>> We did some end-to-end tests and noticed that whenever such a thing happens,
>>> there is data loss.
>>>
>>> Referring to several related questions, I understand I need to increase `
>>> transaction.timeout.ms`  because:
>>> ```
>>> *Semantic.EXACTLY_ONCE mode relies on the ability to commit transactions
>>> that were started before taking a checkpoint, after recovering from the
>>> said checkpoint. If the time between Flink application crash and completed
>>> restart is larger than Kafka’s transaction timeout there will be data loss
>>> (Kafka will automatically abort transactions that exceeded timeout time).*
>>> ```
>>>
>>> But I want to confirm with the community that:
>>> Does an exception like this always lead to data loss?
>>>
>>> I asked because we get this exception sometimes even when the checkpoint
>>> succeeds.
>>>
>>> Setup:
>>> Flink 1.9.1
>>>
>>> Best
>>> Lu
>>>
>>


Re: The bytecode of the class does not match the source code

2020-08-05 Thread Chesnay Schepler
Please make sure you have loaded the correct source jar, and aren't by 
chance still using the 1.11.0 source jar.


On 05/08/2020 09:57, 魏子涵 wrote:

Hi, everyone:
      I found  the 
【org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl】 class 
in【flink-runtime_2.11-1.11.1.jar】does not match the source code. Is it 
a problem we need to fix(if it is, what should we do)? or just let it go?

Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-08-05 Thread Wei Zhong
Hi Xingbo,

Thanks for the information.

I think the PySpark documentation redesign deserves our attention. It
seems that the Spark community has also begun to take the user experience of
the Python documentation more seriously. We can continue to follow the
discussion and progress of the redesign in the Spark community. It is so
similar to our work that there should be some ideas worth borrowing.

Best,
Wei


> On Aug 5, 2020, at 15:02, Xingbo Huang wrote:
> 
> Hi,
> 
> I found that the spark community is also working on redesigning pyspark 
> documentation[1] recently. Maybe we can compare the difference between our 
> document structure and its document structure.
> 
> [1] https://issues.apache.org/jira/browse/SPARK-31851 
> 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Need-some-help-and-contributions-in-PySpark-API-documentation-td29972.html
>  
> 
> 
> Best,
> Xingbo
> 
David Anderson <da...@alpinegizmo.com>
> wrote on Wed, Aug 5, 2020 at 3:17 AM:
> I'm delighted to see energy going into improving the documentation.
> 
> With the current documentation, I get a lot of questions that I believe 
> reflect two fundamental problems with what we currently provide:
> 
> (1) We have a lot of contextual information in our heads about how Flink 
> works, and we are able to use that knowledge to make reasonable inferences 
> about how things (probably) work in cases we aren't so familiar with. For 
> example, I get a lot of questions of the form "If I use <X> will I
> still have exactly once guarantees?" The answer is always yes, but they
> continue to have doubts because we have failed to clearly communicate this 
> fundamental, underlying principle. 
> 
> This specific example about fault tolerance applies across all of the Flink 
> docs, but the general idea can also be applied to the Table/SQL and PyFlink 
> docs. The guiding principles underlying these APIs should be written down in 
> one easy-to-find place. 
> 
> (2) The other kind of question I get a lot is "Can I do <X> with <Y>?" E.g.,
> "Can I use the JDBC table sink from PyFlink?" These questions can be very 
> difficult to answer because it is frequently the case that one has to reason 
> about why a given feature doesn't seem to appear in the documentation. It 
> could be that I'm looking in the wrong place, or it could be that someone 
> forgot to document something, or it could be that it can in fact be done by 
> applying a general mechanism in a specific way that I haven't thought of -- 
> as in this case, where one can use a JDBC sink from Python if one thinks to 
> use DDL. 
> 
> So I think it would be helpful to be explicit about both what is, and what is 
> not, supported in PyFlink. And to have some very clear organizing principles 
> in the documentation so that users can quickly learn where to look for 
> specific facts.
> 
> Regards,
> David
> 
> 
> On Tue, Aug 4, 2020 at 1:01 PM jincheng sun wrote:
> Hi Seth and David,
> 
> I'm very happy to have your reply and suggestions. I would like to share my 
> thoughts here:
> 
> The main motivation we want to refactor the PyFlink doc is that we want to 
> make sure that the Python users could find all they want starting from the 
> PyFlink documentation mainpage. That’s, the PyFlink documentation should have 
> a catalogue which includes all the functionalities available in PyFlink. 
> However, this doesn’t mean that we will make a copy of the content of the 
> documentation in the other places. It may be just a reference/link to the 
> other documentation if needed. For the documentation added under PyFlink 
> mainpage, the principle is that it should only include Python specific 
> content, instead of making a copy of the Java content.
> 
> >>  I'm concerned that this proposal duplicates a lot of content that will 
> >> quickly get out of sync. It feels like it is documenting PyFlink 
> >> separately from the rest of the project.
> 
> Regarding the concerns about maintainability, as mentioned above, The goal of 
> this FLIP is to provide an intelligible entrance of Python API, and the 
> content in it should only contain the information which is useful for Python 
> users. There are indeed many agenda items that duplicate the Java documents 
> in this FLIP, but it doesn't mean the content would be copied from Java 
> documentation. i.e, if the content of the document is the same as the 
> corresponding Java document, we will add a link to the Java document. e.g. 
> the "Built-in functions" and "SQL". We only create a page for the Python-only 
> content, and then redirect to the Java document if there is something shared 
> with Java. e.g. "Connectors" and "Catalogs". If the document is Python-only 
> and already exists, we will move it from the old python do

Re:Re: The bytecode of the class does not match the source code

2020-08-05 Thread 魏子涵
I'm sure the two versions match up. The following is a picture comparing the code in IDEA:
https://img-blog.csdnimg.cn/20200805180232929.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0NTRE5yaG1t,size_16,color_FF,t_70

At 2020-08-05 16:46:11, "Chesnay Schepler"  wrote:

Please make sure you have loaded the correct source jar, and aren't by chance 
still using the 1.11.0 source jar.



On 05/08/2020 09:57, 魏子涵 wrote:

Hi, everyone:
  I found  the 【org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl】 
class in【flink-runtime_2.11-1.11.1.jar】does not match the source code. Is it a 
problem we need to fix(if it is, what should we do)? or just let it go?

Re: The bytecode of the class does not match the source code

2020-08-05 Thread Jake

Hi 魏子涵,

IDEA's decompiled code does not match the Java source code; you can download the
Java source code in IDEA.

/Volumes/work/maven_repository/org/apache/flink/flink-runtime_2.11/1.10.1/flink-runtime_2.11-1.10.1-sources.jar!/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java

Jake


> On Aug 5, 2020, at 6:20 PM, 魏子涵  wrote:
> 
> I'm sure the two versions match up. Following is the pic comparing codes in 
> IDEA
> https://img-blog.csdnimg.cn/20200805180232929.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0NTRE5yaG1t,size_16,color_FF,t_70
> 
> 
> 
> 
> 
> At 2020-08-05 16:46:11, "Chesnay Schepler"  wrote:
> 
> Please make sure you have loaded the correct source jar, and aren't by chance 
> still using the 1.11.0 source jar.
> 
> On 05/08/2020 09:57, 魏子涵 wrote:
>> Hi, everyone:
>>   I found  the 
>> 【org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl】 class 
>> in【flink-runtime_2.11-1.11.1.jar】does not match the source code. Is it a 
>> problem we need to fix(if it is, what should we do)? or just let it go?
>> 
>> 
>>  
> 
> 
> 
>  



Re: The bytecode of the class does not match the source code

2020-08-05 Thread Chesnay Schepler
Well of course these differ; on the left you have the decompiled 
bytecode, on the right the original source.


If these were the same you wouldn't need source jars.

On 05/08/2020 12:20, 魏子涵 wrote:
I'm sure the two versions match up. Following is the pic comparing 
codes in IDEA

https://img-blog.csdnimg.cn/20200805180232929.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0NTRE5yaG1t,size_16,color_FF,t_70






At 2020-08-05 16:46:11, "Chesnay Schepler"  wrote:

Please make sure you have loaded the correct source jar, and
aren't by chance still using the 1.11.0 source jar.

On 05/08/2020 09:57, 魏子涵 wrote:

Hi, everyone:
      I found  the
【org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl】 class
in【flink-runtime_2.11-1.11.1.jar】does not match the source code.
Is it a problem we need to fix(if it is, what should we do)? or
just let it go?

Re: Two Queries and a Kafka Topic

2020-08-05 Thread Theo Diefenthal
Hi Marco, 

In general, I see three solutions you could approach here:

1. Use the State Processor API: You can run a program with the State Processor API
that loads the data from JDBC and stores it into a Flink savepoint. Afterwards,
you start your streaming job from that savepoint, which will load its state and
thus already contain all the data queried from JDBC.
2. Load from the master, distribute with the job: When you build up your job graph,
you could execute the JDBC queries and put the result into some Serializable
class which in turn you plug into an operator in your stream (e.g. a map
stage). The class along with all the queried data will be serialized and
deserialized on the taskmanagers (usually I use this for configuration
parameters, but it might be ok in this case as well).
3. Load from the TaskManager: In your map function, when the very first event is
received, you can block processing and synchronously load the data from JDBC
(so each TaskManager performs the JDBC query itself). You then keep the data to
be used for all subsequent map steps.

I think option 3 is the easiest to implement (a rough sketch follows below), while
option 1 might be the most elegant way, in my opinion.
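A rough sketch of option 3, assuming Flink 1.10, a PostgreSQL JDBC driver on the classpath, and illustrative connection settings, table and column names:

```
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class EnrichWithLookupData extends RichMapFunction<String, String> {

    private transient Map<String, String> lookup;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Runs once per parallel instance before the first record is processed,
        // i.e. each task performs the JDBC query itself (option 3 above).
        lookup = new HashMap<>();
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:postgresql://db:5432/app", "user", "secret");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT k, v FROM lookup_table")) {
            while (rs.next()) {
                lookup.put(rs.getString("k"), rs.getString("v"));
            }
        }
    }

    @Override
    public String map(String value) {
        // All subsequent records can use the pre-loaded data.
        return value + "," + lookup.getOrDefault(value, "unknown");
    }
}
```

If the lookup data were large or needed to survive restarts, option 1 or broadcast state would be the more robust choice.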

Best regards 
Theo 


Von: "Marco Villalobos"  
An: "Leonard Xu"  
CC: "user"  
Gesendet: Mittwoch, 5. August 2020 04:33:23 
Betreff: Re: Two Queries and a Kafka Topic 

Hi Leonard, 

First, Thank you. 

I am currently trying to restrict my solution to Apache Flink 1.10 because it's
the current version supported by Amazon EMR.
I am not ready to change our operational environment to solve this.

Second, I am using the DataStream API. The Kafka Topic is not in a table, it is 
in a DataStream. 

The SQL queries are literally from a PostgreSQL database, and only need to be
run exactly once in the lifetime of the job.

I am struggling to determine where this happens. 

JDBCInputFormat seems to query the SQL table repetitively, and also connecting 
streams and aggregating into one object is very complicated. 

Thus, I am wondering what is the right approach. 

Let me restate the parameters. 

SQL Query One = data in PostgreSQL (200K records) that is used for business 
logic. 
SQL Query Two = data in PostgreSQL (1000 records) that is used for business 
logic. 
Kafka Topic One = unlimited data-stream that uses the data-stream api and 
queries above to write into multiple sinks 

ASCII diagram:

[ SQL Query One] > [Aggregate to Map] 

Kafka > [Kafka Topic One] --- [Keyed Process Function (Query One Map, Query 
Two Map)] ---<[Multiple Sinks] 

[ SQL Query Two] > [Aggregate to Map] 


Maybe my graph above helps. You see, I need Query One and Query Two to only ever
execute once. After that, the information they provide is used to correctly
process the Kafka Topic.

I'll take a deeper look to try and understand what you said, thank you, but
JDBCInputFormat seems to query the database repetitively. Maybe I need to write
a RichFunction or AsyncIO function and cache the results in state after that.






On Aug 4, 2020, at 6:25 PM, Leonard Xu <xbjt...@gmail.com> wrote:

Hi, Marco 



If I need SQL Query One and SQL Query Two to happen just one time, 



Looks like you want to reuse this Kafka table in one job. It's supported to
execute multiple queries in one SQL job in Flink 1.11.
You can use `StatementSet` [1] to add SQL Query One and SQL Query Two to a
single SQL job [1].


Best 
Leonard 
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#run-an-insert-statement
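A short sketch of this suggestion (Flink 1.11); the table names and SELECT statements are illustrative assumptions, and the tables are assumed to have been registered via DDL beforehand:

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class TwoQueriesOneJob {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // DDL for kafka_topic_one, sink_one and sink_two is assumed to have been executed here.

        StatementSet stmtSet = tEnv.createStatementSet();
        stmtSet.addInsertSql("INSERT INTO sink_one SELECT id, v FROM kafka_topic_one WHERE v > 0");
        stmtSet.addInsertSql("INSERT INTO sink_two SELECT id, COUNT(*) FROM kafka_topic_one GROUP BY id");
        stmtSet.execute();  // submits all added INSERTs together as a single job
    }
}
```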




On Aug 5, 2020, at 04:34, Marco Villalobos <mvillalo...@kineteque.com> wrote:

Let's say that I have:

SQL Query One from data in PostgreSQL (200K records). 
SQL Query Two from data in PostgreSQL (1000 records). 
and Kafka Topic One. 

Let's also say that main data from this Flink job arrives in Kafka Topic One. 

If I need SQL Query One and SQL Query Two to happen just one time, when the job
starts up, and afterwards maybe store the results in Keyed State or Broadcast State,
but they're not really part of the stream, then what is the best practice for
supporting that in Flink?

The Flink job needs to stream data from Kafka Topic One, aggregate it, and 
perform computations that require all of the data in SQL Query One and SQL 
Query Two to perform its business logic. 

I am using Flink 1.10. 

Am I supposed to query the database before the job is submitted, and then pass
the results on as parameters to a function?
Or am I supposed to use JDBCInputFormat for both queries and create two
streams, and somehow connect or broadcast both of them to the main stream that
uses Kafka Topic One?

I would appreciate guidance. Please. Thank you. 

Sincerely, 

Marco A. Villalobos 




Re: Behavior for flink job running on K8S failed after restart strategy exhausted

2020-08-05 Thread Yang Wang
Actually, the application status shown in the YARN web UI is not determined by
the jobmanager process exit code.
Instead, we use "resourceManagerClient.unregisterApplicationMaster" to
control the final status of the YARN application.
So although the jobmanager exits with a zero code, it could still show a failed
status in the YARN web UI.

I have created a ticket to track this improvement[1].

[1]. https://issues.apache.org/jira/browse/FLINK-18828


Best,
Yang


Till Rohrmann wrote on Wed, Aug 5, 2020 at 3:56 PM:

> Yes for the other deployments it is not a problem. A reason why people
> preferred non-zero exit codes in case of FAILED jobs is that this is easier
> to monitor than having to take a look at the actual job result. Moreover,
> in the YARN web UI the application shows as failed if I am not mistaken.
> However, from a framework's perspective, a FAILED job does not mean that
> Flink has failed and, hence, the return code could still be 0 in my opinion.
>
> Cheers,
> Till
>
> On Wed, Aug 5, 2020 at 9:30 AM Yang Wang  wrote:
>
>> Hi Eleanore,
>>
>> Yes, I suggest to use Job to replace Deployment. It could be used to run
>> jobmanager one time and finish after a successful/failed completion.
>>
>> However, using Job still could not solve your problem completely. Just as
>> Till said, When a job exhausts the restart strategy, the jobmanager
>> pod will terminate with non-zero exit code. It will cause the K8s
>> restarting it again. Even though we could set the restartPolicy and
>> backoffLimit,
>> this is not a clean and correct way to go. We should terminate the
>> jobmanager process with zero exit code in such situation.
>>
>> @Till Rohrmann  I just have one concern. Is it a
>> special case for K8s deployment? For standalone/Yarn/Mesos, it seems that
>> terminating with
>> non-zero exit code is harmless.
>>
>>
>> Best,
>> Yang
>>
>> Eleanore Jin wrote on Tue, Aug 4, 2020 at 11:54 PM:
>>
>>> Hi Yang & Till,
>>>
>>> Thanks for your prompt reply!
>>>
>>> Yang, regarding your question, I am actually not using k8s job, as I put
>>> my app.jar and its dependencies under flink's lib directory. I have 1 k8s
>>> deployment for job manager, and 1 k8s deployment for task manager, and 1
>>> k8s service for job manager.
>>>
>>> As you mentioned above, if flink job is marked as failed, it will cause
>>> the job manager pod to be restarted. Which is not the ideal behavior.
>>>
>>> Do you suggest that I should change the deployment strategy from using
>>> k8s deployment to k8s job? In case the flink program exit with non-zero
>>> code (e.g. exhausted number of configured restart), pod can be marked as
>>> complete hence not restarting the job again?
>>>
>>> Thanks a lot!
>>> Eleanore
>>>
>>> On Tue, Aug 4, 2020 at 2:49 AM Yang Wang  wrote:
>>>
 @Till Rohrmann  In native mode, when a Flink
 application terminates with FAILED state, all the resources will be cleaned
 up.

 However, in standalone mode, I agree with you that we need to rethink
 the exit code of Flink. When a job exhausts the restart
 strategy, we should terminate the pod and do not restart again. After
 googling, it seems that we could not specify the restartPolicy
 based on exit code[1]. So maybe we need to return a zero exit code to
 avoid restarting by K8s.

 [1].
 https://stackoverflow.com/questions/48797297/is-it-possible-to-define-restartpolicy-based-on-container-exit-code

 Best,
 Yang

 Till Rohrmann wrote on Tue, Aug 4, 2020 at 3:48 PM:

> @Yang Wang  I believe that we should
> rethink the exit codes of Flink. In general you want K8s to restart a
> failed Flink process. Hence, an application which terminates in state
> FAILED should not return a non-zero exit code because it is a valid
> termination state.
>
> Cheers,
> Till
>
> On Tue, Aug 4, 2020 at 8:55 AM Yang Wang 
> wrote:
>
>> Hi Eleanore,
>>
>> I think you are using K8s resource "Job" to deploy the jobmanager.
>> Please set .spec.template.spec.restartPolicy = "Never" and
>> spec.backoffLimit = 0.
>> Refer here[1] for more information.
>>
>> Then, when the jobmanager failed because of any reason, the K8s job
>> will be marked failed. And K8s will not restart the job again.
>>
>> [1].
>> https://kubernetes.io/docs/concepts/workloads/controllers/job/#job-termination-and-cleanup
>>
>>
>> Best,
>> Yang
>>
>> Eleanore Jin wrote on Tue, Aug 4, 2020 at 12:05 AM:
>>
>>> Hi Till,
>>>
>>> Thanks for the reply!
>>>
>>> I manually deploy as per-job mode [1] and I am using Flink 1.8.2.
>>> Specifically, I build a custom docker image, which I copied the app jar
>>> (not uber jar) and all its dependencies under /flink/lib.
>>>
>>> So my question is more like, in this case, if the job is marked as
>>> FAILED, which causes k8s to restart the pod, this seems not help at all,
>>> what are the suggestions for such scenario?
>>>
>>> Tha

Re: Behavior for flink job running on K8S failed after restart strategy exhausted

2020-08-05 Thread Till Rohrmann
You are right Yang Wang.

Thanks for creating this issue.

Cheers,
Till

On Wed, Aug 5, 2020 at 1:33 PM Yang Wang  wrote:

> Actually, the application status shows in YARN web UI is not determined by
> the jobmanager process exit code.
> Instead, we use "resourceManagerClient.unregisterApplicationMaster" to
> control the final status of YARN application.
> So although jobmanager exit with zero code, it still could show failed
> status in YARN web UI.
>
> I have created a ticket to track this improvement[1].
>
> [1]. https://issues.apache.org/jira/browse/FLINK-18828
>
>
> Best,
> Yang
>
>
> Till Rohrmann wrote on Wed, Aug 5, 2020 at 3:56 PM:
>
>> Yes for the other deployments it is not a problem. A reason why people
>> preferred non-zero exit codes in case of FAILED jobs is that this is easier
>> to monitor than having to take a look at the actual job result. Moreover,
>> in the YARN web UI the application shows as failed if I am not mistaken.
>> However, from a framework's perspective, a FAILED job does not mean that
>> Flink has failed and, hence, the return code could still be 0 in my opinion.
>>
>> Cheers,
>> Till
>>
>> On Wed, Aug 5, 2020 at 9:30 AM Yang Wang  wrote:
>>
>>> Hi Eleanore,
>>>
>>> Yes, I suggest to use Job to replace Deployment. It could be used to run
>>> jobmanager one time and finish after a successful/failed completion.
>>>
>>> However, using Job still could not solve your problem completely. Just
>>> as Till said, When a job exhausts the restart strategy, the jobmanager
>>> pod will terminate with non-zero exit code. It will cause the K8s
>>> restarting it again. Even though we could set the restartPolicy and
>>> backoffLimit,
>>> this is not a clean and correct way to go. We should terminate the
>>> jobmanager process with zero exit code in such situation.
>>>
>>> @Till Rohrmann  I just have one concern. Is it a
>>> special case for K8s deployment? For standalone/Yarn/Mesos, it seems that
>>> terminating with
>>> non-zero exit code is harmless.
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> Eleanore Jin wrote on Tue, Aug 4, 2020 at 11:54 PM:
>>>
 Hi Yang & Till,

 Thanks for your prompt reply!

 Yang, regarding your question, I am actually not using k8s job, as I
 put my app.jar and its dependencies under flink's lib directory. I have 1
 k8s deployment for job manager, and 1 k8s deployment for task manager, and
 1 k8s service for job manager.

 As you mentioned above, if flink job is marked as failed, it will cause
 the job manager pod to be restarted. Which is not the ideal behavior.

 Do you suggest that I should change the deployment strategy from using
 k8s deployment to k8s job? In case the flink program exit with non-zero
 code (e.g. exhausted number of configured restart), pod can be marked as
 complete hence not restarting the job again?

 Thanks a lot!
 Eleanore

 On Tue, Aug 4, 2020 at 2:49 AM Yang Wang  wrote:

> @Till Rohrmann  In native mode, when a Flink
> application terminates with FAILED state, all the resources will be 
> cleaned
> up.
>
> However, in standalone mode, I agree with you that we need to rethink
> the exit code of Flink. When a job exhausts the restart
> strategy, we should terminate the pod and do not restart again. After
> googling, it seems that we could not specify the restartPolicy
> based on exit code[1]. So maybe we need to return a zero exit code to
> avoid restarting by K8s.
>
> [1].
> https://stackoverflow.com/questions/48797297/is-it-possible-to-define-restartpolicy-based-on-container-exit-code
>
> Best,
> Yang
>
> Till Rohrmann wrote on Tue, Aug 4, 2020 at 3:48 PM:
>
>> @Yang Wang  I believe that we should
>> rethink the exit codes of Flink. In general you want K8s to restart a
>> failed Flink process. Hence, an application which terminates in state
>> FAILED should not return a non-zero exit code because it is a valid
>> termination state.
>>
>> Cheers,
>> Till
>>
>> On Tue, Aug 4, 2020 at 8:55 AM Yang Wang 
>> wrote:
>>
>>> Hi Eleanore,
>>>
>>> I think you are using K8s resource "Job" to deploy the jobmanager.
>>> Please set .spec.template.spec.restartPolicy = "Never" and
>>> spec.backoffLimit = 0.
>>> Refer here[1] for more information.
>>>
>>> Then, when the jobmanager failed because of any reason, the K8s job
>>> will be marked failed. And K8s will not restart the job again.
>>>
>>> [1].
>>> https://kubernetes.io/docs/concepts/workloads/controllers/job/#job-termination-and-cleanup
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> Eleanore Jin wrote on Tue, Aug 4, 2020 at 12:05 AM:
>>>
 Hi Till,

 Thanks for the reply!

 I manually deploy as per-job mode [1] and I am using Flink 1.8.2.
 Specifically, I build a custom docker image, which I copied the app jar
 (not uber jar) and 

Re: Behavior for flink job running on K8S failed after restart strategy exhausted

2020-08-05 Thread Eleanore Jin
Hi Yang and Till,

Thanks a lot for the help! I have a similar question to what Till mentioned:
if we do not fail the Flink pods when the restart strategy is exhausted, it
might be hard to monitor such failures. Today I get alerts if the k8s pods
are restarted or in a crash loop, but if this is no longer the case, how
can we deal with the monitoring? In production, I have hundreds of small
Flink jobs running (2-8 TM pods) doing stateless processing, and it is really
hard for us to expose an ingress for each JM REST endpoint to periodically
query the job status of each Flink job.

Thanks a lot!
Eleanore

On Wed, Aug 5, 2020 at 4:56 AM Till Rohrmann  wrote:

> You are right Yang Wang.
>
> Thanks for creating this issue.
>
> Cheers,
> Till
>
> On Wed, Aug 5, 2020 at 1:33 PM Yang Wang  wrote:
>
>> Actually, the application status shows in YARN web UI is not determined
>> by the jobmanager process exit code.
>> Instead, we use "resourceManagerClient.unregisterApplicationMaster" to
>> control the final status of YARN application.
>> So although jobmanager exit with zero code, it still could show failed
>> status in YARN web UI.
>>
>> I have created a ticket to track this improvement[1].
>>
>> [1]. https://issues.apache.org/jira/browse/FLINK-18828
>>
>>
>> Best,
>> Yang
>>
>>
>> Till Rohrmann wrote on Wed, Aug 5, 2020 at 3:56 PM:
>>
>>> Yes for the other deployments it is not a problem. A reason why people
>>> preferred non-zero exit codes in case of FAILED jobs is that this is easier
>>> to monitor than having to take a look at the actual job result. Moreover,
>>> in the YARN web UI the application shows as failed if I am not mistaken.
>>> However, from a framework's perspective, a FAILED job does not mean that
>>> Flink has failed and, hence, the return code could still be 0 in my opinion.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Aug 5, 2020 at 9:30 AM Yang Wang  wrote:
>>>
 Hi Eleanore,

 Yes, I suggest to use Job to replace Deployment. It could be used
 to run jobmanager one time and finish after a successful/failed completion.

 However, using Job still could not solve your problem completely. Just
 as Till said, When a job exhausts the restart strategy, the jobmanager
 pod will terminate with non-zero exit code. It will cause the K8s
 restarting it again. Even though we could set the restartPolicy and
 backoffLimit,
 this is not a clean and correct way to go. We should terminate the
 jobmanager process with zero exit code in such situation.

 @Till Rohrmann  I just have one concern. Is it a
 special case for K8s deployment? For standalone/Yarn/Mesos, it seems that
 terminating with
 non-zero exit code is harmless.


 Best,
 Yang

 Eleanore Jin wrote on Tue, Aug 4, 2020 at 11:54 PM:

> Hi Yang & Till,
>
> Thanks for your prompt reply!
>
> Yang, regarding your question, I am actually not using k8s job, as I
> put my app.jar and its dependencies under flink's lib directory. I have 1
> k8s deployment for job manager, and 1 k8s deployment for task manager, and
> 1 k8s service for job manager.
>
> As you mentioned above, if flink job is marked as failed, it will
> cause the job manager pod to be restarted. Which is not the ideal
> behavior.
>
> Do you suggest that I should change the deployment strategy from using
> k8s deployment to k8s job? In case the flink program exit with non-zero
> code (e.g. exhausted number of configured restart), pod can be marked as
> complete hence not restarting the job again?
>
> Thanks a lot!
> Eleanore
>
> On Tue, Aug 4, 2020 at 2:49 AM Yang Wang 
> wrote:
>
>> @Till Rohrmann  In native mode, when a Flink
>> application terminates with FAILED state, all the resources will be 
>> cleaned
>> up.
>>
>> However, in standalone mode, I agree with you that we need to rethink
>> the exit code of Flink. When a job exhausts the restart
>> strategy, we should terminate the pod and do not restart again. After
>> googling, it seems that we could not specify the restartPolicy
>> based on exit code[1]. So maybe we need to return a zero exit code to
>> avoid restarting by K8s.
>>
>> [1].
>> https://stackoverflow.com/questions/48797297/is-it-possible-to-define-restartpolicy-based-on-container-exit-code
>>
>> Best,
>> Yang
>>
>> Till Rohrmann wrote on Tue, Aug 4, 2020 at 3:48 PM:
>>
>>> @Yang Wang  I believe that we should
>>> rethink the exit codes of Flink. In general you want K8s to restart a
>>> failed Flink process. Hence, an application which terminates in state
>>> FAILED should not return a non-zero exit code because it is a valid
>>> termination state.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Aug 4, 2020 at 8:55 AM Yang Wang 
>>> wrote:
>>>
 Hi Eleanore,

 I think you are using K8s resource "Job" to depl

Re: Kafka transaction error lead to data loss under end to end exact-once

2020-08-05 Thread Piotr Nowojski
Hi Lu,

In this case, as it looks from the quite fragmented log/error message that
you posted, the job has failed, so Flink indeed detected some issue, and that
probably means data loss in Kafka (in such a case you could probably
recover some lost records by reading with `read_uncommitted` mode from
Kafka, but that can lead to data duplication).

However, a very similar error can be logged by Flink as a WARN during
recovery. In that case it can mean either:
- data loss because of timeouts (keep in mind that Kafka transaction
timeouts must cover: checkpoint interval + downtime during the failure +
time to restart and recover the Flink job)
- the transaction was already committed, just before the failure happened

and there is unfortunately no way, using the Kafka API, to distinguish those two
cases.
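As a rough sketch of that timing relation (the concrete values are assumptions, not recommendations):

```
import java.util.Properties;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TimeoutSizingSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every minute with exactly-once guarantees.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        // The producer's transaction timeout must cover:
        //   checkpoint interval + downtime during the failure + time to restart and recover,
        // so it is set far above the 1-minute checkpoint interval here (15 minutes, assumed).
        Properties producerProps = new Properties();
        producerProps.setProperty("transaction.timeout.ms", String.valueOf(15 * 60 * 1000));
        // (transaction.max.timeout.ms on the brokers must be at least as large.)
    }
}
```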

Piotrek


On Wed, Aug 5, 2020 at 10:17, Khachatryan Roman wrote:

> Hi Lu,
>
> AFAIK, it's not going to be fixed. As you mentioned in the first email,
> Kafka should be configured so that its transaction timeout is larger than
> your max checkpoint duration.
>
> However, you should not only change transaction.timeout.ms in producer
> but also transaction.max.timeout.ms on your brokers.
> Please refer to
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#caveats
>
> Regards,
> Roman
>
>
> On Wed, Aug 5, 2020 at 12:24 AM Lu Niu  wrote:
>
>> Hi, Khachatryan
>>
>> Thank you for the reply. Is that a problem that can be fixed? If so, is
>> the fix on roadmap? Thanks!
>>
>> Best
>> Lu
>>
>> On Tue, Aug 4, 2020 at 1:24 PM Khachatryan Roman <
>> khachatryan.ro...@gmail.com> wrote:
>>
>>> Hi Lu,
>>>
>>> Yes, this error indicates data loss (unless there were no records in the
>>> transactions).
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Mon, Aug 3, 2020 at 9:14 PM Lu Niu  wrote:
>>>
 Hi,

 We are using end-to-end exactly-once Flink + Kafka and
 encountered the below exception, which usually came after checkpoint
 failures:
 ```
 Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

 2020-07-28 16:27:51,633 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job xxx (f08fc4b1edceb3705e2cb134a8ece73d) switched from state RUNNING to FAILING.
 java.lang.RuntimeException: Error while confirming checkpoint
     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1219)
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
     at java.lang.Thread.run(Thread.java:748)
 Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:295)
     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:842)
     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1214)
     ... 5 more
 ```
 We did some end to end tests and noticed whenever such a thing happens,
 there will be a data loss.

 Referring to several related questions, I understand I need to increase
 `transaction.timeout.ms`  because:
 ```
 *Semantic.EXACTLY_ONCE mode relies on the ability to commit
 transactions that were started before taking a checkpoint, after recovering
 from the said checkpoint. If the time between Flink application crash and
 completed restart is larger than Kafka’s transaction timeout there will be
 data loss (Kafka will automatically abort transactions that exceeded
 timeout time).*
 ```

 But I want to confirm with the community that:
 Does an exception like this always lead to data loss?

 I asked because we get this exception sometimes even when the
 checkpoint succeeds.

 Setup:
 Flink 1.9.1

 Best
 Lu

>>>


Re: Status of a job when a kafka source dies

2020-08-05 Thread Piotr Nowojski
Hi Nick,

What Aljoscha was trying to say is that Flink is not trying to do any
magic. If the `KafkaConsumer` - which is being used under the hood of the
`FlinkKafkaConsumer` connector - throws an exception, this
exception bubbles up, causing the job to failover. If the failure is handled
by the `KafkaConsumer` silently, then that's what happens. As we can see in the
TM log that you attached, the latter seems to be happening - note that the
warning is logged by the "org.apache.kafka.clients.NetworkClient" package, so
that's not code we (Flink developers) control.

If you want to change this behaviour, unless someone here on this mailing
list just happens to know the answer, the better place to ask such a
question is the Kafka mailing list. Maybe there is some way to configure
this.

And sorry, I don't know much about either the KafkaConsumer or the
Kafka brokers' configuration :(

Piotrek

On Tue, Aug 4, 2020 at 22:04, Nick Bendtner wrote:

> Hi,
> I don't observe this behaviour though, we use flink 1.7.2 . I stopped
> kafka and zookeeper on all broker nodes. On the flink side, I see the
> messages in the log ( data is obfuscated) . There are no error logs. The
> kafka consumer properties are
>
> 1. "bootstrap.servers"
>
> 2. "zookeeper.connect
>
> 3. "auto.offset.reset"
>
> 4. "group.id"
>
> 5."security.protocol"
>
>
> The flink consumer starts consuming data as soon as the kafka comes back
> up. So I want to know in what scenario/kafka consumer config will the job
> go to failed state after a finite number of restart attempts from
> checkpoint.
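For context, a sketch of how such a consumer is typically wired up with the properties listed above (Flink 1.7-style universal connector; the topic name and property values are illustrative assumptions):

```
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class AuditConsumerSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9093,broker2:9093");
        props.setProperty("zookeeper.connect", "zk1:2181");   // only relevant for very old consumer versions
        props.setProperty("auto.offset.reset", "latest");
        props.setProperty("group.id", "flink-AIP-QA-Audit-Consumer");
        props.setProperty("security.protocol", "SSL");

        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer<>("audit-topic", new SimpleStringSchema(), props));

        stream.print();
        env.execute("audit-consumer");
    }
}
```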
>
>
> TM log.
> 2020-08-04 19:50:55,539 WARN  org.apache.kafka.clients.NetworkClient
>  - [Consumer clientId=consumer-5,
> groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1003 (
> yyyrspapd036.xxx.com/ss.mm.120.124:9093) could not be established. Broker
> may not be available.
> 2020-08-04 19:50:55,540 WARN  org.apache.kafka.clients.NetworkClient
>  - [Consumer clientId=consumer-4,
> groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1002 (
> yyyrspapd037.xxx.com/ss.mm.120.125:9093) could not be established. Broker
> may not be available.
> 2020-08-04 19:50:55,791 WARN  org.apache.kafka.clients.NetworkClient
>  - [Consumer clientId=consumer-4,
> groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1004 (
> yyyrspapd035.xxx.com/ss.mm.120.123:9093) could not be established. Broker
> may not be available.
> 2020-08-04 19:50:55,791 WARN  org.apache.kafka.clients.NetworkClient
>  - [Consumer clientId=consumer-6,
> groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1003 (
> yyyrspapd036.xxx.com/ss.mm.120.124:9093) could not be established. Broker
> may not be available.
>
> Best,
> Nick
>
> On Mon, Jul 20, 2020 at 10:27 AM Aljoscha Krettek 
> wrote:
>
>> Hi,
>>
>> Flink doesn't do any special failure-handling or retry logic, so it’s up
>> to how the KafkaConsumer is configured via properties. In general Flink
>> doesn't try to be smart: when something fails, an exception will bubble
>> up that will fail this execution of the job. If checkpoints are enabled
>> this will trigger a restore; this is controlled by the restart strategy.
>> If that eventually gives up, the job will go to “FAILED” and stop.
>>
>> This is the relevant section of the docs:
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/task_failure_recovery.html
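For illustration, a sketch of configuring such a restart strategy programmatically (the values are illustrative assumptions; the linked docs also describe the flink-conf.yaml equivalents):

```
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // With checkpointing enabled Flink picks a default restart strategy;
        // here an explicit one is set instead.
        env.enableCheckpointing(60_000);

        // Try at most 3 restarts, 10 seconds apart; after that the job goes to FAILED.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

        env.fromElements("a", "b", "c").print();  // placeholder pipeline
        env.execute("job-with-restart-strategy");
    }
}
```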
>>
>> Best,
>> Aljoscha
>>
>> On 15.07.20 17:42, Nick Bendtner wrote:
>> > Hi guys,
>> > I want to know what is the default behavior of Kafka source when a kafka
>> > cluster goes down during streaming. Will the job status go to failing
>> or is
>> > the exception caught and there is a back off before the source tries to
>> > poll for more events ?
>> >
>> >
>> > Best,
>> > Nick.
>> >
>>
>>


Re: Status of a job when a kafka source dies

2020-08-05 Thread Nick Bendtner
+user group.

On Wed, Aug 5, 2020 at 12:57 PM Nick Bendtner  wrote:

> Thanks Piotr, but shouldn't this event be handled by the FlinkKafkaConsumer,
> since the poll happens inside the FlinkKafkaConsumer? How can I catch this
> event in my code, since I don't have control over the poll?
>
> Best,
> Nick.
>
> On Wed, Aug 5, 2020 at 12:14 PM Piotr Nowojski 
> wrote:
>
>> Hi Nick,
>>
>> What Aljoscha was trying to say is that Flink is not trying to do any
>> magic. If `KafkaConsumer` - which is being used under the hood of
>> `FlinkKafkaConsumer` connector - throws an exception, this
>> exception bubbles up causing the job to failover. If the failure is handled
>> by the `KafkaConsumer` silently, that's what's happening. As we can in the
>> TM log that you attached, the latter seems to be happening - note that the
>> warning is logged by "org.apache.kafka.clients.NetworkClient" package, so
>> that's not the code we (Flink developers) control.
>>
>> If you want to change this behaviour, unless someone here on this mailing
>> list just happens to know the answer, the better place to ask such a
>> question on the Kafka mailing list. Maybe there is some way to configure
>> this.
>>
>> And sorry I don't know much about neither the KafkaConsumer nor the
>> KafkaBrokers configuration :(
>>
>> Piotrek
>>
>> On Tue, Aug 4, 2020 at 22:04, Nick Bendtner wrote:
>>
>>> Hi,
>>> I don't observe this behaviour though, we use flink 1.7.2 . I stopped
>>> kafka and zookeeper on all broker nodes. On the flink side, I see the
>>> messages in the log ( data is obfuscated) . There are no error logs. The
>>> kafka consumer properties are
>>>
>>> 1. "bootstrap.servers"
>>>
>>> 2. "zookeeper.connect
>>>
>>> 3. "auto.offset.reset"
>>>
>>> 4. "group.id"
>>>
>>> 5."security.protocol"
>>>
>>>
>>> The flink consumer starts consuming data as soon as the kafka comes back
>>> up. So I want to know in what scenario/kafka consumer config will the job
>>> go to failed state after a finite number of restart attempts from
>>> checkpoint.
>>>
>>>
>>> TM log.
>>> 2020-08-04 19:50:55,539 WARN  org.apache.kafka.clients.NetworkClient
>>>- [Consumer clientId=consumer-5,
>>> groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1003 (
>>> yyyrspapd036.xxx.com/ss.mm.120.124:9093) could not be established.
>>> Broker may not be available.
>>> 2020-08-04 19:50:55,540 WARN  org.apache.kafka.clients.NetworkClient
>>>- [Consumer clientId=consumer-4,
>>> groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1002 (
>>> yyyrspapd037.xxx.com/ss.mm.120.125:9093) could not be established.
>>> Broker may not be available.
>>> 2020-08-04 19:50:55,791 WARN  org.apache.kafka.clients.NetworkClient
>>>- [Consumer clientId=consumer-4,
>>> groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1004 (
>>> yyyrspapd035.xxx.com/ss.mm.120.123:9093) could not be established.
>>> Broker may not be available.
>>> 2020-08-04 19:50:55,791 WARN  org.apache.kafka.clients.NetworkClient
>>>- [Consumer clientId=consumer-6,
>>> groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1003 (
>>> yyyrspapd036.xxx.com/ss.mm.120.124:9093) could not be established.
>>> Broker may not be available.
>>>
>>> Best,
>>> Nick
>>>
>>> On Mon, Jul 20, 2020 at 10:27 AM Aljoscha Krettek 
>>> wrote:
>>>
 Hi,

 Flink doesn't do any special failure-handling or retry logic, so it’s
 up
 to how the KafkaConsumer is configured via properties. In general Flink
 doesn’t try to be smart: when something fails, an exception will bubble
 up that will fail this execution of the job. If checkpoints are enabled
 this will trigger a restore; this is controlled by the restart
 strategy.
 If that eventually gives up, the job will go to “FAILED” and stop.

 This is the relevant section of the docs:

 https://ci.apache.org/projects/flink/flink-docs-stable/dev/task_failure_recovery.html

 Best,
 Aljoscha

 On 15.07.20 17:42, Nick Bendtner wrote:
 > Hi guys,
 > I want to know what is the default behavior of Kafka source when a
 kafka
 > cluster goes down during streaming. Will the job status go to failing
 or is
 > the exception caught and there is a back off before the source tries
 to
 > poll for more events ?
 >
 >
 > Best,
 > Nick.
 >




Re: Status of a job when a kafka source dies

2020-08-05 Thread Piotr Nowojski
Hi Nick,

Could you elaborate more on what event you would like Flink to handle, and
how? Is there some kind of Kafka API that can be used to listen to
such kinds of events? Becket, do you maybe know something about this?

As a side note Nick, can you not configure some timeouts [1] in the
KafkaConsumer, like `request.timeout.ms` or `consumer.timeout.ms`? But as I
wrote before, that would be more of a question for the Kafka folks.

Piotrek

[1] http://kafka.apache.org/20/documentation/
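
As a concrete illustration of the two knobs discussed in this thread - the Kafka
client timeouts and Flink's restart strategy - below is a minimal, hypothetical
sketch. The topic name, group id and timeout values are assumptions, and whether
these client timeouts actually turn the silent NetworkClient warnings into
exceptions is exactly the open question above; the restart strategy only matters
once an exception reaches Flink.

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaTimeoutExample {
    public static void main(String[] args) throws Exception {
        // Kafka client properties; the timeout values are illustrative only (see [1]).
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9093");
        props.setProperty("group.id", "my-consumer-group");
        props.setProperty("request.timeout.ms", "30000");
        props.setProperty("default.api.timeout.ms", "60000");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);
        // Give up and move the job to FAILED after 3 unsuccessful restarts, 10s apart.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

        env.addSource(new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props))
           .print();

        env.execute("kafka-timeout-example");
    }
}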

On Wed, 5 Aug 2020 at 19:58, Nick Bendtner  wrote:

> +user group.
>
> On Wed, Aug 5, 2020 at 12:57 PM Nick Bendtner  wrote:
>
>> Thanks Piotr, but shouldn't this event be handled by the
>> FlinkKafkaConsumer, since the poll happens inside the FlinkKafkaConsumer?
>> How can I catch this event in my code, since I don't have control over the
>> poll?
>>
>> Best,
>> Nick.
>>
>> On Wed, Aug 5, 2020 at 12:14 PM Piotr Nowojski 
>> wrote:
>>
>>> Hi Nick,
>>>
>>> What Aljoscha was trying to say is that Flink is not trying to do any
>>> magic. If `KafkaConsumer` - which is being used under the hood of
>>> `FlinkKafkaConsumer` connector - throws an exception, this
>>> exception bubbles up causing the job to failover. If the failure is handled
>>> by the `KafkaConsumer` silently, that's what's happening. As we can see in
>>> the TM log that you attached, the latter seems to be happening - note that the
>>> warning is logged by "org.apache.kafka.clients.NetworkClient" package, so
>>> that's not the code we (Flink developers) control.
>>>
>>> If you want to change this behaviour, unless someone here on this
>>> mailing list just happens to know the answer, the better place to ask such
>>> a question is on the Kafka mailing list. Maybe there is some way to configure
>>> this.
>>>
>>> And sorry, I don't know much about either the KafkaConsumer or the
>>> KafkaBroker configuration :(
>>>
>>> Piotrek
>>>
>>> On Tue, 4 Aug 2020 at 22:04, Nick Bendtner  wrote:
>>>
 Hi,
 I don't observe this behaviour though; we use Flink 1.7.2. I stopped
 Kafka and ZooKeeper on all broker nodes. On the Flink side, I see the
 messages in the log (data is obfuscated). There are no error logs. The
 kafka consumer properties are

 1. "bootstrap.servers"

 2. "zookeeper.connect

 3. "auto.offset.reset"

 4. "group.id"

 5."security.protocol"


 The flink consumer starts consuming data as soon as the kafka comes
 back up. So I want to know in what scenario/kafka consumer config will the
 job go to failed state after a finite number of restart attempts from
 checkpoint.


 TM log.
 2020-08-04 19:50:55,539 WARN  org.apache.kafka.clients.NetworkClient
  - [Consumer clientId=consumer-5,
 groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1003 (
 yyyrspapd036.xxx.com/ss.mm.120.124:9093) could not be established.
 Broker may not be available.
 2020-08-04 19:50:55,540 WARN  org.apache.kafka.clients.NetworkClient
  - [Consumer clientId=consumer-4,
 groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1002 (
 yyyrspapd037.xxx.com/ss.mm.120.125:9093) could not be established.
 Broker may not be available.
 2020-08-04 19:50:55,791 WARN  org.apache.kafka.clients.NetworkClient
  - [Consumer clientId=consumer-4,
 groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1004 (
 yyyrspapd035.xxx.com/ss.mm.120.123:9093) could not be established.
 Broker may not be available.
 2020-08-04 19:50:55,791 WARN  org.apache.kafka.clients.NetworkClient
  - [Consumer clientId=consumer-6,
 groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1003 (
 yyyrspapd036.xxx.com/ss.mm.120.124:9093) could not be established.
 Broker may not be available.

 Best,
 Nick

 On Mon, Jul 20, 2020 at 10:27 AM Aljoscha Krettek 
 wrote:

> Hi,
>
> Flink doesn't do any special failure-handling or retry logic, so it’s
> up
> to how the KafkaConsumer is configured via properties. In general
> Flink
> doesn’t try to be smart: when something fails, an exception will bubble
> up that will fail this execution of the job. If checkpoints are
> enabled
> this will trigger a restore; this is controlled by the restart
> strategy.
> If that eventually gives up, the job will go to “FAILED” and stop.
>
> This is the relevant section of the docs:
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/task_failure_recovery.html
>
> Best,
> Aljoscha
>
> On 15.07.20 17:42, Nick Bendtner wrote:
> > Hi guys,
> > I want to know what is the default behavior of Kafka source when a
> kafka
> > cluster goes down during streaming. Will the job status go to
> failing or is
> > the exception caught and there is a back off before the source tries
> to

Re: Handle idle kafka source in Flink 1.9

2020-08-05 Thread bat man
Hello Arvid,

Thanks for the suggestion/reference and my apologies for the late reply.

With this I am able to process the data even when some topics do not have
regular data. Obviously, late data is being handled as a side-output and there
is a process for it.
One challenge is handling the back-fill: when I run the job with old data, the
older data gets filtered out as late data because of the watermark (taking into
account that maxOutOfOrderness is set to 10 minutes). To handle this I am
thinking of running the side-input with maxOutOfOrderness set to the oldest
data, while the regular job keeps the normal setting.

Thanks,
Hemant
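
For reference, a minimal sketch of the custom assigner approach suggested earlier
in this thread, for the Flink 1.9 DataStream API, is shown below. It emits a
watermark based on the highest timestamp seen and advances it from the wall clock
when no events have arrived for a configurable idle timeout. The Event class and
the timeout values are placeholders, not code from the training repository linked
below; also note that wall-clock-based idle handling is not appropriate during a
back-fill, since it would immediately declare historical data late.

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class IdleAwareBoundedOutOfOrdernessAssigner implements AssignerWithPeriodicWatermarks<Event> {

    private final long maxOutOfOrdernessMs;
    private final long idleTimeoutMs;
    private long maxSeenTimestamp;
    private long lastEventWallClock = Long.MIN_VALUE;

    public IdleAwareBoundedOutOfOrdernessAssigner(long maxOutOfOrdernessMs, long idleTimeoutMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
        // Avoid underflow before the first event has been seen.
        this.maxSeenTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    @Override
    public long extractTimestamp(Event element, long previousElementTimestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, element.getTimestamp());
        lastEventWallClock = System.currentTimeMillis();
        return element.getTimestamp();
    }

    @Override
    public Watermark getCurrentWatermark() {
        long now = System.currentTimeMillis();
        if (lastEventWallClock > 0 && now - lastEventWallClock > idleTimeoutMs) {
            // No events for a while: advance the watermark from the wall clock
            // so downstream operators are not held back by this idle partition.
            return new Watermark(now - maxOutOfOrdernessMs);
        }
        return new Watermark(maxSeenTimestamp - maxOutOfOrdernessMs);
    }
}

It would be attached with stream.assignTimestampsAndWatermarks(new
IdleAwareBoundedOutOfOrdernessAssigner(600_000, 60_000)), with the auto-watermark
interval configured on the execution environment.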

On Thu, Jul 30, 2020 at 2:41 PM Arvid Heise  wrote:

> Hi Hemant,
>
> sorry for the late reply.
>
> You can just create your own watermark assigner and either copy the
> assigner from Flink 1.11 or take the one that we use in our trainings [1].
>
> [1]
> https://github.com/ververica/flink-training-troubleshooting/blob/master/src/main/java/com/ververica/flinktraining/solutions/troubleshoot/TroubledStreamingJobSolution2.java#L129-L187
>
> On Thu, Jul 23, 2020 at 8:48 PM bat man  wrote:
>
>> Thanks Niels for a great talk. You have covered two of my pain areas -
>> slim and broken streams - since I am dealing with device data from on-prem
>> data centers. The first option of generating fabricated watermark events is
>> fine; however, as mentioned in your talk, how are you handling forwarding it
>> to the next stream (next Kafka topic) after enrichment? Have you got any
>> solution for this?
>>
>> -Hemant
>>
>> On Thu, Jul 23, 2020 at 12:05 PM Niels Basjes  wrote:
>>
>>> Have a look at this presentation I gave a few weeks ago.
>>> https://youtu.be/bQmz7JOmE_4
>>>
>>> Niels Basjes
>>>
>>> On Wed, 22 Jul 2020, 08:51 bat man,  wrote:
>>>
 Hi Team,

 Can someone share their experiences handling this.

 Thanks.

 On Tue, Jul 21, 2020 at 11:30 AM bat man  wrote:

> Hello,
>
> I have a pipeline which consumes data from a Kafka source. Since the
> partitions are partitioned by device_id, in case a group of devices is down
> some partitions will not get a normal flow of data.
> I understand from the documentation here [1] that in Flink 1.11 one can
> declare the source idle -
> WatermarkStrategy.<T>forBoundedOutOfOrderness(
> Duration.ofSeconds(20)).withIdleness(Duration.ofMinutes(1));
>
> How can I handle this in 1.9, since I am using AWS EMR and EMR doesn't
> have any release with the latest Flink version?
>
> One way I could think of is to trigger watermark generation every 10
> minutes or so using periodic watermarks. However, this will not be
> foolproof; is there any better way to handle this more dynamically?
>
> [1] -
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
>
> Thanks,
> Hemant
>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: Two Queries and a Kafka Topic

2020-08-05 Thread Marco Villalobos
Hi Theo,

Thank you.

I just read about the State Processor API in an effort to understand Option 1.
It seems, though, that I could just use a KeyedProcessFunction that loads the
data only once (maybe in the "open" method), serializes the values into MapState,
and uses them from that point on.

Another option in the documentation is the CheckpointedFunction type, but it was
not clear to me from the documentation how to use it.

My data shares a common key, so this might be doable in KeyedProcessFunction.

Is that what you're suggesting?

Again, Thank you.

Marco A, Villalobos
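
For what it's worth, a minimal sketch of the "load once per task" idea (Theo's
option 3, quoted below) could look like the following. Event, EnrichedEvent, the
JDBC URL and the query are made-up placeholders, and the result is cached in plain
heap memory rather than in Flink state, so it is reloaded on every restart and is
not checkpointed; keeping it in MapState via a KeyedProcessFunction, or seeding it
with the State Processor API (option 1), would be the state-backed alternatives.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class EnrichFromPostgres extends RichMapFunction<Event, EnrichedEvent> {

    private transient Map<String, String> referenceData;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Runs once per parallel task instance when the job (re)starts,
        // so each task queries PostgreSQL exactly once.
        referenceData = new HashMap<>();
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:postgresql://db-host:5432/mydb", "user", "password");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT key, value FROM reference_table")) {
            while (rs.next()) {
                referenceData.put(rs.getString("key"), rs.getString("value"));
            }
        }
    }

    @Override
    public EnrichedEvent map(Event event) {
        // Look up the pre-loaded reference data for this event's key.
        return new EnrichedEvent(event, referenceData.get(event.getKey()));
    }
}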


> On Aug 5, 2020, at 3:52 AM, Theo Diefenthal 
>  wrote:
> 
> Hi Marco,
> 
> In general, I see three solutions here you could approach: 
> 
> 1. Use the StateProcessorAPI: You can run a program with the 
> stateProcessorAPI that loads the data from JDBC and stores it into a Flink 
> SavePoint. Afterwards, you start your streaming job from that savepoint which 
> will load its state and within find all the data from JDBC stored already. 
> 2. Load from master, distribute with the job: When you build up your 
> jobgraph, you could execute the JDBC queries and put the result into some 
> Serializable class which in turn you plug in a an operator in your stream 
> (e.g. a map stage). The class along with all the queried data will be 
> serialized and deserialized on the taskmanagers (Usually, I use this for 
> configuration parameters, but it might be ok in this case as well)
> 3. Load from TaskManager: In your map-function, if the very first event is 
> received, you can block processing and synchronously load the data from JDBC 
> (So each Taskmanager performs the JDBC query itself). You then keep the data 
> to be used for all subsequent map steps. 
> 
> I think, option 3 is the easiest to be implemented while option 1 might be 
> the most elegant way in my opinion. 
> 
> Best regards
> Theo
> 
> Von: "Marco Villalobos" 
> An: "Leonard Xu" 
> CC: "user" 
> Gesendet: Mittwoch, 5. August 2020 04:33:23
> Betreff: Re: Two Queries and a Kafka Topic
> 
> Hi Leonard,
> 
> First, Thank you.
> 
> I am currently trying to restrict my solution to Apache Flink 1.10 because 
> it's the current version supported by Amazon EMR.
> I am not ready to change our operational environment to solve this.
> 
> Second, I am using the DataStream API.  The Kafka Topic is not in a table, it 
> is in a DataStream.
> 
> The SQL queries are literally from a PostgreSQL database, and only need to 
> be run exactly once in the lifetime of the job.
> 
> I am struggling to determine where this happens.
> 
> JDBCInputFormat seems to query the SQL table repetitively, and also 
> connecting streams and aggregating into one object is very complicated.
> 
> Thus, I am wondering what is the right approach.  
> 
> Let me restate the parameters.
> 
> SQL Query One = data in PostgreSQL (200K records) that is used for business 
> logic.
> SQL Query Two = data in PostgreSQL (1000 records) that is used for business 
> logic.
> Kafka Topic One = unlimited data-stream that uses the data-stream api and 
> queries above to write into multiple sinks
> 
> ASCII diagram:
> 
> [SQL Query One] ---> [Aggregate to Map] -------------------------+
>                                                                  v
> Kafka ---> [Kafka Topic One] ---> [Keyed Process Function (Query One Map, Query Two Map)] ---> [Multiple Sinks]
>                                                                  ^
> [SQL Query Two] ---> [Aggregate to Map] -------------------------+
> 
> 
> Maybe my graph above helps.  You see, I need Query One and Query Two to only 
> ever execute once.  After that, the information they provide is used to 
> correctly process the Kafka Topic.
> 
> I'll dig a bit deeper to try to understand what you said, thank you, but 
> JDBCInputFormat seems to repetitively query the database.  Maybe I need to 
> write a RichFunction or AsyncIO function and cache the results in state after 
> that.
> 
> 
> 
> On Aug 4, 2020, at 6:25 PM, Leonard Xu  > wrote:
> 
> Hi, Marco
> 
> If I need SQL Query One and SQL Query Two to happen just one time,
> 
> Looks like you want to reuse this Kafka table in one job. Executing multiple 
> queries in one SQL job is supported in Flink 1.11. 
> You can use `StatementSet` [1] to add SQL Query One and SQL Query Two in a 
> single SQL job [1].
> 
> 
> Best
> Leonard 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#run-an-insert-statement
>  
> 
> 
> 
> 在 2020年8月5日,04:34,Marco Villalobos  > 写道:
> 
> Lets say that I have:
> 
> SQL Query One from data in PostgreSQL (200K records).
> SQL Query Two from data in PostgreSQL (1000 records).
> and Kafka Topic One.
> 
> Let's also say that main data from this Flink job arrives in Kafka Topic One.
> 
> If I need SQL Query One and SQL Query Two to h

Re: Flink CPU load metrics in K8s

2020-08-05 Thread Bajaj, Abhinav
Thanks Roman for providing the details.

I also made more observations that have increased my confusion about this topic 😝
To ease the calculations, I deployed a test cluster this time providing 1 CPU 
in K8s (with Docker) for each taskmanager container.

When I check the taskmanager CPU load, the value is on the order of 
"0.002158428663932657".
Assuming that the underlying JVM recognizes the 1 CPU allocated to the Docker 
container, this value means a CPU usage in the ballpark of 0.21%.

However, if I look at the K8s metrics (formula below) for this container, it 
turns out to be in the ballpark of 10-16%.
There is no other process running in the container apart from the flink 
taskmanager.

The order of these two values of CPU % usage is different.

Am I comparing the right metrics here?
How are folks running Flink on K8s monitoring the CPU load?

~ Abhi

% CPU usage from K8s metrics
sum(rate(container_cpu_usage_seconds_total{pod=~"my-taskmanagers-*", 
container="taskmanager"}[5m])) by (pod)
/ sum(container_spec_cpu_quota{pod=~"my-taskmanager-pod-*", 
container="taskmanager"}
/ container_spec_cpu_period{pod=~"my-taskmanager-pod-*", 
container="taskmanager"}) by (pod)

From: Roman Grebennikov 
Date: Tuesday, August 4, 2020 at 12:42 AM
To: "user@flink.apache.org" 
Subject: Re: Flink CPU load metrics in K8s

Hi,

JVM.CPU.Load is just a wrapper (MetricUtils.instantiateCPUMetrics) on top of 
OperatingSystemMXBean.getProcessCpuLoad (see 
https://docs.oracle.com/javase/7/docs/jre/api/management/extension/com/sun/management/OperatingSystemMXBean.html#getProcessCpuLoad())

Usually it looks weird if you have multiple CPU cores. For example, if you have 
a job with a single slot 100% utilizing a single CPU core on an 8-core machine, 
the JVM.CPU.Load will be 1.0/8.0 = 0.125. It's also a point-in-time snapshot of 
current CPU usage, so if you're collecting your metrics every minute, and the 
job has a spiky workload within this minute (like it's idle almost always and 
once a minute it consumes 100% CPU for one second), you have a chance of 
completely missing this in the metrics.

As for me personally, JVM.CPU.Time is a clearer indicator of CPU usage; it is an 
always-increasing amount of milliseconds the CPU spent executing your code. And 
it will also catch CPU usage spikes.

Roman Grebennikov | g...@dfdx.me


On Mon, Aug 3, 2020, at 23:34, Bajaj, Abhinav wrote:

Hi,



I am trying to understand the CPU Load metrics reported by Flink 1.7.1 running 
with openjdk 1.8.0_212 on K8s.



After deploying the Flink Job on K8s, I tried to get CPU Load metrics following 
this 
documentation.

curl 
localhost:8081/taskmanagers/7737ac33b311ea0a696422680711597b/metrics?get=Status.JVM.CPU.Load,Status.JVM.CPU.Time

[{"id":"Status.JVM.CPU.Load","value":"0.0023815194093831865"},{"id":"Status.JVM.CPU.Time","value":"2326000"}]



The value of the CPU load looks odd to me.



What is the unit and scale of this value?

How does Flink determine this value?



Appreciate your time and help here.

~ Abhinav Bajaj





Flink Prometheus MetricsReporter Question

2020-08-05 Thread Avijit Saha
Hi,

I have a general question about Flink support for Prometheus metrics. We
already have a Prometheus setup in our cluster with ServiceMonitor-s
monitoring ports like 8080 etc. for scraping metrics.

In a setup like this, if we deploy Flink Job managers/Task managers in the
cluster, is there any need to have the PrometheusReporter configured as
well? How does that coordinate with existing Prometheus ServiceMonitors if
present?

Is the PrometheusReporter based on a "pull" model, so that it can pull
metrics from Flink and send them to some Prometheus host system?

Thanks
Avijit
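
For reference, the PrometheusReporter is scrape-based: it starts an HTTP endpoint
on each JobManager/TaskManager that a Prometheus server (or a ServiceMonitor/
PodMonitor targeting that port) pulls from; Flink does not push metrics to a
Prometheus host in this mode (there is a separate PrometheusPushGatewayReporter
for push-style setups). A typical flink-conf.yaml snippet looks roughly like the
following; the port range below is just an example:

metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
# Each JM/TM binds the first free port in this range and serves metrics on it.
metrics.reporter.prom.port: 9249-9260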


Re: Issue with single job yarn flink cluster HA

2020-08-05 Thread Ken Krugler
Hi Dinesh,

Did updating to Flink 1.10 resolve the issue?

Thanks,

— Ken

> Hi Andrey,
> Sure, we will try to use Flink 1.10 to see if the HA issues we are facing are fixed 
> and will update this thread.
> 
> Thanks,
> Dinesh
> 
> On Thu, Apr 2, 2020 at 3:22 PM Andrey Zagrebin  > wrote:
> Hi Dinesh,
> 
> Thanks for sharing the logs. There were a couple of HA fixes since 1.7, e.g. 
> [1] and [2].
> I would suggest to try Flink 1.10.
> If the problem persists, could you also find the logs of the failed Job 
> Manager before the failover?
> 
> Best,
> Andrey
> 
> [1] https://jira.apache.org/jira/browse/FLINK-14316 
> 
> [2] https://jira.apache.org/jira/browse/FLINK-11843 
> 
> On Tue, Mar 31, 2020 at 6:49 AM Dinesh J  > wrote:
> Hi Yang,
> I am attaching one full jobmanager log for a job which I reran today. This is a 
> job that tries to read from a savepoint.
> The same error message "leader election ongoing" is displayed. And this stays the 
> same even after 30 minutes. If I leave the job without a yarn kill, it stays 
> the same forever.
> Based on your suggestions till now, I guess it might be some ZooKeeper 
> problem. If that is the case, what can I look out for in ZooKeeper to figure 
> out the issue?
> 
> Thanks,
> Dinesh


[snip]

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-08-05 Thread jincheng sun
Hi David, thank you for sharing the problems with the current documentation; I
agree with you, as I have also received the same feedback from Chinese users. I am
often contacted by users asking questions such as whether PyFlink supports
"Java UDF" and whether PyFlink supports "xxxConnector". The root cause of
these problems is that our existing documents are written with Java users in mind
(text and API examples mixed together). Since Python support was newly added in
1.9, much of the documentation is not friendly to Python users. They don't want to
look for Python content in unfamiliar Java documents. Just yesterday, there were
complaints from Chinese users about where to find all the documentation entries
for the Python API. So a centralized entry point and a clear document structure
are an urgent demand of Python users. The original intention of this FLIP is to
do our best to solve these user pain points.

Hi Xingbo and Wei, thank you for sharing PySpark's status on documentation
optimization. You're right: PySpark already has a lot of Python user
groups, and they also find that the Python user community is an important
audience for multilingual support. The centralization and unification of Python
documentation content will reduce the learning cost for Python users, and a good
document structure and content will also reduce the Q&A burden on the
community. It's a once-and-for-all job.

Hi Seth, I wonder if your concerns have been resolved through the previous
discussion?

Anyway, the principle of the FLIP is that the Python documentation should only
include Python-specific content, instead of making a copy of the Java
content. And it would be great to have you join in the improvement of
PyFlink (both submitting PRs and reviewing PRs).

Best,
Jincheng


Wei Zhong  于2020年8月5日周三 下午5:46写道:

> Hi Xingbo,
>
> Thanks for your information.
>
> I think PySpark's documentation redesign deserves our attention. It
> seems that the Spark community has also begun to treat the user experience
> of Python documentation more seriously. We can continue to follow
> the discussion and progress of the redesign in the Spark community. It
> is so similar to our work that there should be some ideas worth borrowing.
>
> Best,
> Wei
>
>
> 在 2020年8月5日,15:02,Xingbo Huang  写道:
>
> Hi,
>
> I found that the spark community is also working on redesigning pyspark
> documentation[1] recently. Maybe we can compare the difference between our
> document structure and its document structure.
>
> [1] https://issues.apache.org/jira/browse/SPARK-31851
>
> http://apache-spark-developers-list.1001551.n3.nabble.com/Need-some-help-and-contributions-in-PySpark-API-documentation-td29972.html
>
> Best,
> Xingbo
>
> David Anderson  于2020年8月5日周三 上午3:17写道:
>
>> I'm delighted to see energy going into improving the documentation.
>>
>> With the current documentation, I get a lot of questions that I believe
>> reflect two fundamental problems with what we currently provide:
>>
>> (1) We have a lot of contextual information in our heads about how Flink
>> works, and we are able to use that knowledge to make reasonable inferences
>> about how things (probably) work in cases we aren't so familiar with. For
>> example, I get a lot of questions of the form "If I use  will
>> I still have exactly once guarantees?" The answer is always yes, but they
>> continue to have doubts because we have failed to clearly communicate this
>> fundamental, underlying principle.
>>
>> This specific example about fault tolerance applies across all of the
>> Flink docs, but the general idea can also be applied to the Table/SQL and
>> PyFlink docs. The guiding principles underlying these APIs should be
>> written down in one easy-to-find place.
>>
>> (2) The other kind of question I get a lot is "Can I do  with ?"
>> E.g., "Can I use the JDBC table sink from PyFlink?" These questions can be
>> very difficult to answer because it is frequently the case that one has to
>> reason about why a given feature doesn't seem to appear in the
>> documentation. It could be that I'm looking in the wrong place, or it could
>> be that someone forgot to document something, or it could be that it can in
>> fact be done by applying a general mechanism in a specific way that I
>> haven't thought of -- as in this case, where one can use a JDBC sink from
>> Python if one thinks to use DDL.
>>
>> So I think it would be helpful to be explicit about both what is, and
>> what is not, supported in PyFlink. And to have some very clear organizing
>> principles in the documentation so that users can quickly learn where to
>> look for specific facts.
>>
>> Regards,
>> David
>>
>>
>> On Tue, Aug 4, 2020 at 1:01 PM jincheng sun 
>> wrote:
>>
>>> Hi Seth and David,
>>>
>>> I'm very happy to have your reply and suggestions. I would like to share
>>> my thoughts here:
>>>
>>> The main motivation we want to refactor the PyFlink doc is that we want
>>> to make sure that the Python users could find all they want starting from
>>> the PyFlink documentation mainpage. That’s, the PyFlink documentation