Re:Flink Sql中Kafka-Source怎么获取Key值

2020-02-03 Thread Michael Ran
source从新 订阅,作为新字段加入 字段列表里面 可行不?  我们改了source,对binlog 进行处理,源头很多字段 都加入进去。下游让SQL 处理
在 2020-02-04 00:09:18,"wei0727wei"  写道:
>Flink Sql中Kafka-Source只看到了关于Value的Deserialize,要怎么获取对应的Key及meta(offset, topic, 
>partition)信息呢?


Re: Flink Dynamodb as sink

2020-02-03 Thread hemant singh
Thanks, I'll check it out.

On Tue, Feb 4, 2020 at 12:30 PM 容祖儿  wrote:

> you can customize a Sink function (implement SinkFunction) that's not so
> hard.
>
> regards.
>
> On Tue, Feb 4, 2020 at 2:38 PM hemant singh  wrote:
>
>> Hi All,
>>
>> I am using dynamodb as a sink for one of the metrics pipeline. Wanted to
>> understand if there are any existing connectors. I did searched and could
>> not find one. If none exists, has anyone implemented one and any hints on
>> that direction will help a lot.
>>
>> Thanks,
>> Hemant
>>
>


Re: Flink Dynamodb as sink

2020-02-03 Thread 容祖儿
 you can customize a Sink function (implement SinkFunction) that's not so
hard.

regards.

On Tue, Feb 4, 2020 at 2:38 PM hemant singh  wrote:

> Hi All,
>
> I am using dynamodb as a sink for one of the metrics pipeline. Wanted to
> understand if there are any existing connectors. I did searched and could
> not find one. If none exists, has anyone implemented one and any hints on
> that direction will help a lot.
>
> Thanks,
> Hemant
>


Flink Dynamodb as sink

2020-02-03 Thread hemant singh
Hi All,

I am using dynamodb as a sink for one of the metrics pipeline. Wanted to
understand if there are any existing connectors. I did searched and could
not find one. If none exists, has anyone implemented one and any hints on
that direction will help a lot.

Thanks,
Hemant


Re: [DISCUSS] Upload the Flink Python API 1.9.x to PyPI for user convenience.

2020-02-03 Thread Hequn Cheng
Hi Jincheng,

+1 for this proposal.
>From the perspective of users, I think it would nice to have PyFlink on
PyPI which makes it much easier to install PyFlink.

Best, Hequn

On Tue, Feb 4, 2020 at 1:09 PM Jeff Zhang  wrote:

> +1
>
>
> Xingbo Huang  于2020年2月4日周二 下午1:07写道:
>
>> Hi Jincheng,
>>
>> Thanks for driving this.
>> +1 for this proposal.
>>
>> Compared to building from source, downloading directly from PyPI will
>> greatly save the development cost of Python users.
>>
>> Best,
>> Xingbo
>>
>>
>>
>> Wei Zhong  于2020年2月4日周二 下午12:43写道:
>>
>>> Hi Jincheng,
>>>
>>> Thanks for bring up this discussion!
>>>
>>> +1 for this proposal. Building from source takes long time and requires
>>> a good network environment. Some users may not have such an environment.
>>> Uploading to PyPI will greatly improve the user experience.
>>>
>>> Best,
>>> Wei
>>>
>>> jincheng sun  于2020年2月4日周二 上午11:49写道:
>>>
 Hi folks,

 I am very happy to receive some user inquiries about the use of Flink
 Python API (PyFlink) recently. One of the more common questions is
 whether
 it is possible to install PyFlink without using source code build. The
 most
 convenient and natural way for users is to use `pip install
 apache-flink`.
 We originally planned to support the use of `pip install apache-flink`
 in
 Flink 1.10, but the reason for this decision was that when Flink 1.9 was
 released at August 22, 2019[1], Flink's PyPI account system was not
 ready.
 At present, our PyPI account is available at October 09, 2019 [2](Only
 PMC
 can access), So for the convenience of users I propose:

 - Publish the latest release version (1.9.2) of Flink 1.9 to PyPI.
 - Update Flink 1.9 documentation to add support for `pip install`.

 As we all know, Flink 1.9.2 was just completed released at January 31,
 2020
 [3]. There is still at least 1 to 2 months before the release of 1.9.3,
 so
 my proposal is completely considered from the perspective of user
 convenience. Although the proposed work is not large, we have not set a
 precedent for independent release of the Flink Python API(PyFlink) in
 the
 previous release process. I hereby initiate the current discussion and
 look
 forward to your feedback!

 Best,
 Jincheng

 [1]

 https://lists.apache.org/thread.html/4a4d23c449f26b66bc58c71cc1a5c6079c79b5049c6c6744224c5f46%40%3Cdev.flink.apache.org%3E
 [2]

 https://lists.apache.org/thread.html/8273a5e8834b788d8ae552a5e177b69e04e96c0446bb90979444deee%40%3Cprivate.flink.apache.org%3E
 [3]

 https://lists.apache.org/thread.html/ra27644a4e111476b6041e8969def4322f47d5e0aae8da3ef30cd2926%40%3Cdev.flink.apache.org%3E

>>>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: [DISCUSS] Upload the Flink Python API 1.9.x to PyPI for user convenience.

2020-02-03 Thread Hequn Cheng
Hi Jincheng,

+1 for this proposal.
>From the perspective of users, I think it would nice to have PyFlink on
PyPI which makes it much easier to install PyFlink.

Best, Hequn

On Tue, Feb 4, 2020 at 1:09 PM Jeff Zhang  wrote:

> +1
>
>
> Xingbo Huang  于2020年2月4日周二 下午1:07写道:
>
>> Hi Jincheng,
>>
>> Thanks for driving this.
>> +1 for this proposal.
>>
>> Compared to building from source, downloading directly from PyPI will
>> greatly save the development cost of Python users.
>>
>> Best,
>> Xingbo
>>
>>
>>
>> Wei Zhong  于2020年2月4日周二 下午12:43写道:
>>
>>> Hi Jincheng,
>>>
>>> Thanks for bring up this discussion!
>>>
>>> +1 for this proposal. Building from source takes long time and requires
>>> a good network environment. Some users may not have such an environment.
>>> Uploading to PyPI will greatly improve the user experience.
>>>
>>> Best,
>>> Wei
>>>
>>> jincheng sun  于2020年2月4日周二 上午11:49写道:
>>>
 Hi folks,

 I am very happy to receive some user inquiries about the use of Flink
 Python API (PyFlink) recently. One of the more common questions is
 whether
 it is possible to install PyFlink without using source code build. The
 most
 convenient and natural way for users is to use `pip install
 apache-flink`.
 We originally planned to support the use of `pip install apache-flink`
 in
 Flink 1.10, but the reason for this decision was that when Flink 1.9 was
 released at August 22, 2019[1], Flink's PyPI account system was not
 ready.
 At present, our PyPI account is available at October 09, 2019 [2](Only
 PMC
 can access), So for the convenience of users I propose:

 - Publish the latest release version (1.9.2) of Flink 1.9 to PyPI.
 - Update Flink 1.9 documentation to add support for `pip install`.

 As we all know, Flink 1.9.2 was just completed released at January 31,
 2020
 [3]. There is still at least 1 to 2 months before the release of 1.9.3,
 so
 my proposal is completely considered from the perspective of user
 convenience. Although the proposed work is not large, we have not set a
 precedent for independent release of the Flink Python API(PyFlink) in
 the
 previous release process. I hereby initiate the current discussion and
 look
 forward to your feedback!

 Best,
 Jincheng

 [1]

 https://lists.apache.org/thread.html/4a4d23c449f26b66bc58c71cc1a5c6079c79b5049c6c6744224c5f46%40%3Cdev.flink.apache.org%3E
 [2]

 https://lists.apache.org/thread.html/8273a5e8834b788d8ae552a5e177b69e04e96c0446bb90979444deee%40%3Cprivate.flink.apache.org%3E
 [3]

 https://lists.apache.org/thread.html/ra27644a4e111476b6041e8969def4322f47d5e0aae8da3ef30cd2926%40%3Cdev.flink.apache.org%3E

>>>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: [DISCUSS] Upload the Flink Python API 1.9.x to PyPI for user convenience.

2020-02-03 Thread Jeff Zhang
+1


Xingbo Huang  于2020年2月4日周二 下午1:07写道:

> Hi Jincheng,
>
> Thanks for driving this.
> +1 for this proposal.
>
> Compared to building from source, downloading directly from PyPI will
> greatly save the development cost of Python users.
>
> Best,
> Xingbo
>
>
>
> Wei Zhong  于2020年2月4日周二 下午12:43写道:
>
>> Hi Jincheng,
>>
>> Thanks for bring up this discussion!
>>
>> +1 for this proposal. Building from source takes long time and requires a
>> good network environment. Some users may not have such an environment.
>> Uploading to PyPI will greatly improve the user experience.
>>
>> Best,
>> Wei
>>
>> jincheng sun  于2020年2月4日周二 上午11:49写道:
>>
>>> Hi folks,
>>>
>>> I am very happy to receive some user inquiries about the use of Flink
>>> Python API (PyFlink) recently. One of the more common questions is
>>> whether
>>> it is possible to install PyFlink without using source code build. The
>>> most
>>> convenient and natural way for users is to use `pip install
>>> apache-flink`.
>>> We originally planned to support the use of `pip install apache-flink` in
>>> Flink 1.10, but the reason for this decision was that when Flink 1.9 was
>>> released at August 22, 2019[1], Flink's PyPI account system was not
>>> ready.
>>> At present, our PyPI account is available at October 09, 2019 [2](Only
>>> PMC
>>> can access), So for the convenience of users I propose:
>>>
>>> - Publish the latest release version (1.9.2) of Flink 1.9 to PyPI.
>>> - Update Flink 1.9 documentation to add support for `pip install`.
>>>
>>> As we all know, Flink 1.9.2 was just completed released at January 31,
>>> 2020
>>> [3]. There is still at least 1 to 2 months before the release of 1.9.3,
>>> so
>>> my proposal is completely considered from the perspective of user
>>> convenience. Although the proposed work is not large, we have not set a
>>> precedent for independent release of the Flink Python API(PyFlink) in the
>>> previous release process. I hereby initiate the current discussion and
>>> look
>>> forward to your feedback!
>>>
>>> Best,
>>> Jincheng
>>>
>>> [1]
>>>
>>> https://lists.apache.org/thread.html/4a4d23c449f26b66bc58c71cc1a5c6079c79b5049c6c6744224c5f46%40%3Cdev.flink.apache.org%3E
>>> [2]
>>>
>>> https://lists.apache.org/thread.html/8273a5e8834b788d8ae552a5e177b69e04e96c0446bb90979444deee%40%3Cprivate.flink.apache.org%3E
>>> [3]
>>>
>>> https://lists.apache.org/thread.html/ra27644a4e111476b6041e8969def4322f47d5e0aae8da3ef30cd2926%40%3Cdev.flink.apache.org%3E
>>>
>>

-- 
Best Regards

Jeff Zhang


Re: [DISCUSS] Upload the Flink Python API 1.9.x to PyPI for user convenience.

2020-02-03 Thread Jeff Zhang
+1


Xingbo Huang  于2020年2月4日周二 下午1:07写道:

> Hi Jincheng,
>
> Thanks for driving this.
> +1 for this proposal.
>
> Compared to building from source, downloading directly from PyPI will
> greatly save the development cost of Python users.
>
> Best,
> Xingbo
>
>
>
> Wei Zhong  于2020年2月4日周二 下午12:43写道:
>
>> Hi Jincheng,
>>
>> Thanks for bring up this discussion!
>>
>> +1 for this proposal. Building from source takes long time and requires a
>> good network environment. Some users may not have such an environment.
>> Uploading to PyPI will greatly improve the user experience.
>>
>> Best,
>> Wei
>>
>> jincheng sun  于2020年2月4日周二 上午11:49写道:
>>
>>> Hi folks,
>>>
>>> I am very happy to receive some user inquiries about the use of Flink
>>> Python API (PyFlink) recently. One of the more common questions is
>>> whether
>>> it is possible to install PyFlink without using source code build. The
>>> most
>>> convenient and natural way for users is to use `pip install
>>> apache-flink`.
>>> We originally planned to support the use of `pip install apache-flink` in
>>> Flink 1.10, but the reason for this decision was that when Flink 1.9 was
>>> released at August 22, 2019[1], Flink's PyPI account system was not
>>> ready.
>>> At present, our PyPI account is available at October 09, 2019 [2](Only
>>> PMC
>>> can access), So for the convenience of users I propose:
>>>
>>> - Publish the latest release version (1.9.2) of Flink 1.9 to PyPI.
>>> - Update Flink 1.9 documentation to add support for `pip install`.
>>>
>>> As we all know, Flink 1.9.2 was just completed released at January 31,
>>> 2020
>>> [3]. There is still at least 1 to 2 months before the release of 1.9.3,
>>> so
>>> my proposal is completely considered from the perspective of user
>>> convenience. Although the proposed work is not large, we have not set a
>>> precedent for independent release of the Flink Python API(PyFlink) in the
>>> previous release process. I hereby initiate the current discussion and
>>> look
>>> forward to your feedback!
>>>
>>> Best,
>>> Jincheng
>>>
>>> [1]
>>>
>>> https://lists.apache.org/thread.html/4a4d23c449f26b66bc58c71cc1a5c6079c79b5049c6c6744224c5f46%40%3Cdev.flink.apache.org%3E
>>> [2]
>>>
>>> https://lists.apache.org/thread.html/8273a5e8834b788d8ae552a5e177b69e04e96c0446bb90979444deee%40%3Cprivate.flink.apache.org%3E
>>> [3]
>>>
>>> https://lists.apache.org/thread.html/ra27644a4e111476b6041e8969def4322f47d5e0aae8da3ef30cd2926%40%3Cdev.flink.apache.org%3E
>>>
>>

-- 
Best Regards

Jeff Zhang


Re: [DISCUSS] Upload the Flink Python API 1.9.x to PyPI for user convenience.

2020-02-03 Thread Xingbo Huang
Hi Jincheng,

Thanks for driving this.
+1 for this proposal.

Compared to building from source, downloading directly from PyPI will
greatly save the development cost of Python users.

Best,
Xingbo



Wei Zhong  于2020年2月4日周二 下午12:43写道:

> Hi Jincheng,
>
> Thanks for bring up this discussion!
>
> +1 for this proposal. Building from source takes long time and requires a
> good network environment. Some users may not have such an environment.
> Uploading to PyPI will greatly improve the user experience.
>
> Best,
> Wei
>
> jincheng sun  于2020年2月4日周二 上午11:49写道:
>
>> Hi folks,
>>
>> I am very happy to receive some user inquiries about the use of Flink
>> Python API (PyFlink) recently. One of the more common questions is whether
>> it is possible to install PyFlink without using source code build. The
>> most
>> convenient and natural way for users is to use `pip install apache-flink`.
>> We originally planned to support the use of `pip install apache-flink` in
>> Flink 1.10, but the reason for this decision was that when Flink 1.9 was
>> released at August 22, 2019[1], Flink's PyPI account system was not ready.
>> At present, our PyPI account is available at October 09, 2019 [2](Only PMC
>> can access), So for the convenience of users I propose:
>>
>> - Publish the latest release version (1.9.2) of Flink 1.9 to PyPI.
>> - Update Flink 1.9 documentation to add support for `pip install`.
>>
>> As we all know, Flink 1.9.2 was just completed released at January 31,
>> 2020
>> [3]. There is still at least 1 to 2 months before the release of 1.9.3, so
>> my proposal is completely considered from the perspective of user
>> convenience. Although the proposed work is not large, we have not set a
>> precedent for independent release of the Flink Python API(PyFlink) in the
>> previous release process. I hereby initiate the current discussion and
>> look
>> forward to your feedback!
>>
>> Best,
>> Jincheng
>>
>> [1]
>>
>> https://lists.apache.org/thread.html/4a4d23c449f26b66bc58c71cc1a5c6079c79b5049c6c6744224c5f46%40%3Cdev.flink.apache.org%3E
>> [2]
>>
>> https://lists.apache.org/thread.html/8273a5e8834b788d8ae552a5e177b69e04e96c0446bb90979444deee%40%3Cprivate.flink.apache.org%3E
>> [3]
>>
>> https://lists.apache.org/thread.html/ra27644a4e111476b6041e8969def4322f47d5e0aae8da3ef30cd2926%40%3Cdev.flink.apache.org%3E
>>
>


Re: [DISCUSS] Upload the Flink Python API 1.9.x to PyPI for user convenience.

2020-02-03 Thread Xingbo Huang
Hi Jincheng,

Thanks for driving this.
+1 for this proposal.

Compared to building from source, downloading directly from PyPI will
greatly save the development cost of Python users.

Best,
Xingbo



Wei Zhong  于2020年2月4日周二 下午12:43写道:

> Hi Jincheng,
>
> Thanks for bring up this discussion!
>
> +1 for this proposal. Building from source takes long time and requires a
> good network environment. Some users may not have such an environment.
> Uploading to PyPI will greatly improve the user experience.
>
> Best,
> Wei
>
> jincheng sun  于2020年2月4日周二 上午11:49写道:
>
>> Hi folks,
>>
>> I am very happy to receive some user inquiries about the use of Flink
>> Python API (PyFlink) recently. One of the more common questions is whether
>> it is possible to install PyFlink without using source code build. The
>> most
>> convenient and natural way for users is to use `pip install apache-flink`.
>> We originally planned to support the use of `pip install apache-flink` in
>> Flink 1.10, but the reason for this decision was that when Flink 1.9 was
>> released at August 22, 2019[1], Flink's PyPI account system was not ready.
>> At present, our PyPI account is available at October 09, 2019 [2](Only PMC
>> can access), So for the convenience of users I propose:
>>
>> - Publish the latest release version (1.9.2) of Flink 1.9 to PyPI.
>> - Update Flink 1.9 documentation to add support for `pip install`.
>>
>> As we all know, Flink 1.9.2 was just completed released at January 31,
>> 2020
>> [3]. There is still at least 1 to 2 months before the release of 1.9.3, so
>> my proposal is completely considered from the perspective of user
>> convenience. Although the proposed work is not large, we have not set a
>> precedent for independent release of the Flink Python API(PyFlink) in the
>> previous release process. I hereby initiate the current discussion and
>> look
>> forward to your feedback!
>>
>> Best,
>> Jincheng
>>
>> [1]
>>
>> https://lists.apache.org/thread.html/4a4d23c449f26b66bc58c71cc1a5c6079c79b5049c6c6744224c5f46%40%3Cdev.flink.apache.org%3E
>> [2]
>>
>> https://lists.apache.org/thread.html/8273a5e8834b788d8ae552a5e177b69e04e96c0446bb90979444deee%40%3Cprivate.flink.apache.org%3E
>> [3]
>>
>> https://lists.apache.org/thread.html/ra27644a4e111476b6041e8969def4322f47d5e0aae8da3ef30cd2926%40%3Cdev.flink.apache.org%3E
>>
>


Re: [DISCUSS] Upload the Flink Python API 1.9.x to PyPI for user convenience.

2020-02-03 Thread Wei Zhong
Hi Jincheng,

Thanks for bring up this discussion!

+1 for this proposal. Building from source takes long time and requires a
good network environment. Some users may not have such an environment.
Uploading to PyPI will greatly improve the user experience.

Best,
Wei

jincheng sun  于2020年2月4日周二 上午11:49写道:

> Hi folks,
>
> I am very happy to receive some user inquiries about the use of Flink
> Python API (PyFlink) recently. One of the more common questions is whether
> it is possible to install PyFlink without using source code build. The most
> convenient and natural way for users is to use `pip install apache-flink`.
> We originally planned to support the use of `pip install apache-flink` in
> Flink 1.10, but the reason for this decision was that when Flink 1.9 was
> released at August 22, 2019[1], Flink's PyPI account system was not ready.
> At present, our PyPI account is available at October 09, 2019 [2](Only PMC
> can access), So for the convenience of users I propose:
>
> - Publish the latest release version (1.9.2) of Flink 1.9 to PyPI.
> - Update Flink 1.9 documentation to add support for `pip install`.
>
> As we all know, Flink 1.9.2 was just completed released at January 31, 2020
> [3]. There is still at least 1 to 2 months before the release of 1.9.3, so
> my proposal is completely considered from the perspective of user
> convenience. Although the proposed work is not large, we have not set a
> precedent for independent release of the Flink Python API(PyFlink) in the
> previous release process. I hereby initiate the current discussion and look
> forward to your feedback!
>
> Best,
> Jincheng
>
> [1]
>
> https://lists.apache.org/thread.html/4a4d23c449f26b66bc58c71cc1a5c6079c79b5049c6c6744224c5f46%40%3Cdev.flink.apache.org%3E
> [2]
>
> https://lists.apache.org/thread.html/8273a5e8834b788d8ae552a5e177b69e04e96c0446bb90979444deee%40%3Cprivate.flink.apache.org%3E
> [3]
>
> https://lists.apache.org/thread.html/ra27644a4e111476b6041e8969def4322f47d5e0aae8da3ef30cd2926%40%3Cdev.flink.apache.org%3E
>


Re: [DISCUSS] Upload the Flink Python API 1.9.x to PyPI for user convenience.

2020-02-03 Thread Wei Zhong
Hi Jincheng,

Thanks for bring up this discussion!

+1 for this proposal. Building from source takes long time and requires a
good network environment. Some users may not have such an environment.
Uploading to PyPI will greatly improve the user experience.

Best,
Wei

jincheng sun  于2020年2月4日周二 上午11:49写道:

> Hi folks,
>
> I am very happy to receive some user inquiries about the use of Flink
> Python API (PyFlink) recently. One of the more common questions is whether
> it is possible to install PyFlink without using source code build. The most
> convenient and natural way for users is to use `pip install apache-flink`.
> We originally planned to support the use of `pip install apache-flink` in
> Flink 1.10, but the reason for this decision was that when Flink 1.9 was
> released at August 22, 2019[1], Flink's PyPI account system was not ready.
> At present, our PyPI account is available at October 09, 2019 [2](Only PMC
> can access), So for the convenience of users I propose:
>
> - Publish the latest release version (1.9.2) of Flink 1.9 to PyPI.
> - Update Flink 1.9 documentation to add support for `pip install`.
>
> As we all know, Flink 1.9.2 was just completed released at January 31, 2020
> [3]. There is still at least 1 to 2 months before the release of 1.9.3, so
> my proposal is completely considered from the perspective of user
> convenience. Although the proposed work is not large, we have not set a
> precedent for independent release of the Flink Python API(PyFlink) in the
> previous release process. I hereby initiate the current discussion and look
> forward to your feedback!
>
> Best,
> Jincheng
>
> [1]
>
> https://lists.apache.org/thread.html/4a4d23c449f26b66bc58c71cc1a5c6079c79b5049c6c6744224c5f46%40%3Cdev.flink.apache.org%3E
> [2]
>
> https://lists.apache.org/thread.html/8273a5e8834b788d8ae552a5e177b69e04e96c0446bb90979444deee%40%3Cprivate.flink.apache.org%3E
> [3]
>
> https://lists.apache.org/thread.html/ra27644a4e111476b6041e8969def4322f47d5e0aae8da3ef30cd2926%40%3Cdev.flink.apache.org%3E
>


Re: Flink build-in functions

2020-02-03 Thread Jingsong Li
Hi Sunfulin,

Did you use blink-planner? What functions are missing?

Best,
Jingsong Lee

On Tue, Feb 4, 2020 at 12:23 PM Wyatt Chun  wrote:

> They are two different systems for differentiated usage. For your
> question, why don’t give a direct try on Blink?
>
> Regards
>
> On Tue, Feb 4, 2020 at 10:02 AM sunfulin  wrote:
>
>> As far as I can see, the latest flink version does not have a fullfilled
>> support for blink build-in functions. Many date functions and string
>> functions can not be used in Flink. I want to know that when shall we use
>> flink just as to use blink in the same way.
>>
>>
>>
>>
>

-- 
Best, Jingsong Lee


Re: Flink build-in functions

2020-02-03 Thread Wyatt Chun
They are two different systems for differentiated usage. For your question,
why don’t give a direct try on Blink?

Regards

On Tue, Feb 4, 2020 at 10:02 AM sunfulin  wrote:

> As far as I can see, the latest flink version does not have a fullfilled
> support for blink build-in functions. Many date functions and string
> functions can not be used in Flink. I want to know that when shall we use
> flink just as to use blink in the same way.
>
>
>
>


[DISCUSS] Upload the Flink Python API 1.9.x to PyPI for user convenience.

2020-02-03 Thread jincheng sun
Hi folks,

I am very happy to receive some user inquiries about the use of Flink
Python API (PyFlink) recently. One of the more common questions is whether
it is possible to install PyFlink without using source code build. The most
convenient and natural way for users is to use `pip install apache-flink`.
We originally planned to support the use of `pip install apache-flink` in
Flink 1.10, but the reason for this decision was that when Flink 1.9 was
released at August 22, 2019[1], Flink's PyPI account system was not ready.
At present, our PyPI account is available at October 09, 2019 [2](Only PMC
can access), So for the convenience of users I propose:

- Publish the latest release version (1.9.2) of Flink 1.9 to PyPI.
- Update Flink 1.9 documentation to add support for `pip install`.

As we all know, Flink 1.9.2 was just completed released at January 31, 2020
[3]. There is still at least 1 to 2 months before the release of 1.9.3, so
my proposal is completely considered from the perspective of user
convenience. Although the proposed work is not large, we have not set a
precedent for independent release of the Flink Python API(PyFlink) in the
previous release process. I hereby initiate the current discussion and look
forward to your feedback!

Best,
Jincheng

[1]
https://lists.apache.org/thread.html/4a4d23c449f26b66bc58c71cc1a5c6079c79b5049c6c6744224c5f46%40%3Cdev.flink.apache.org%3E
[2]
https://lists.apache.org/thread.html/8273a5e8834b788d8ae552a5e177b69e04e96c0446bb90979444deee%40%3Cprivate.flink.apache.org%3E
[3]
https://lists.apache.org/thread.html/ra27644a4e111476b6041e8969def4322f47d5e0aae8da3ef30cd2926%40%3Cdev.flink.apache.org%3E


Flink build-in functions

2020-02-03 Thread sunfulin
As far as I can see, the latest flink version does not have a fullfilled 
support for blink build-in functions. Many date functions and string functions 
can not be used in Flink. I want to know that when shall we use flink just as 
to use blink in the same way.  

Re: Flink Sql中Kafka-Source怎么获取Key值

2020-02-03 Thread Jark Wu
Hi,

目前 Flink SQL 还不支持访问 key 及 meta,不过这功能已经在计划中了,有望在1.11 中支持。
我建了个 issue 来追踪这个需求:https://issues.apache.org/jira/browse/FLINK-15869

Best,
Jark

On Tue, 4 Feb 2020 at 00:10, wei0727wei  wrote:

> Flink Sql中Kafka-Source只看到了关于Value的Deserialize,要怎么获取对应的Key及meta(offset,
> topic, partition)信息呢?


Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-02-03 Thread Mark Harris
Hi Kostas,

Thanks for your help here - I think we're OK with the increased heap size, but 
happy to explore other alternatives.

I see the problem - we're currently using a BulkFormat, which doesn't seem to 
let us override the rolling policy. Is there an equivalent for the BulkFormat?

Best regards,

Mark

From: Kostas Kloudas 
Sent: 03 February 2020 15:39
To: Mark Harris 
Cc: Piotr Nowojski ; Cliff Resnick ; 
David Magalhães ; Till Rohrmann ; 
flink-u...@apache.org 
Subject: Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for 
S3a files

Hi Mark,

You can use something like the following and change the intervals accordingly:

final StreamingFileSink sink = StreamingFileSink
  .forRowFormat(new Path(outputPath), new 
SimpleStringEncoder<>("UTF-8"))
   .withRollingPolicy(
   DefaultRollingPolicy.builder()
  
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
  
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
  
.withMaxPartSize(1024 * 1024 * 1024)
  .build()
  ) .build();

Let me know if this solves the problem.

Cheers,
Kostas

On Mon, Feb 3, 2020 at 4:11 PM Mark Harris 
mailto:mark.har...@hivehome.com>> wrote:
Hi Kostas,

Sorry, stupid question: How do I set that for a StreamingFileSink?

Best regards,

Mark

From: Kostas Kloudas mailto:kklou...@apache.org>>
Sent: 03 February 2020 14:58
To: Mark Harris mailto:mark.har...@hivehome.com>>
Cc: Piotr Nowojski mailto:pi...@ververica.com>>; Cliff 
Resnick mailto:cre...@gmail.com>>; David Magalhães 
mailto:speeddra...@gmail.com>>; Till Rohrmann 
mailto:trohrm...@apache.org>>; 
flink-u...@apache.org 
mailto:flink-u...@apache.org>>
Subject: Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for 
S3a files

Hi Mark,

Have you tried to set your rolling policy to close inactive part files after 
some time [1]?
If the part files in the buckets are inactive and there are no new part files, 
then the state handle for those buckets will also be removed.

Cheers,
Kostas

https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.html



On Mon, Feb 3, 2020 at 3:54 PM Mark Harris 
mailto:mark.har...@hivehome.com>> wrote:
Hi all,

The out-of-memory heap dump had the answer - the job was failing with an 
OutOfMemoryError because the activeBuckets members of 3 instances of 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets were filling a 
significant enough part of the memory of the taskmanager that no progress was 
being made. Increasing the memory available to the TM seems to have fixed the 
problem.

I think the DeleteOnExit problem will mean it needs to be restarted every few 
weeks, but that's acceptable for now.

Thanks again,

Mark

From: Mark Harris mailto:mark.har...@hivehome.com>>
Sent: 30 January 2020 14:36
To: Piotr Nowojski mailto:pi...@ververica.com>>
Cc: Cliff Resnick mailto:cre...@gmail.com>>; David Magalhães 
mailto:speeddra...@gmail.com>>; Till Rohrmann 
mailto:trohrm...@apache.org>>; 
flink-u...@apache.org 
mailto:flink-u...@apache.org>>; kkloudas 
mailto:kklou...@apache.org>>
Subject: Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for 
S3a files

Hi,

Thanks for your help with this. 

The EMR cluster has 3 15GB VMs, and the flink cluster is started with:

/usr/lib/flink/bin/yarn-session.sh -d -n 3 -tm 5760 -jm 5760 -s 3

Usually the task runs for about 15 minutes before it restarts, usually due to 
with an "java.lang.OutOfMemoryError: Java heap space" exception.

The figures came from a MemoryAnalyzer session on a manual memory dump from one 
of the taskmanagers. The total size of that heap was only 1.8gb.  In that heap, 
1.7gb is taken up by the static field "files" in DeleteOnExitHook, which is a 
linked hash set containing the 9 million strings.

A full example of one the path is 
/tmp/hadoop-yarn/s3a/s3ablock-0001-6061210725685.tmp, at for 120 bytes per 
char[] for a solid 1.2gb of chars. Then 200mb for their String wrappers and 
another 361MB for LinkedHashMap$Entry objects. Despite valiantly holding on to 
an array of 16777216 HashMap$Node elements, the LinkedHashMap can only 
contribute another 20MB or so.
I goofed in not taking that 85% figure from MemoryAnalyzer - it tells me 
DeleteOnExitHook is responsible for 96.98% of the heap dump.

Looking at the files it managed to write before this started to happen 
regularly, it looks like they're being 

Flink Sql中Kafka-Source怎么获取Key值

2020-02-03 Thread wei0727wei
Flink Sql中Kafka-Source只看到了关于Value的Deserialize,要怎么获取对应的Key及meta(offset, topic, 
partition)信息呢?

Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-02-03 Thread Kostas Kloudas
Hi Mark,

You can use something like the following and change the intervals
accordingly:

final StreamingFileSink sink = StreamingFileSink
  .forRowFormat(new Path(outputPath), new
SimpleStringEncoder<>("UTF-8"))
   .withRollingPolicy(
   DefaultRollingPolicy.builder()
  .
withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
  .
withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
  .
withMaxPartSize(1024 * 1024 * 1024)
  .build
()
  ) .build();

Let me know if this solves the problem.

Cheers,
Kostas

On Mon, Feb 3, 2020 at 4:11 PM Mark Harris  wrote:

> Hi Kostas,
>
> Sorry, stupid question: How do I set that for a StreamingFileSink?
>
> Best regards,
>
> Mark
> --
> *From:* Kostas Kloudas 
> *Sent:* 03 February 2020 14:58
> *To:* Mark Harris 
> *Cc:* Piotr Nowojski ; Cliff Resnick <
> cre...@gmail.com>; David Magalhães ; Till Rohrmann
> ; flink-u...@apache.org 
> *Subject:* Re: GC overhead limit exceeded, memory full of DeleteOnExit
> hooks for S3a files
>
> Hi Mark,
>
> Have you tried to set your rolling policy to close inactive part files
> after some time [1]?
> If the part files in the buckets are inactive and there are no new part
> files, then the state handle for those buckets will also be removed.
>
> Cheers,
> Kostas
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.html
>
>
>
> On Mon, Feb 3, 2020 at 3:54 PM Mark Harris 
> wrote:
>
> Hi all,
>
> The out-of-memory heap dump had the answer - the job was failing with an
> OutOfMemoryError because the activeBuckets members of 3 instances of
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets were
> filling a significant enough part of the memory of the taskmanager that no
> progress was being made. Increasing the memory available to the TM seems to
> have fixed the problem.
>
> I think the DeleteOnExit problem will mean it needs to be restarted every
> few weeks, but that's acceptable for now.
>
> Thanks again,
>
> Mark
> --
> *From:* Mark Harris 
> *Sent:* 30 January 2020 14:36
> *To:* Piotr Nowojski 
> *Cc:* Cliff Resnick ; David Magalhães <
> speeddra...@gmail.com>; Till Rohrmann ;
> flink-u...@apache.org ; kkloudas <
> kklou...@apache.org>
> *Subject:* Re: GC overhead limit exceeded, memory full of DeleteOnExit
> hooks for S3a files
>
> Hi,
>
> Thanks for your help with this. 
>
> The EMR cluster has 3 15GB VMs, and the flink cluster is started with:
>
> /usr/lib/flink/bin/yarn-session.sh -d -n 3 -tm 5760 -jm 5760 -s 3
>
> Usually the task runs for about 15 minutes before it restarts, usually due
> to with an "java.lang.OutOfMemoryError: Java heap space" exception.
>
> The figures came from a MemoryAnalyzer session on a manual memory dump
> from one of the taskmanagers. The total size of that heap was only 1.8gb.
> In that heap, 1.7gb is taken up by the static field "files" in
> DeleteOnExitHook, which is a linked hash set containing the 9 million
> strings.
>
> A full example of one the path is
> /tmp/hadoop-yarn/s3a/s3ablock-0001-6061210725685.tmp, at for 120 bytes per
> char[] for a solid 1.2gb of chars. Then 200mb for their String wrappers and
> another 361MB for LinkedHashMap$Entry objects. Despite valiantly holding
> on to an array of 16777216 HashMap$Node elements, the LinkedHashMap can
> only contribute another 20MB or so.
> I goofed in not taking that 85% figure from MemoryAnalyzer - it tells
> me DeleteOnExitHook is responsible for 96.98% of the heap dump.
>
> Looking at the files it managed to write before this started to happen
> regularly, it looks like they're being written approximately every 3
> minutes. I'll triple check our config, but I'm reasonably sure the job is
> configured to checkpoint every 15 minutes - could something else be causing
> it to write?
>
> This may all be a red herring - something else may be taking up the
> taskmanagers memory which didn't make it into that heap dump. I plan to
> repeat the analysis on a heapdump created
> by  -XX:+HeapDumpOnOutOfMemoryError shortly.
>
> Best regards,
>
> Mark
>
> --
> *From:* Piotr Nowojski 
> *Sent:* 30 January 2020 13:44
> *To:* Mark Harris 
> *Cc:* Cliff Resnick ; David Magalhães <
> speeddra...@gmail.com>; Till Rohrmann ;
> flink-u...@apache.org ; kkloudas <
> kklou...@apache.org>
> *Subject:* Re: GC overhead limit exceeded, memory full of DeleteOnExit
> hooks for S3a files
>
> Hi,
>
> What is your job setup? Size of the nodes, memory settings of the
> Flink/JVM?
>
> 9 041 060 strings 

Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-02-03 Thread Mark Harris
Hi Kostas,

Sorry, stupid question: How do I set that for a StreamingFileSink?

Best regards,

Mark

From: Kostas Kloudas 
Sent: 03 February 2020 14:58
To: Mark Harris 
Cc: Piotr Nowojski ; Cliff Resnick ; 
David Magalhães ; Till Rohrmann ; 
flink-u...@apache.org 
Subject: Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for 
S3a files

Hi Mark,

Have you tried to set your rolling policy to close inactive part files after 
some time [1]?
If the part files in the buckets are inactive and there are no new part files, 
then the state handle for those buckets will also be removed.

Cheers,
Kostas

https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.html



On Mon, Feb 3, 2020 at 3:54 PM Mark Harris 
mailto:mark.har...@hivehome.com>> wrote:
Hi all,

The out-of-memory heap dump had the answer - the job was failing with an 
OutOfMemoryError because the activeBuckets members of 3 instances of 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets were filling a 
significant enough part of the memory of the taskmanager that no progress was 
being made. Increasing the memory available to the TM seems to have fixed the 
problem.

I think the DeleteOnExit problem will mean it needs to be restarted every few 
weeks, but that's acceptable for now.

Thanks again,

Mark

From: Mark Harris mailto:mark.har...@hivehome.com>>
Sent: 30 January 2020 14:36
To: Piotr Nowojski mailto:pi...@ververica.com>>
Cc: Cliff Resnick mailto:cre...@gmail.com>>; David Magalhães 
mailto:speeddra...@gmail.com>>; Till Rohrmann 
mailto:trohrm...@apache.org>>; 
flink-u...@apache.org 
mailto:flink-u...@apache.org>>; kkloudas 
mailto:kklou...@apache.org>>
Subject: Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for 
S3a files

Hi,

Thanks for your help with this. 

The EMR cluster has 3 15GB VMs, and the flink cluster is started with:

/usr/lib/flink/bin/yarn-session.sh -d -n 3 -tm 5760 -jm 5760 -s 3

Usually the task runs for about 15 minutes before it restarts, usually due to 
with an "java.lang.OutOfMemoryError: Java heap space" exception.

The figures came from a MemoryAnalyzer session on a manual memory dump from one 
of the taskmanagers. The total size of that heap was only 1.8gb.  In that heap, 
1.7gb is taken up by the static field "files" in DeleteOnExitHook, which is a 
linked hash set containing the 9 million strings.

A full example of one the path is 
/tmp/hadoop-yarn/s3a/s3ablock-0001-6061210725685.tmp, at for 120 bytes per 
char[] for a solid 1.2gb of chars. Then 200mb for their String wrappers and 
another 361MB for LinkedHashMap$Entry objects. Despite valiantly holding on to 
an array of 16777216 HashMap$Node elements, the LinkedHashMap can only 
contribute another 20MB or so.
I goofed in not taking that 85% figure from MemoryAnalyzer - it tells me 
DeleteOnExitHook is responsible for 96.98% of the heap dump.

Looking at the files it managed to write before this started to happen 
regularly, it looks like they're being written approximately every 3 minutes. 
I'll triple check our config, but I'm reasonably sure the job is configured to 
checkpoint every 15 minutes - could something else be causing it to write?

This may all be a red herring - something else may be taking up the 
taskmanagers memory which didn't make it into that heap dump. I plan to repeat 
the analysis on a heapdump created by  -XX:+HeapDumpOnOutOfMemoryError shortly.

Best regards,

Mark


From: Piotr Nowojski mailto:pi...@ververica.com>>
Sent: 30 January 2020 13:44
To: Mark Harris mailto:mark.har...@hivehome.com>>
Cc: Cliff Resnick mailto:cre...@gmail.com>>; David Magalhães 
mailto:speeddra...@gmail.com>>; Till Rohrmann 
mailto:trohrm...@apache.org>>; 
flink-u...@apache.org 
mailto:flink-u...@apache.org>>; kkloudas 
mailto:kklou...@apache.org>>
Subject: Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for 
S3a files

Hi,

What is your job setup? Size of the nodes, memory settings of the Flink/JVM?

9 041 060 strings is awfully small number to bring down a whole cluster. With 
each tmp string having ~30 bytes, that’s only 271MB. Is this really 85% of the 
heap? And also, with parallelism of 6 and checkpoints every 15 minutes, 9 000 
000 of leaked strings should happen only after one month  assuming 500-600 
total number of buckets. (Also assuming that there is a separate file per each 
bucket).

Piotrek

On 30 Jan 2020, at 14:21, Mark Harris 
mailto:mark.har...@hivehome.com>> wrote:

Trying a few different approaches to the fs.s3a.fast.upload settings has bought 
me no joy - the taskmanagers end up simply crashing or complaining of high GC 
load. Heap dumps suggest that this time they're clogged with buffers instead, 
which makes sense.

Our 

Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-02-03 Thread Kostas Kloudas
Hi Mark,

Have you tried to set your rolling policy to close inactive part files
after some time [1]?
If the part files in the buckets are inactive and there are no new part
files, then the state handle for those buckets will also be removed.

Cheers,
Kostas

https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.html



On Mon, Feb 3, 2020 at 3:54 PM Mark Harris  wrote:

> Hi all,
>
> The out-of-memory heap dump had the answer - the job was failing with an
> OutOfMemoryError because the activeBuckets members of 3 instances of
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets were
> filling a significant enough part of the memory of the taskmanager that no
> progress was being made. Increasing the memory available to the TM seems to
> have fixed the problem.
>
> I think the DeleteOnExit problem will mean it needs to be restarted every
> few weeks, but that's acceptable for now.
>
> Thanks again,
>
> Mark
> --
> *From:* Mark Harris 
> *Sent:* 30 January 2020 14:36
> *To:* Piotr Nowojski 
> *Cc:* Cliff Resnick ; David Magalhães <
> speeddra...@gmail.com>; Till Rohrmann ;
> flink-u...@apache.org ; kkloudas <
> kklou...@apache.org>
> *Subject:* Re: GC overhead limit exceeded, memory full of DeleteOnExit
> hooks for S3a files
>
> Hi,
>
> Thanks for your help with this. 
>
> The EMR cluster has 3 15GB VMs, and the flink cluster is started with:
>
> /usr/lib/flink/bin/yarn-session.sh -d -n 3 -tm 5760 -jm 5760 -s 3
>
> Usually the task runs for about 15 minutes before it restarts, usually due
> to with an "java.lang.OutOfMemoryError: Java heap space" exception.
>
> The figures came from a MemoryAnalyzer session on a manual memory dump
> from one of the taskmanagers. The total size of that heap was only 1.8gb.
> In that heap, 1.7gb is taken up by the static field "files" in
> DeleteOnExitHook, which is a linked hash set containing the 9 million
> strings.
>
> A full example of one the path is
> /tmp/hadoop-yarn/s3a/s3ablock-0001-6061210725685.tmp, at for 120 bytes per
> char[] for a solid 1.2gb of chars. Then 200mb for their String wrappers and
> another 361MB for LinkedHashMap$Entry objects. Despite valiantly holding
> on to an array of 16777216 HashMap$Node elements, the LinkedHashMap can
> only contribute another 20MB or so.
> I goofed in not taking that 85% figure from MemoryAnalyzer - it tells
> me DeleteOnExitHook is responsible for 96.98% of the heap dump.
>
> Looking at the files it managed to write before this started to happen
> regularly, it looks like they're being written approximately every 3
> minutes. I'll triple check our config, but I'm reasonably sure the job is
> configured to checkpoint every 15 minutes - could something else be causing
> it to write?
>
> This may all be a red herring - something else may be taking up the
> taskmanagers memory which didn't make it into that heap dump. I plan to
> repeat the analysis on a heapdump created
> by  -XX:+HeapDumpOnOutOfMemoryError shortly.
>
> Best regards,
>
> Mark
>
> --
> *From:* Piotr Nowojski 
> *Sent:* 30 January 2020 13:44
> *To:* Mark Harris 
> *Cc:* Cliff Resnick ; David Magalhães <
> speeddra...@gmail.com>; Till Rohrmann ;
> flink-u...@apache.org ; kkloudas <
> kklou...@apache.org>
> *Subject:* Re: GC overhead limit exceeded, memory full of DeleteOnExit
> hooks for S3a files
>
> Hi,
>
> What is your job setup? Size of the nodes, memory settings of the
> Flink/JVM?
>
> 9 041 060 strings is awfully small number to bring down a whole cluster.
> With each tmp string having ~30 bytes, that’s only 271MB. Is this really
> 85% of the heap? And also, with parallelism of 6 and checkpoints every 15
> minutes, 9 000 000 of leaked strings should happen only after one month
>  assuming 500-600 total number of buckets. (Also assuming that there is a
> separate file per each bucket).
>
> Piotrek
>
> On 30 Jan 2020, at 14:21, Mark Harris  wrote:
>
> Trying a few different approaches to the fs.s3a.fast.upload settings has
> bought me no joy - the taskmanagers end up simply crashing or complaining
> of high GC load. Heap dumps suggest that this time they're clogged with
> buffers instead, which makes sense.
>
> Our job has parallelism of 6 and checkpoints every 15 minutes - if
> anything, we'd like to increase the frequency of that checkpoint duration.
> I suspect this could be affected by the partition structure we were
> bucketing to as well, and at any given moment we could be receiving data
> for up to 280 buckets at once.
> Could this be a factor?
>
> Best regards,
>
> Mark
> --
> *From:* Piotr Nowojski 
> *Sent:* 27 January 2020 16:16
> *To:* Cliff Resnick 
> *Cc:* David Magalhães ; Mark Harris <
> mark.har...@hivehome.com>; Till Rohrmann ;
> flink-u...@apache.org ; kkloudas <
> kklou...@apache.org>
> *Subject:* Re: GC overhead limit exceeded, memory 

Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-02-03 Thread Piotr Nowojski
Hi,

Thanks for getting back with the semi solution!

Sorry that I was not responding before - I was trying to figure this out with 
some of my colleagues.

> I think the DeleteOnExit problem will mean it needs to be restarted every few 
> weeks, but that's acceptable for now.

I hope by the time you find this annoying, Hadoop issue will be fixed somehow 
for you (AWS using Hadoop 3.3+?)

Piotrek

> On 3 Feb 2020, at 15:54, Mark Harris  wrote:
> 
> Hi all,
> 
> The out-of-memory heap dump had the answer - the job was failing with an 
> OutOfMemoryError because the activeBuckets members of 3 instances of 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets were filling 
> a significant enough part of the memory of the taskmanager that no progress 
> was being made. Increasing the memory available to the TM seems to have fixed 
> the problem.
> 
> I think the DeleteOnExit problem will mean it needs to be restarted every few 
> weeks, but that's acceptable for now.
> 
> Thanks again,
> 
> Mark
> From: Mark Harris 
> Sent: 30 January 2020 14:36
> To: Piotr Nowojski 
> Cc: Cliff Resnick ; David Magalhães 
> ; Till Rohrmann ; 
> flink-u...@apache.org ; kkloudas 
> Subject: Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks 
> for S3a files
>  
> Hi,
> 
> Thanks for your help with this. 
> 
> The EMR cluster has 3 15GB VMs, and the flink cluster is started with:
> 
> /usr/lib/flink/bin/yarn-session.sh -d -n 3 -tm 5760 -jm 5760 -s 3
> 
> Usually the task runs for about 15 minutes before it restarts, usually due to 
> with an "java.lang.OutOfMemoryError: Java heap space" exception. 
> 
> The figures came from a MemoryAnalyzer session on a manual memory dump from 
> one of the taskmanagers. The total size of that heap was only 1.8gb.  In that 
> heap, 1.7gb is taken up by the static field "files" in DeleteOnExitHook, 
> which is a linked hash set containing the 9 million strings. 
> 
> A full example of one the path is 
> /tmp/hadoop-yarn/s3a/s3ablock-0001-6061210725685.tmp, at for 120 bytes per 
> char[] for a solid 1.2gb of chars. Then 200mb for their String wrappers and 
> another 361MB for LinkedHashMap$Entry objects. Despite valiantly holding on 
> to an array of 16777216 HashMap$Node elements, the LinkedHashMap can only 
> contribute another 20MB or so. 
> I goofed in not taking that 85% figure from MemoryAnalyzer - it tells me 
> DeleteOnExitHook is responsible for 96.98% of the heap dump.
> 
> Looking at the files it managed to write before this started to happen 
> regularly, it looks like they're being written approximately every 3 minutes. 
> I'll triple check our config, but I'm reasonably sure the job is configured 
> to checkpoint every 15 minutes - could something else be causing it to write?
> 
> This may all be a red herring - something else may be taking up the 
> taskmanagers memory which didn't make it into that heap dump. I plan to 
> repeat the analysis on a heapdump created by  -XX:+HeapDumpOnOutOfMemoryError 
> shortly.
> 
> Best regards,
> 
> Mark
> 
> From: Piotr Nowojski 
> Sent: 30 January 2020 13:44
> To: Mark Harris 
> Cc: Cliff Resnick ; David Magalhães 
> ; Till Rohrmann ; 
> flink-u...@apache.org ; kkloudas 
> Subject: Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks 
> for S3a files
>  
> Hi,
> 
> What is your job setup? Size of the nodes, memory settings of the Flink/JVM?
> 
> 9 041 060 strings is awfully small number to bring down a whole cluster. With 
> each tmp string having ~30 bytes, that’s only 271MB. Is this really 85% of 
> the heap? And also, with parallelism of 6 and checkpoints every 15 minutes, 9 
> 000 000 of leaked strings should happen only after one month  assuming 
> 500-600 total number of buckets. (Also assuming that there is a separate file 
> per each bucket).
> 
> Piotrek 
> 
>> On 30 Jan 2020, at 14:21, Mark Harris > > wrote:
>> 
>> Trying a few different approaches to the fs.s3a.fast.upload settings has 
>> bought me no joy - the taskmanagers end up simply crashing or complaining of 
>> high GC load. Heap dumps suggest that this time they're clogged with buffers 
>> instead, which makes sense.
>> 
>> Our job has parallelism of 6 and checkpoints every 15 minutes - if anything, 
>> we'd like to increase the frequency of that checkpoint duration. I suspect 
>> this could be affected by the partition structure we were bucketing to as 
>> well, and at any given moment we could be receiving data for up to 280 
>> buckets at once.
>> Could this be a factor?
>> 
>> Best regards,
>> 
>> Mark
>> From: Piotr Nowojski mailto:pi...@ververica.com>>
>> Sent: 27 January 2020 16:16
>> To: Cliff Resnick mailto:cre...@gmail.com>>
>> Cc: David Magalhães mailto:speeddra...@gmail.com>>; 
>> Mark Harris mailto:mark.har...@hivehome.com>>; 
>> Till Rohrmann mailto:trohrm...@apache.org>>; 
>> flink-u...@apache.org  > >; kkloudas > 

Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-02-03 Thread Mark Harris
Hi all,

The out-of-memory heap dump had the answer - the job was failing with an 
OutOfMemoryError because the activeBuckets members of 3 instances of 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets were filling a 
significant enough part of the memory of the taskmanager that no progress was 
being made. Increasing the memory available to the TM seems to have fixed the 
problem.

I think the DeleteOnExit problem will mean it needs to be restarted every few 
weeks, but that's acceptable for now.

Thanks again,

Mark

From: Mark Harris 
Sent: 30 January 2020 14:36
To: Piotr Nowojski 
Cc: Cliff Resnick ; David Magalhães ; 
Till Rohrmann ; flink-u...@apache.org 
; kkloudas 
Subject: Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for 
S3a files

Hi,

Thanks for your help with this. 

The EMR cluster has 3 15GB VMs, and the flink cluster is started with:

/usr/lib/flink/bin/yarn-session.sh -d -n 3 -tm 5760 -jm 5760 -s 3

Usually the task runs for about 15 minutes before it restarts, usually due to 
with an "java.lang.OutOfMemoryError: Java heap space" exception.

The figures came from a MemoryAnalyzer session on a manual memory dump from one 
of the taskmanagers. The total size of that heap was only 1.8gb.  In that heap, 
1.7gb is taken up by the static field "files" in DeleteOnExitHook, which is a 
linked hash set containing the 9 million strings.

A full example of one the path is 
/tmp/hadoop-yarn/s3a/s3ablock-0001-6061210725685.tmp, at for 120 bytes per 
char[] for a solid 1.2gb of chars. Then 200mb for their String wrappers and 
another 361MB for LinkedHashMap$Entry objects. Despite valiantly holding on to 
an array of 16777216 HashMap$Node elements, the LinkedHashMap can only 
contribute another 20MB or so.
I goofed in not taking that 85% figure from MemoryAnalyzer - it tells me 
DeleteOnExitHook is responsible for 96.98% of the heap dump.

Looking at the files it managed to write before this started to happen 
regularly, it looks like they're being written approximately every 3 minutes. 
I'll triple check our config, but I'm reasonably sure the job is configured to 
checkpoint every 15 minutes - could something else be causing it to write?

This may all be a red herring - something else may be taking up the 
taskmanagers memory which didn't make it into that heap dump. I plan to repeat 
the analysis on a heapdump created by  -XX:+HeapDumpOnOutOfMemoryError shortly.

Best regards,

Mark


From: Piotr Nowojski 
Sent: 30 January 2020 13:44
To: Mark Harris 
Cc: Cliff Resnick ; David Magalhães ; 
Till Rohrmann ; flink-u...@apache.org 
; kkloudas 
Subject: Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for 
S3a files

Hi,

What is your job setup? Size of the nodes, memory settings of the Flink/JVM?

9 041 060 strings is awfully small number to bring down a whole cluster. With 
each tmp string having ~30 bytes, that’s only 271MB. Is this really 85% of the 
heap? And also, with parallelism of 6 and checkpoints every 15 minutes, 9 000 
000 of leaked strings should happen only after one month  assuming 500-600 
total number of buckets. (Also assuming that there is a separate file per each 
bucket).

Piotrek

On 30 Jan 2020, at 14:21, Mark Harris 
mailto:mark.har...@hivehome.com>> wrote:

Trying a few different approaches to the fs.s3a.fast.upload settings has bought 
me no joy - the taskmanagers end up simply crashing or complaining of high GC 
load. Heap dumps suggest that this time they're clogged with buffers instead, 
which makes sense.

Our job has parallelism of 6 and checkpoints every 15 minutes - if anything, 
we'd like to increase the frequency of that checkpoint duration. I suspect this 
could be affected by the partition structure we were bucketing to as well, and 
at any given moment we could be receiving data for up to 280 buckets at once.
Could this be a factor?

Best regards,

Mark

From: Piotr Nowojski mailto:pi...@ververica.com>>
Sent: 27 January 2020 16:16
To: Cliff Resnick mailto:cre...@gmail.com>>
Cc: David Magalhães mailto:speeddra...@gmail.com>>; Mark 
Harris mailto:mark.har...@hivehome.com>>; Till 
Rohrmann mailto:trohrm...@apache.org>>; 
flink-u...@apache.org 
mailto:flink-u...@apache.org>>; kkloudas 
mailto:kklou...@apache.org>>
Subject: Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for 
S3a files

Hi,

I think reducing the frequency of the checkpoints and decreasing parallelism of 
the things using the S3AOutputStream class, would help to mitigate the issue.

I don’t know about other solutions. I would suggest to ask this question 
directly to Steve L. in the bug ticket [1], as he is the one that fixed the 
issue. If there is no workaround, maybe it would be possible to put a pressure 
on the Hadoop guys to back port the fix to older versions?

Piotrek

[1] 

Re: Flink solution for having shared variable between task managers

2020-02-03 Thread Soheil Pourbafrani
Thanks, I'll check it out.

On Mon, Feb 3, 2020 at 10:05 AM Fabian Hueske  wrote:

> Hi,
>
> I think you are looking for BroadcastState [1].
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>
> Am Fr., 17. Jan. 2020 um 14:50 Uhr schrieb Soheil Pourbafrani <
> soheil.i...@gmail.com>:
>
>> Hi,
>>
>> According to the processing logic, I need to have a HashMap variable that
>> should be shared between the taskmanagers. The scenario is the HashMap data
>> will be continuously updated according to the incoming stream of data.
>>
>> What I observed is declaring the HashMap variable as a class attribute,
>> it will be shared among a single taskmanagers slots, but in case I have
>> multiple taskmanager, each will have a separate HashMap instance.
>>
>> What is the standard way to achieve this? Does Flink provide any utility
>> for that?
>>
>


Question: Determining Total Recovery Time

2020-02-03 Thread Morgan Geldenhuys

Community,

I am interested in determining the total time to recover for a Flink 
application after experiencing a partial failure. Let's assume a 
pipeline consisting of Kafka -> Flink -> Kafka with Exactly-Once 
guarantees enabled.


Taking a look at the documentation 
(https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html), 
one of the metrics which can be gathered is /recoveryTime/. However, as 
far as I can tell this is only the time taken for the system to go from 
an inconsistent state back into a consistent state, i.e. restarting the 
job. Is there any way of measuring the amount of time taken from the 
point when the failure occurred till the point when the system catches 
up to the last message that was processed before the outage?


Thank you very much in advance!

Regards,
Morgan.


Re: Does flink support retries on checkpoint write failures

2020-02-03 Thread Till Rohrmann
Glad to hear that you could solve/mitigate the problem and thanks for
letting us know.

Cheers,
Till

On Sat, Feb 1, 2020 at 2:45 PM Richard Deurwaarder  wrote:

> Hi Till & others,
>
> We enabled setFailOnCheckpointingErrors
> (setTolerableCheckpointFailureNumber isn't available in 1.8) and this
> indeed prevents the large number of restarts.
>
> Hopefully a solution for the reported issue[1] with google gets found but
> for now this solved our immediate problem.
>
> Thanks again!
>
> [1] https://issuetracker.google.com/issues/137168102
>
> Regards,
>
> Richard
>
> On Thu, Jan 30, 2020 at 11:40 AM Arvid Heise  wrote:
>
>> If a checkpoint is not successful, it cannot be used for recovery.
>> That means Flink will restart to the last successful checkpoint and hence
>> not lose any data.
>>
>> On Wed, Jan 29, 2020 at 9:52 PM wvl  wrote:
>>
>>> Forgive my lack of knowledge here - I'm a bit out of my league here.
>>>
>>> But I was wondering if allowing e.g. 1 checkpoint to fail and the reason
>>> for which somehow caused a record to be lost (e.g. rocksdb exception /
>>> taskmanager crash / etc), there would be no Source rewind to the last
>>> successful checkpoint and this record would be lost forever, correct?
>>>
>>> On Wed, 29 Jan 2020, 17:51 Richard Deurwaarder,  wrote:
>>>
 Hi Till,

 I'll see if we can ask google to comment on those issues, perhaps they
 have a fix in the works that would solve the root problem.
 In the meanwhile
 `CheckpointConfig.setTolerableCheckpointFailureNumber` sounds very
 promising!
 Thank you for this. I'm going to try this tomorrow to see if that
 helps. I will let you know!

 Richard

 On Wed, Jan 29, 2020 at 3:47 PM Till Rohrmann 
 wrote:

> Hi Richard,
>
> googling a bit indicates that this might actually be a GCS problem [1,
> 2, 3]. The proposed solution/workaround so far is to retry the whole 
> upload
> operation as part of the application logic. Since I assume that you are
> writing to GCS via Hadoop's file system this should actually fall into the
> realm of the Hadoop file system implementation and not Flink.
>
> What you could do to mitigate the problem a bit is to set the number
> of tolerable checkpoint failures to a non-zero value via
> `CheckpointConfig.setTolerableCheckpointFailureNumber`. Setting this to 
> `n`
> means that the job will only fail and then restart after `n` checkpoint
> failures. Unfortunately, we do not support a failure rate yet.
>
> [1] https://github.com/googleapis/google-cloud-java/issues/3586
> [2] https://github.com/googleapis/google-cloud-java/issues/5704
> [3] https://issuetracker.google.com/issues/137168102
>
> Cheers,
> Till
>
> On Tue, Jan 28, 2020 at 6:25 PM Richard Deurwaarder 
> wrote:
>
>> Hi all,
>>
>> We've got a Flink job running on 1.8.0 which writes its state
>> (rocksdb) to Google Cloud Storage[1]. We've noticed that jobs with a 
>> large
>> amount of state (500gb range) are becoming *very* unstable. In the order 
>> of
>> restarting once an hour or even more.
>>
>> The reason for this instability is that we run into "410 Gone"[4]
>> errors from Google Cloud Storage. This indicates an upload (write from
>> Flink's perspective) took place and it wanted to resume the write[2] but
>> could not find the file which it needed to resume. My guess is this is
>> because the previous attempt either failed or perhaps it uploads in 
>> chunks
>> of 67mb [3].
>>
>> The library logs this line when this happens:
>>
>> "Encountered status code 410 when accessing URL
>> https://www.googleapis.com/upload/storage/v1/b//o?ifGenerationMatch=0=job-manager/15aa2391-a055-4bfd-8d82-e9e4806baa9c/8ae818761055cdc022822010a8b4a1ed/chk-52224/_metadata=resumable_id=AEnB2UqJwkdrQ8YuzqrTp9Nk4bDnzbuJcTlD5E5hKNLNz4xQ7vjlYrDzYC29ImHcp0o6OjSCmQo6xkDSj5OHly7aChH0JxxXcg.
>> Delegating to response handler for possible retry."
>>
>> We're kind of stuck on these questions:
>> * Is flink capable or doing these retries?
>> * Does anyone succesfully write their (rocksdb) state to Google Cloud
>> storage for bigger state sizes?
>> * Is it possible flink renames or deletes certain directories before
>> all flushes have been done based on an atomic guarantee provided by HDFS
>> that does not hold on other implementations perhaps? A race condition of
>> sorts
>>
>> Basically does anyone recognize this behavior?
>>
>> Regards,
>>
>> Richard Deurwaarder
>>
>> [1] We use an HDFS implementation provided by Google
>> https://github.com/GoogleCloudDataproc/bigdata-interop/tree/master/gcs
>> [2]
>> https://cloud.google.com/storage/docs/json_api/v1/status-codes#410_Gone
>> [3]
>> 

Re: Flink solution for having shared variable between task managers

2020-02-03 Thread Fabian Hueske
Hi,

I think you are looking for BroadcastState [1].

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html

Am Fr., 17. Jan. 2020 um 14:50 Uhr schrieb Soheil Pourbafrani <
soheil.i...@gmail.com>:

> Hi,
>
> According to the processing logic, I need to have a HashMap variable that
> should be shared between the taskmanagers. The scenario is the HashMap data
> will be continuously updated according to the incoming stream of data.
>
> What I observed is declaring the HashMap variable as a class attribute, it
> will be shared among a single taskmanagers slots, but in case I have
> multiple taskmanager, each will have a separate HashMap instance.
>
> What is the standard way to achieve this? Does Flink provide any utility
> for that?
>