Re: Jupyter PyFlink Web UI

2021-06-08 Thread Maciej Bryński
Nope.
I found the following solution.

conf = Configuration()
env = StreamExecutionEnvironment(
    get_gateway().jvm.org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf._j_configuration))
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(stream_execution_environment=env,
                                          environment_settings=env_settings)

I also created the bug report https://issues.apache.org/jira/browse/FLINK-22924.
I think this API should be exposed in Python.

śr., 9 cze 2021 o 04:57 Dian Fu  napisał(a):
>
> Hi Maciek,
>
> You could try if the following works:
>
> ```
> table_env.get_config().get_configuration().set_string("rest.bind-port", "xxx")
> ```
>
> Regards,
> Dian
>
> > 2021年6月8日 下午8:26,maverick  写道:
> >
> > Hi,
> > I've got a question. I'm running PyFlink code from a Jupyter Notebook, starting
> > a TableEnvironment with the following code:
> >
> > env_settings =
> > EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> > table_env = TableEnvironment.create(env_settings)
> >
> > How can I enable Web UI in this code?
> >
> > Regards,
> > Maciek
> >
> >
> >
> > --
> > Sent from: 
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 
Maciek Bryński


Re: How to configure column width in Flink SQL client?

2021-06-08 Thread Ingo Bürk
Hi Svend,

I think it definitely makes sense to open a JIRA issue for it to discuss it
also with the people working on the SQL client. Thanks for taking care of
this!


Regards
Ingo

On Wed, Jun 9, 2021 at 7:25 AM Svend  wrote:

> Thanks for the feed-back Ingo,
>
> Do you think a PR would be welcome to make that parameter configurable? At
> the place where I work, UUIDs are often used as column values and they are
> 36 characters long, so very often a very useful piece of information is not
> readable to us.
>
> I had a quick look, the max width seems to be defined in [1], and used in
> various places like [2] and [3]. Should I open a Jira to discuss this and
> cc you in it?
>
> Cheers,
>
> Svend
>
>
> [1]
> https://github.com/apache/flink/blob/6d8c02f90a5a3054015f2f1ee83be821d925ccd1/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java#L74
> [2]
> https://github.com/apache/flink/blob/6d8c02f90a5a3054015f2f1ee83be821d925ccd1/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java#L102
> [3]
> https://github.com/apache/flink/blob/6d8c02f90a5a3054015f2f1ee83be821d925ccd1/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java#L143
>
>
> On Tue, 8 Jun 2021, at 7:34 AM, Ingo Bürk wrote:
>
> Hi Svend,
>
> unfortunately the column width in the SQL client cannot currently be
> configured.
>
>
> Regards
> Ingo
>
> On Mon, Jun 7, 2021 at 4:19 PM Svend  wrote:
>
>
>
> Hi everyone,
>
> When using the Flink SQL client and displaying results interactively, it
> seems the value of any column wider than 24 characters is truncated, which
> is indicated by a '~' character, e.g. the "member_user_id" below:
>
> ```
> SELECT
>   metadata.true_as_of_timestamp_millis,
>   member_user_id,
>   membership_updated.new_status.updated_value
> FROM fandom_members_events
> WHERE
>group_id = '91170c98-2cc5-4935-9ea6-12b72d32fb3c'
>
>
> true_as_of_timestamp_mil~ member_user_id            updated_value
>             1622811665919 45ca821f-c0fc-4114-bef8-~ (NULL)
>             1622811665919 45ca821f-c0fc-4114-bef8-~ JOINED
>             1622118951005 b4734391-d3e1-417c-ad92-~ (NULL)
> ...
> ```
>
> Is there a way to configure the displayed width? I didn't find any
> parameter for this in
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/#sql-client-startup-options
>
>
> Thanks a lot in advance!
>
> Svend
>
>
>


Re: How to configure column width in Flink SQL client?

2021-06-08 Thread Svend
Thanks for the feed-back Ingo,

Do you think a PR would be welcome to make that parameter configurable? At the
place where I work, UUIDs are often used as column values and they are 36
characters long, so very often a very useful piece of information is not
readable to us.

I had a quick look, the max width seems to be defined in [1], and used in 
various places like [2] and [3]. Should I open a Jira to discuss this and cc 
you in it?

Cheers,

Svend


[1] 
https://github.com/apache/flink/blob/6d8c02f90a5a3054015f2f1ee83be821d925ccd1/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java#L74
[2] 
https://github.com/apache/flink/blob/6d8c02f90a5a3054015f2f1ee83be821d925ccd1/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java#L102
[3] 
https://github.com/apache/flink/blob/6d8c02f90a5a3054015f2f1ee83be821d925ccd1/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java#L143
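
For readers following along, here is a small, purely illustrative Java sketch of the truncation behaviour described in this thread (24 visible characters plus a '~' marker). It is not Flink's actual PrintUtils code, and the full UUID used below is made up, since the real value was cut off in the output above.

```
/**
 * Toy reproduction of the SQL client behaviour discussed in this thread:
 * values wider than the column width are cut and suffixed with '~'.
 * This is NOT Flink's PrintUtils implementation.
 */
public final class ColumnTruncationSketch {

    static String truncate(String value, int maxWidth) {
        if (value.length() <= maxWidth) {
            return value;
        }
        // Keep (maxWidth - 1) characters and append the '~' marker.
        return value.substring(0, maxWidth - 1) + "~";
    }

    public static void main(String[] args) {
        // A 36-character UUID (the tail is invented) and a 25-character column,
        // i.e. 24 visible characters plus '~', matching the output above.
        System.out.println(truncate("45ca821f-c0fc-4114-bef8-4b4e9a303a12", 25));
        // prints: 45ca821f-c0fc-4114-bef8-~
    }
}
```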


On Tue, 8 Jun 2021, at 7:34 AM, Ingo Bürk wrote:
> Hi Svend,
> 
> unfortunately the column width in the SQL client cannot currently be 
> configured.
> 
> 
> Regards
> Ingo
> 
> On Mon, Jun 7, 2021 at 4:19 PM Svend  wrote:
>> __
>> 
>> Hi everyone,
>> 
>> When using the Flink SQL client and displaying results interactively, it
>> seems the value of any column wider than 24 characters is truncated, which
>> is indicated by a '~' character, e.g. the "member_user_id" below:
>> 
>> ```
>> SELECT
>>   metadata.true_as_of_timestamp_millis,
>>   member_user_id,
>>   membership_updated.new_status.updated_value
>> FROM fandom_members_events
>> WHERE
>>group_id = '91170c98-2cc5-4935-9ea6-12b72d32fb3c'
>> 
>> 
>> true_as_of_timestamp_mil~ member_user_id            updated_value
>>             1622811665919 45ca821f-c0fc-4114-bef8-~ (NULL)
>>             1622811665919 45ca821f-c0fc-4114-bef8-~ JOINED
>>             1622118951005 b4734391-d3e1-417c-ad92-~ (NULL)
>> ...
>> ```
>> 
>> Is there a way to configure the displayed width? I didn't find any parameter 
>> for this in 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/#sql-client-startup-options
>> 
>> 
>> Thanks a lot in advance!
>> 
>> Svend


Re: Add control mode for flink

2021-06-08 Thread Xintong Song
>
> 2. There are two kinds of existing special elements, special stream
> records (e.g. watermarks) and events (e.g. checkpoint barriers). They all
> flow through the whole DAG, but events need to be acknowledged by
> downstream operators and can overtake records, while stream records cannot. So I’m
> wondering if we plan to unify the two approaches in the new control flow
> (as Xintong mentioned both in the previous mails)?
>

TBH, I don't really know yet. We feel that the control flow is a
non-trivial topic and it would be better to bring it up publicly as early
as possible, while the concrete plan is still on the way.

Personally, I'm leaning towards not touching the existing watermarks and
checkpoint barriers in the first step.
- I'd expect the control flow to be introduced as an experimental feature
that takes time to stabilize. It would be better if the existing
important features like checkpointing and watermarks stay unaffected.
- Checkpoint barriers are a little different, as other control messages
somehow rely on them to achieve exactly-once consistency. Without the
concrete design, I'm not entirely sure whether they can be properly modeled
as a special case of general control messages.
- Watermarks are probably similar to the other control messages. However,
they're already exposed to users as public APIs. If we want to migrate them to
the new control flow, we'd have to be very careful not to break any compatibility.
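
To make the kind of "general control message" discussed here a bit more concrete, below is a purely hypothetical Java sketch. None of these interfaces exist in Flink today; the names and the dynamic-configuration payload are assumptions for illustration only, and the eventual design may look completely different.

```
// Hypothetical sketch only -- none of these types exist in Flink today.
// It merely illustrates the idea of a generic control message that flows
// through the DAG and is handled by the operators that care about it.

import java.io.Serializable;
import java.util.Map;

/** A control message injected either by an operator or by the JobMaster. */
interface ControlMessage extends Serializable {
    long messageId();
}

/** Example payload: the dynamic-configuration use case mentioned in this thread. */
final class UpdateConfigMessage implements ControlMessage {
    private final long messageId;
    private final Map<String, String> newConfig;

    UpdateConfigMessage(long messageId, Map<String, String> newConfig) {
        this.messageId = messageId;
        this.newConfig = newConfig;
    }

    @Override
    public long messageId() {
        return messageId;
    }

    Map<String, String> newConfig() {
        return newConfig;
    }
}

/** Operators that want to react to control messages would implement something like this. */
interface ControlMessageListener {
    /** Invoked when a control message reaches the operator; it is then acknowledged and forwarded downstream. */
    void onControlMessage(ControlMessage message);
}
```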


Thank you~

Xintong Song



On Wed, Jun 9, 2021 at 11:30 AM Steven Wu  wrote:

> > producing control events from JobMaster is similar to triggering a
> savepoint.
>
> Paul, here is what I see the difference. Upon job or jobmanager recovery,
> we don't need to recover and replay the savepoint trigger signal.
>
> On Tue, Jun 8, 2021 at 8:20 PM Paul Lam  wrote:
>
>> +1 for this feature. Setting up a separate control stream is too much for
>> many use cases; it would be very helpful if users could leverage the built-in
>> control flow of Flink.
>>
>> My 2 cents:
>> 1. @Steven IMHO, producing control events from JobMaster is similar to
>> triggering a savepoint. The REST api is non-blocking, and users should poll
>> the results to confirm the operation has succeeded. If something goes wrong,
>> it’s the user’s responsibility to retry.
>> 2. There are two kinds of existing special elements, special stream
>> records (e.g. watermarks) and events (e.g. checkpoint barriers). They all
>> flow through the whole DAG, but events need to be acknowledged by
>> downstream operators and can overtake records, while stream records cannot. So I’m
>> wondering if we plan to unify the two approaches in the new control flow
>> (as Xintong mentioned both in the previous mails)?
>>
>> Best,
>> Paul Lam
>>
>> 2021年6月8日 14:08,Steven Wu  写道:
>>
>>
>> I can see the benefits of control flow. E.g., it might help the old (and
>> inactive) FLIP-17 side input. I would suggest that we add more details of
>> some of the potential use cases.
>>
>> Here is one mismatch with using control flow for dynamic config. Dynamic
>> config is typically targeted/loaded by one specific operator. Control flow
>> will propagate the dynamic config to all operators. not a problem per se
>>
>> Regarding using the REST api (to jobmanager) for accepting control
>> signals from external system, where are we going to persist/checkpoint the
>> signal? jobmanager can die before the control signal is propagated and
>> checkpointed. Did we lose the control signal in this case?
>>
>>
>> On Mon, Jun 7, 2021 at 11:05 PM Xintong Song 
>> wrote:
>>
>>> +1 on separating the effort into two steps:
>>>
>>>1. Introduce a common control flow framework, with flexible
>>>interfaces for generating / reacting to control messages for various
>>>purposes.
>>>2. Features that leverage the control flow can be worked on
>>>concurrently
>>>
>>> Meantime, keeping collecting potential features that may leverage the
>>> control flow should be helpful. It provides good inputs for the control
>>> flow framework design, to make the framework common enough to cover the
>>> potential use cases.
>>>
>>> My suggestions on the next steps:
>>>
>>>1. Allow more time for opinions to be heard and potential use cases
>>>to be collected
>>>2. Draft a FLIP with the scope of common control flow framework
>>>3. We probably need a poc implementation to make sure the framework
>>>covers at least the following scenarios
>>>   1. Produce control events from arbitrary operators
>>>   2. Produce control events from JobMaster
>>>   3. Consume control events from arbitrary operators downstream
>>>   where the events are produced
>>>
>>>
>>> Thank you~
>>> Xintong Song
>>>
>>>
>>>
>>> On Tue, Jun 8, 2021 at 1:37 PM Yun Gao  wrote:
>>>
 Very thanks Jiangang for bringing this up and very thanks for the
 discussion!

 I also agree with the summarization by Xintong and Jing that control
 flow seems to be
 a common building block for many functionalities 

Re: Add control mode for flink

2021-06-08 Thread Steven Wu
> producing control events from JobMaster is similar to triggering a
savepoint.

Paul, here is what I see the difference. Upon job or jobmanager recovery,
we don't need to recover and replay the savepoint trigger signal.

On Tue, Jun 8, 2021 at 8:20 PM Paul Lam  wrote:

> +1 for this feature. Setting up a separate control stream is too much for
> many use cases; it would be very helpful if users could leverage the built-in
> control flow of Flink.
>
> My 2 cents:
> 1. @Steven IMHO, producing control events from JobMaster is similar to
> triggering a savepoint. The REST api is non-blocking, and users should poll
> the results to confirm the operation has succeeded. If something goes wrong,
> it’s the user’s responsibility to retry.
> 2. There are two kinds of existing special elements, special stream
> records (e.g. watermarks) and events (e.g. checkpoint barriers). They all
> flow through the whole DAG, but events need to be acknowledged by
> downstream operators and can overtake records, while stream records cannot. So I’m
> wondering if we plan to unify the two approaches in the new control flow
> (as Xintong mentioned both in the previous mails)?
>
> Best,
> Paul Lam
>
> 2021年6月8日 14:08,Steven Wu  写道:
>
>
> I can see the benefits of control flow. E.g., it might help the old (and
> inactive) FLIP-17 side input. I would suggest that we add more details of
> some of the potential use cases.
>
> Here is one mismatch with using control flow for dynamic config. Dynamic
> config is typically targeted/loaded by one specific operator. Control flow
> will propagate the dynamic config to all operators. not a problem per se
>
> Regarding using the REST api (to jobmanager) for accepting control
> signals from external system, where are we going to persist/checkpoint the
> signal? jobmanager can die before the control signal is propagated and
> checkpointed. Did we lose the control signal in this case?
>
>
> On Mon, Jun 7, 2021 at 11:05 PM Xintong Song 
> wrote:
>
>> +1 on separating the effort into two steps:
>>
>>1. Introduce a common control flow framework, with flexible
>>interfaces for generating / reacting to control messages for various
>>purposes.
>>2. Features that leverage the control flow can be worked on
>>concurrently
>>
>> Meantime, keeping collecting potential features that may leverage the
>> control flow should be helpful. It provides good inputs for the control
>> flow framework design, to make the framework common enough to cover the
>> potential use cases.
>>
>> My suggestions on the next steps:
>>
>>1. Allow more time for opinions to be heard and potential use cases
>>to be collected
>>2. Draft a FLIP with the scope of common control flow framework
>>3. We probably need a poc implementation to make sure the framework
>>covers at least the following scenarios
>>   1. Produce control events from arbitrary operators
>>   2. Produce control events from JobMaster
>>   3. Consume control events from arbitrary operators downstream
>>   where the events are produced
>>
>>
>> Thank you~
>> Xintong Song
>>
>>
>>
>> On Tue, Jun 8, 2021 at 1:37 PM Yun Gao  wrote:
>>
>>> Very thanks Jiangang for bringing this up and very thanks for the
>>> discussion!
>>>
>>> I also agree with the summarization by Xintong and Jing that control
>>> flow seems to be
>>> a common building block for many functionalities, and the dynamic
>>> configuration framework
>>> is a representative application that is frequently required by users.
>>> Regarding the control flow,
>>> currently we are also considering the design of iteration for the
>>> flink-ml, and as Xintong has pointed
>>> out, it also required the control flow in cases like detection global
>>> termination inside the iteration
>>>  (in this case we need to broadcast an event through the iteration body
>>> to detect if there are still
>>> records reside in the iteration body). And regarding  whether to
>>> implement the dynamic configuration
>>> framework, I also agree with Xintong that the consistency guarantee
>>> would be a point to consider, we
>>> might consider if we need to ensure every operator could receive the
>>> dynamic configuration.
>>>
>>> Best,
>>> Yun
>>>
>>>
>>>
>>> --
>>> Sender:kai wang
>>> Date:2021/06/08 11:52:12
>>> Recipient:JING ZHANG
>>> Cc:刘建刚; Xintong Song [via Apache Flink User
>>> Mailing List archive.]; user<
>>> user@flink.apache.org>; dev
>>> Theme:Re: Add control mode for flink
>>>
>>>
>>>
>>> I'm big +1 for this feature.
>>>
>>>1. Limit the input qps.
>>>2. Change log level for debug.
>>>
>>> in my team, the two examples above are needed
>>>
>>> JING ZHANG  于2021年6月8日周二 上午11:18写道:
>>>
 Thanks Jiangang for bringing this up.
 As mentioned in Jiangang's email, `dynamic configuration framework`
 provides many useful functions in Kuaishou, because it could update job
 behavior without relaunching the job. The functions are v

Re: Add control mode for flink

2021-06-08 Thread Paul Lam
+1 for this feature. Setting up a separate control stream is too much for many
use cases; it would be very helpful if users could leverage the built-in control
flow of Flink.

My 2 cents:
1. @Steven IMHO, producing control events from JobMaster is similar to
triggering a savepoint. The REST api is non-blocking, and users should poll the
results to confirm the operation has succeeded. If something goes wrong, it’s
the user’s responsibility to retry.
2. There are two kinds of existing special elements, special stream records
(e.g. watermarks) and events (e.g. checkpoint barriers). They all flow through
the whole DAG, but events need to be acknowledged by downstream operators and can
overtake records, while stream records cannot. So I’m wondering if we plan to
unify the two approaches in the new control flow (as Xintong mentioned both in
the previous mails)?

Best,
Paul Lam

> 2021年6月8日 14:08,Steven Wu  写道:
> 
> 
> I can see the benefits of control flow. E.g., it might help the old (and 
> inactive) FLIP-17 side input. I would suggest that we add more details of 
> some of the potential use cases.
> 
> Here is one mismatch with using control flow for dynamic config. Dynamic 
> config is typically targeted/loaded by one specific operator. Control flow 
> will propagate the dynamic config to all operators. not a problem per se 
> 
> Regarding using the REST api (to jobmanager) for accepting control signals 
> from external system, where are we going to persist/checkpoint the signal? 
> jobmanager can die before the control signal is propagated and checkpointed. 
> Did we lose the control signal in this case?
> 
> 
> On Mon, Jun 7, 2021 at 11:05 PM Xintong Song  > wrote:
> +1 on separating the effort into two steps:
> Introduce a common control flow framework, with flexible interfaces for 
> generating / reacting to control messages for various purposes.
> Features that leverage the control flow can be worked on concurrently
> Meantime, keeping collecting potential features that may leverage the control 
> flow should be helpful. It provides good inputs for the control flow 
> framework design, to make the framework common enough to cover the potential 
> use cases.
> 
> My suggestions on the next steps:
> Allow more time for opinions to be heard and potential use cases to be 
> collected
> Draft a FLIP with the scope of common control flow framework
> We probably need a poc implementation to make sure the framework covers at 
> least the following scenarios
> Produce control events from arbitrary operators
> Produce control events from JobMaster
> Consume control events from arbitrary operators downstream where the events 
> are produced
> 
> Thank you~
> Xintong Song
> 
> 
> On Tue, Jun 8, 2021 at 1:37 PM Yun Gao  > wrote:
> Very thanks Jiangang for bringing this up and very thanks for the discussion! 
> 
> I also agree with the summarization by Xintong and Jing that control flow 
> seems to be
> a common building block for many functionalities, and the dynamic configuration 
> framework
> is a representative application that is frequently required by users. Regarding 
> the control flow, 
> currently we are also considering the design of iteration for the flink-ml, 
> and as Xintong has pointed
> out, it also required the control flow in cases like detection global 
> termination inside the iteration
>  (in this case we need to broadcast an event through the iteration body to 
> detect if there are still 
> records reside in the iteration body). And regarding  whether to implement 
> the dynamic configuration 
> framework, I also agree with Xintong that the consistency guarantee would be 
> a point to consider, we 
> might consider if we need to ensure every operator could receive the dynamic 
> configuration. 
> 
> Best,
> Yun
> 
> 
> 
> --
> Sender:kai wangmailto:yiduwang...@gmail.com>>
> Date:2021/06/08 11:52:12
> Recipient:JING ZHANGmailto:beyond1...@gmail.com>>
> Cc:刘建刚mailto:liujiangangp...@gmail.com>>; Xintong 
> Song [via Apache Flink User Mailing List 
> archive.] >; user >; dev >
> Theme:Re: Add control mode for flink
> 
> 
> 
> I'm big +1 for this feature. 
> Limit the input qps.
> Change log level for debug.
> in my team, the two examples above are needed
> 
> JING ZHANG mailto:beyond1...@gmail.com>> 于2021年6月8日周二 
> 上午11:18写道:
> Thanks Jiangang for bringing this up. 
> As mentioned in Jiangang's email, `dynamic configuration framework` provides 
> many useful functions in Kuaishou, because it could update job behavior 
> without relaunching the job. The functions are very popular in Kuaishou, we 
> also see similar demands in maillist [1].
> 
> I'm big +1 for this feature.
> 
> Thanks Xintong and Yun for deep thoughts about the issue. I like the idea 
> about introducing control mode in 

Re: Re: Add control mode for flink

2021-06-08 Thread Steven Wu
Option 2 is probably not feasible, as a checkpoint may take a long time or
may fail.

Option 1 might work, although it complicates the job recovery and
checkpoint. After checkpoint completion, we need to clean up those control
signals stored in HA service.

On Tue, Jun 8, 2021 at 1:14 AM 刘建刚  wrote:

> Thanks for the reply. It is a good question. There are multiple choices, as
> follows:
>
>1. We can persist control signals in HighAvailabilityServices and replay
>them after failover.
>2. Only tell the users that the control signals take effect after they
>are checkpointed.
>
>
> Steven Wu [via Apache Flink User Mailing List archive.] <
> ml+s2336050n44278...@n4.nabble.com> 于2021年6月8日周二 下午2:15写道:
>
> >
> > I can see the benefits of control flow. E.g., it might help the old (and
> > inactive) FLIP-17 side input. I would suggest that we add more details of
> > some of the potential use cases.
> >
> > Here is one mismatch with using control flow for dynamic config. Dynamic
> > config is typically targeted/loaded by one specific operator. Control
> flow
> > will propagate the dynamic config to all operators. not a problem per se
> >
> > Regarding using the REST api (to jobmanager) for accepting control
> > signals from external system, where are we going to persist/checkpoint
> the
> > signal? jobmanager can die before the control signal is propagated and
> > checkpointed. Did we lose the control signal in this case?
> >
> >
> > On Mon, Jun 7, 2021 at 11:05 PM Xintong Song <[hidden email]
> > > wrote:
> >
> >> +1 on separating the effort into two steps:
> >>
> >>1. Introduce a common control flow framework, with flexible
> >>interfaces for generating / reacting to control messages for various
> >>purposes.
> >>2. Features that leverage the control flow can be worked on
> >>concurrently
> >>
> >> Meantime, keeping collecting potential features that may leverage the
> >> control flow should be helpful. It provides good inputs for the control
> >> flow framework design, to make the framework common enough to cover the
> >> potential use cases.
> >>
> >> My suggestions on the next steps:
> >>
> >>1. Allow more time for opinions to be heard and potential use cases
> >>to be collected
> >>2. Draft a FLIP with the scope of common control flow framework
> >>3. We probably need a poc implementation to make sure the framework
> >>covers at least the following scenarios
> >>   1. Produce control events from arbitrary operators
> >>   2. Produce control events from JobMaster
> >>   3. Consume control events from arbitrary operators downstream
> >>   where the events are produced
> >>
> >>
> >> Thank you~
> >>
> >> Xintong Song
> >>
> >>
> >>
> >> On Tue, Jun 8, 2021 at 1:37 PM Yun Gao <[hidden email]
> >> > wrote:
> >>
> >>> Very thanks Jiangang for bringing this up and very thanks for the
> >>> discussion!
> >>>
> >>> I also agree with the summarization by Xintong and Jing that control
> >>> flow seems to be
> >>> a common building block for many functionalities, and the dynamic
> >>> configuration framework
> >>> is a representative application that is frequently required by users.
> >>> Regarding the control flow,
> >>> currently we are also considering the design of iteration for the
> >>> flink-ml, and as Xintong has pointed
> >>> out, it also required the control flow in cases like detection global
> >>> termination inside the iteration
> >>>  (in this case we need to broadcast an event through the iteration body
> >>> to detect if there are still
> >>> records reside in the iteration body). And regarding  whether to
> >>> implement the dynamic configuration
> >>> framework, I also agree with Xintong that the consistency guarantee
> >>> would be a point to consider, we
> >>> might consider if we need to ensure every operator could receive the
> >>> dynamic configuration.
> >>>
> >>> Best,
> >>> Yun
> >>>
> >>>
> >>>
> >>> --
> >>> Sender:kai wang<[hidden email]
> >>> >
> >>> Date:2021/06/08 11:52:12
> >>> Recipient:JING ZHANG<[hidden email]
> >>> >
> >>> Cc:刘建刚<[hidden email]
> >>> >; Xintong Song
> >>> [via Apache Flink User Mailing List archive.]<[hidden email]
> >>> >; user<[hidden
> >>> email] >;
> dev<[hidden
> >>> email] >
> >>> Theme:Re: Add control mode for flink
> >>>
> >>>
> >>>
> >>> I'm big +1 for this feature.
> >>>
> >>>1. Limit the input qps.
> >>>2. Change log level for debug.
> >>>
> >>> in my team, the two examples above are needed
> >>>
> >>> JING ZHANG

[table-walkthrough exception] Unable to create a source for reading table...

2021-06-08 Thread Lingfeng Pu
Hi,

I'm following the tutorial to run the "flink-playgrounds/table-walkthrough"
project in IDEA. However, I got *the exception as follows:*

Exception in thread "main" org.apache.flink.table.api.ValidationException:
Unable to create a source for reading table
'default_catalog.default_database.transactions'.

*The key localhost environment info shows below:*
1. OS: Fedora 34; 2. Flink version: 1.13.1;
3. Java version: 1.8; 4. Maven version: 3.6.3;
5. Docker version: 20.10.7 (API version: 1.41).

*The entire error report shows below:*
/usr/java/jdk1.8.0_291-amd64/bin/java
-javaagent:/var/lib/snapd/snap/intellij-idea-community/302/lib/idea_rt.jar=46805:/var/lib/snapd/snap/intellij-idea-community/302/bin
-Dfile.encoding=UTF-8 -classpath
/usr/java/jdk1.8.0_291-amd64/jre/lib/charsets.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/deploy.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/cldrdata.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/dnsns.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/jaccess.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/jfxrt.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/localedata.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/nashorn.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/sunec.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/sunjce_provider.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/sunpkcs11.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/zipfs.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/javaws.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/jce.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/jfr.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/jfxswt.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/jsse.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/management-agent.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/plugin.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/resources.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/rt.jar:/home/AkatsukiG5/IdeaProjects/flink-playgrounds-master/table-walkthrough/target/classes:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-table-api-java/1.12.1/flink-table-api-java-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-table-common/1.12.1/flink-table-common-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-connector-files/1.12.1/flink-connector-files-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-connector-base/1.12.1/flink-connector-base-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-shaded-asm-7/7.1-12.0/flink-shaded-asm-7-7.1-12.0.jar:/home/AkatsukiG5/Documents/mavenRepo/org/slf4j/slf4j-api/1.7.15/slf4j-api-1.7.15.jar:/home/AkatsukiG5/Documents/mavenRepo/com/google/code/findbugs/jsr305/1.3.9/jsr305-1.3.9.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/force-shading/1.12.1/force-shading-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-table-api-java-bridge_2.11/1.12.1/flink-table-api-java-bridge_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-java/1.12.1/flink-java-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/commons/commons-lang3/3.3.2/commons-lang3-3.3.2.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/commons/commons-math3/3.5/commons-math3-3.5.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-streaming-java_2.11/1.12.1/flink-streaming-java_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-file-sink-common/1.12.1/flink-file-sink-common-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-shaded-guava/18.0-12.0/flink-shaded-guava-18.0-12.0.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-streaming-scala_2.11/1.12.1/flink-streaming-scala_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-scala_2.11/1.12.1/flink-scala_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/scala-lang/scala-reflect/2.11.12/scala-reflect-2.11.12.jar:/home/AkatsukiG5/Documents/mavenRepo/org/scala-lang/scala-library/2.11.12/scala-library-2.11.12.jar:/home/AkatsukiG5/Documents/mavenRepo/org/scala-lang/scala-compiler/2.11.12/scala-compiler-2.11.12.jar:/home/AkatsukiG5/Documents/mavenRepo/org/scala-lang/modules/scala-xml_2.11/1.0.5/scala-xml_2.11-1.0.5.jar:/home/AkatsukiG5/Documents/mavenRepo/org/scala-lang/modules/scala-parser-combinators_2.11/1.0.4/scala-parser-combinators_2.11-1.0
.4.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-table-planner-blink_2.11/1.12.1/flink-table-planner-blink_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-table-api-scala_2.11/1.12.1/flink-table-api-scala_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-table-api-scala-bridge_2.11/1.12.1/flink-table-api-scala-bridge_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-table-runtime-blink_2.11/1.12.1/flink-table-runtime-blink_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/codehaus/janino/janino/3.0.11/janino-3.0.11.jar:/home/AkatsukiG5/Documents/mavenRepo/org/codehaus/janino/commons-compil

Re: Jupyter PyFlink Web UI

2021-06-08 Thread Dian Fu
Hi Maciek,

You could try if the following works:

```
table_env.get_config().get_configuration().set_string("rest.bind-port", "xxx")
```

Regards,
Dian

> 2021年6月8日 下午8:26,maverick  写道:
> 
> Hi,
> I've got a question. I'm running PyFlink code from a Jupyter Notebook, starting
> a TableEnvironment with the following code:
> 
> env_settings =
> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> table_env = TableEnvironment.create(env_settings)
> 
> How can I enable Web UI in this code?
> 
> Regards,
> Maciek
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: ByteSerializationSchema in PyFlink

2021-06-08 Thread Dian Fu
Hi Wouter,

Great to hear and thanks for the sharing!

Regards,
Dian

> 2021年6月8日 下午4:44,Wouter Zorgdrager  写道:
> 
> Hi Dian, all,
> 
> The way I resolved it right now is to write my own custom serializer which only
> maps bytes to bytes. See the code below:
> public class KafkaBytesSerializer implements SerializationSchema<byte[]>,
>         DeserializationSchema<byte[]> {
> 
>     @Override
>     public byte[] deserialize(byte[] bytes) throws IOException {
>         return bytes;
>     }
> 
>     @Override
>     public boolean isEndOfStream(byte[] bytes) {
>         return false;
>     }
> 
>     @Override
>     public byte[] serialize(byte[] bytes) {
>         return bytes;
>     }
> 
>     @Override
>     public TypeInformation<byte[]> getProducedType() {
>         return TypeInformation.of(byte[].class);
>     }
> }
> 
> This code is packaged in a jar and uploaded through env.add_jars. That works 
> like a charm! 
> 
> Thanks for the help!
> Wouter
> 
> On Fri, 4 Jun 2021 at 14:40, Wouter Zorgdrager  > wrote:
> Hi Dian, all,
> 
> Thanks for your suggestion. Unfortunately, it does not seem to work. I get 
> the following exception:
> 
> Caused by: java.lang.NegativeArraySizeException: -2147183315
> at 
> org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:81)
> at 
> org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:31)
> at 
> org.apache.flink.api.common.serialization.TypeInformationSerializationSchema.deserialize(TypeInformationSerializationSchema.java:92)
> at 
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
> at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
> 
> To be more precise, the messages in my Kafka topic are pickled Python 
> objects. Maybe that is the reason for the exception, I also tried using 
> Types.PICKLED_BYTE_ARRAY().get_java_type_info() but I think that has the same 
> serializer because I get the same exception.
> 
> Any suggestions? Thanks for your help!
> 
> Regards,
> Wouter
> 
> On Fri, 4 Jun 2021 at 08:24, Dian Fu  > wrote:
> Hi Wouter,
> 
>> E   org.apache.flink.api.python.shaded.py4j.Py4JException: 
>> Constructor 
>> org.apache.flink.api.common.serialization.TypeInformationSerializationSchema([class
>>  org.apache.flink.api.common.typeinfo.IntegerTypeInfo, class 
>> org.apache.flink.configuration.Configuration]) does not exist
> 
> As the exception indicate, the constructor doesn’t exists.
> 
> 
> 
> Could you try with the following:
> 
> ```
> j_type_info = Types.PRIMITIVE_ARRAY(Types.BYTE()).get_java_type_info()
> j_type_serializer = j_type_info.createSerializer(gate_way.jvm.org.apache.flink.api.common.ExecutionConfig())
> j_byte_string_schema = gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema(j_type_info, j_type_serializer)
> ```
> 
> Regards,
> Dian
> 
>> 2021年6月3日 下午8:51,Wouter Zorgdrager > > 写道:
>> 
>> Hi all,
>> 
>> I have a PyFlink job connected to a KafkaConsumer and Producer. I want to 
>> directly work with the bytes from and to Kafka because I want to 
>> serialize/deserialize in my Python code rather than the JVM environment. 
>> Therefore, I can't use the SimpleStringSchema for (de)serialization (the 
>> messages aren't strings anyways). I've tried to create a 
>> TypeInformationSerializer with Types.BYTE(), see the code snippet below:
>> 
>> class ByteSerializer(SerializationSchema, DeserializationSchema):
>>     def __init__(self, execution_environment):
>>         gate_way = get_gateway()
>> 
>>         j_byte_string_schema = gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema(
>>             Types.BYTE().get_java_type_info(),
>>             get_j_env_configuration(execution_environment),
>>         )
>>         SerializationSchema.__init__(self, j_serialization_schema=j_byte_string_schema)
>>         DeserializationSchema.__init__(
>>             self, j_deserialization_schema=j_byte_string_schema
>>         )
>> The ByteSerializer is used li

Records Are Never Emitted in a Tumbling Event Window When Each Key Only Has One Record

2021-06-08 Thread Joseph Lorenzini




Hi all,
 
I have observed behavior when joining two keyed streams together where events are never emitted. The source of each stream is a different Kafka topic. I am curious to know if this is expected and if there’s a way to work around it.

I am using a tumbling event-time window. All records across the two Kafka topics occurred within the same 5-second window of time. Each Kafka topic has a single partition. For each Kafka topic, I configured the Flink Kafka consumer like so:
 
consumer.assignTimestampsAndWatermarks(
    WatermarkStrategy
        .forBoundedOutOfOrderness(Duration.ofSeconds(10))
        .withIdleness(Duration.ofSeconds(10))
);
 
The tumbling window has a duration of 60 seconds. Now it happens to be the case that there is only a single event when joining on a key. If I use a tumbling processing-time window, then events are emitted as expected. If I ensure there are multiple events for a key, then the events are also emitted. However, if there is a single event per key in a tumbling event-time window, then no events are emitted.
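
To make the scenario concrete, here is a minimal, self-contained Java sketch of the setup as described (hypothetical Tuple2 events standing in for the two Kafka topics; the watermark strategy mirrors the snippet above). With a real unbounded Kafka source and only a single record per key, the bounded-out-of-orderness watermark most likely never advances past the window end, so the event-time window never fires; a bounded test source like the one below does fire because Flink emits a final watermark at end of input.

```
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TumblingEventTimeJoinSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-ins for the two single-partition Kafka topics; each key occurs once.
        DataStream<Tuple2<String, Long>> left = withWatermarks(
                env.fromElements(Tuple2.of("key-1", 1_000L)));
        DataStream<Tuple2<String, Long>> right = withWatermarks(
                env.fromElements(Tuple2.of("key-1", 2_000L)));

        // 60-second tumbling event-time window join, keyed by the first field.
        left.join(right)
                .where(e -> e.f0, Types.STRING)
                .equalTo(e -> e.f0, Types.STRING)
                .window(TumblingEventTimeWindows.of(Time.seconds(60)))
                .apply((l, r) -> l.f0 + ": " + l.f1 + " / " + r.f1, Types.STRING)
                .print();

        env.execute("tumbling-event-time-join-sketch");
    }

    // Same watermark settings as in the thread: 10s out-of-orderness + 10s idleness.
    private static DataStream<Tuple2<String, Long>> withWatermarks(DataStream<Tuple2<String, Long>> in) {
        return in.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withIdleness(Duration.ofSeconds(10))
                        .withTimestampAssigner((e, ts) -> e.f1));
    }
}
```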
 
Is this expected, and if so, how do you handle this use case?
 
Thanks,
Joe 





Persisting state in RocksDB

2021-06-08 Thread Paul K Moore
Hi all,

First post here, so please be kind :)

Firstly some context; I have the following high-level job topology:

(1) FlinkPulsarSource -> (2) RichAsyncFunction -> (3) SinkFunction

1. The FlinkPulsarSource reads event notifications about article updates from a 
Pulsar topic
2. The RichAsyncFunction fetches the “full” article from the specified URL 
end-point, and transmutes it into a “legacy” article format
3. The SinkFunction writes the “legacy” article to a (legacy) web platform i.e. 
the sink is effectively another web site

I have this all up and running (despite lots of shading fun).

When the SinkFunction creates an article on the legacy platform it returns an 
'HTTP 201 - Created’ with a Location header suitably populated.

Now, I need to persist that Location header and, more explicitly, need to 
persist a map between the URLs for the new and legacy platforms.  This is 
needed for latter update and delete processing.

The question is how do I store this mapping information?

I’ve spent some time trying to grok state management and the various backends, 
but from what I can see the state management is focused on “operator scoped” 
state.  This seems reasonable given the requirement for barriers etc to ensure 
accurate recovery.
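
For reference, a minimal Java sketch of what the "operator scoped" keyed state mentioned above looks like in practice; the event and field names are hypothetical. The state is tied to this one keyed operator (and its checkpointed backend, e.g. RocksDB) and is not directly readable from other operators.

```
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/** Hypothetical event carrying the new-platform URL (used as the key) and the legacy Location. */
class ArticleEvent {
    public String newUrl;
    public String legacyLocation;
}

/**
 * Hypothetical sketch: keyed by the new-platform article URL, this operator
 * remembers the legacy Location header for that URL. The state lives only in
 * this operator's state backend (e.g. RocksDB) and is restored from
 * checkpoints; other operators cannot read it directly.
 */
public class UrlMappingFunction extends KeyedProcessFunction<String, ArticleEvent, ArticleEvent> {

    private transient ValueState<String> legacyUrl;

    @Override
    public void open(Configuration parameters) {
        legacyUrl = getRuntimeContext().getState(
                new ValueStateDescriptor<>("legacy-url", String.class));
    }

    @Override
    public void processElement(ArticleEvent event, Context ctx, Collector<ArticleEvent> out)
            throws Exception {
        if (event.legacyLocation != null) {
            // Remember the 'HTTP 201 Created' Location header for this article URL.
            legacyUrl.update(event.legacyLocation);
        }
        out.collect(event);
    }
}
```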

However, I need some persistence between operators (shared state?) and with 
longevity beyond the processing of an operator.

My gut reaction is that I need an external K-V store such as Ignite (or
similar). Frankly, given that Flink ships with embedded RocksDB I was hoping to
use that, but there seems to be no obvious way to do this, and lots of advice saying
don’t :)

Am I missing something obvious here?

Many thanks in advance

Paul




Re: Questions about implementing a flink source

2021-06-08 Thread Evan Palmer
Hello again,

Thank you for all of your help so far, I have a few more questions if you
have the time :)

1. Deserialization Schema

There's been some debate within my team about whether we should offer a
DeserializationSchema and SerializationSchema in our source and sink.

If we don't include the schemas, our source and sink would
implement Source<...pubsublite.Message> and Sink<...pubsublite.Message>,
which is the type our client library currently returns (this type is
serializable), and users could transform the messages in a map function
after the source. This would make implementing the source somewhat easier,
and it doesn't seem like it would be much more difficult for users. On the
other hand, I looked around and didn't find any flink sources implemented
without a deserialization/serialization schema, so I'm worried that this
choice might make our source/sink confusing for users, or that we're
missing something. What are your thoughts on this?
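
A rough Java sketch of the second option (no schema in the connector, deserialization in a map function afterwards). Everything here is a placeholder: PubsubMessageLike stands in for ...pubsublite.Message, MyEvent for a user type, and the source itself is just passed in; this is not an existing API.

```
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MapAfterSourceSketch {

    /** Wires a hypothetical "raw message" source and deserializes in user code. */
    public static DataStream<MyEvent> wire(
            StreamExecutionEnvironment env, Source<PubsubMessageLike, ?, ?> pubsubLiteSource) {

        DataStream<PubsubMessageLike> raw =
                env.fromSource(pubsubLiteSource, WatermarkStrategy.noWatermarks(), "pubsub-lite");

        // User-side deserialization instead of a DeserializationSchema inside the connector.
        return raw.map(message -> MyEvent.parse(message.data()))
                .returns(MyEvent.class);
    }

    /** Placeholder for the pubsublite message type the connector would emit. */
    public interface PubsubMessageLike {
        byte[] data();
    }

    /** Placeholder for the user's deserialized event type. */
    public static class MyEvent {
        static MyEvent parse(byte[] bytes) {
            return new MyEvent();
        }
    }
}
```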

2. Order aware rebalancing.

I want to make sure I understand the problem with rebalancing partitions to
different SourceReaders. Does any reassignment of a pub/sub
partition between SourceReaders have the potential to cause disorder, or
can order be guaranteed by some variant of ensuring that the partition is
assigned to only one source reader at a time?

I read through
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/learn-flink/overview/#parallel-dataflows,
which made me think that if the user wanted a pipeline like

env.fromSource(source).keyBy("Message Partition", ...).sinkTo(sink)

Then if two different source tasks had messages from a single pub/sub
partition, there could be disorder. We're not planning to implement any
rebalancing of partitions in our source, but I wanted to make sure I can
document this correctly :)

3. Reporting permanent failures in the Sink

Is it sufficient to throw an exception from Committer.commit() in the case
where our sink has permanently failed in some way (e.g. the configured
topic has been deleted, or the user doesn't have permissions to publish),
or is there something else we should be doing?
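
For what it's worth, a hedged Java sketch of that idea, assuming the FLIP-143 unified sink's Committer interface (where the returned committables are the ones to retry). The PublishCommittable type and its error-classification methods are hypothetical; throwing from commit() surfaces the problem as a task failure, which may or may not be the recommended pattern here.

```
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.connector.sink.Committer;

/** Hypothetical committable carrying the outcome of a publish attempt. */
interface PublishCommittable {
    boolean isPermanentlyFailed();

    String describeError();
}

/** Hypothetical committer: retries nothing, fails loudly on permanent errors. */
public class PubsubLiteCommitterSketch implements Committer<PublishCommittable> {

    @Override
    public List<PublishCommittable> commit(List<PublishCommittable> committables) {
        for (PublishCommittable committable : committables) {
            if (committable.isPermanentlyFailed()) {
                // e.g. topic deleted, or missing publish permission
                throw new IllegalStateException(
                        "Permanent publish failure: " + committable.describeError());
            }
        }
        // Nothing needs to be re-committed in this sketch.
        return Collections.emptyList();
    }

    @Override
    public void close() {}
}
```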

Evan


On Mon, May 10, 2021 at 9:57 AM Arvid Heise  wrote:

> Hi Evan,
>
> A few replies / questions inline. Somewhat relatedly, I'm also wondering
>> where this connector should live. I saw that there's already a pubsub
>> connector in
>> https://github.com/apache/flink/tree/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub,
>> so if flink is willing to host it, perhaps it could live near there?
>> Alternatively, it could live alongside our client library in
>> https://github.com/googleapis/java-pubsublite.
>>
>
> For a long time, the community has been thinking of moving (most)
> connectors out of the repository. Especially now with the new source/sink
> interface, the need to decouple Flink release cycle and connector release
> cycle is bigger than ever as we do not backport features in our bugfix
> branches. Thus, Pubsub Lite would only be available in Flink 1.14 and many
> users would need to wait up to a year to effectively use the source
> (adoption of new Flink versions is usually slow).
> Therefore, I'd definitely encourage you to have the connector along your
> client library - where the release cycles probably also much better align.
> I will soon present an idea on how to list all available connectors on
> Flink's connector page such that from a user's perspective, it wouldn't
> matter if it's internal and external. If it turns out that the community
> rather wants to have all connectors still in the main repo, we can look at
> contributing it at a later point in time.
>

Okay, thanks for the context! We will host the connector in our repository.

>
> I read through SourceReaderBase and SingleThreadMultiplexSourceReaderBase.
>> It seems like these base implementations are mostly designed to help in
>> cases where the client library uses a synchronous pull based approach. Our
>> client library is async - we use a bidirectional stream to pull
>> messages from our brokers and we have some flow control settings to limit
>> the number of bytes and messages outstanding to the client. I'm wondering
>> if because of this, we should just implement the SourceReader interface. In
>> particular, we have a per partition subscriber class which buffers messages
>> up to the flow control limit and exposes an API almost identical to
>> SourceReader's pollNext and IsAvailable. What do you think?
>>
>
> Good catch. Yes, the implementation is more or less simulating the async
> fetching that your library apparently offers already. So feel free to skip
> it. Of course, if it turns out that you still need certain building blocks,
> such as record handover, we can also discuss pulling up a common base class
> to the async sources and the SingleThreadMultiplexSourceReaderBase.
>
>> Ah, okay, this helped a lot. I'd m

📝2 weeks left to submit your talks for Flink Forward Global 2021!

2021-06-08 Thread Caito Scherr
Hi there,

The Call for Presentations [1] for Flink Forward Global 2021 closes in just
2 weeks on Monday, June 21! Are you working on an inspiring Flink story,
real-world application, or use case? Now is a good time to finalize and
submit your talk ideas to get the chance to present them to the Flink
community on October 26-27.

This year’s conference tracks are:

   - Use Case
   - Technology Deep Dive
   - Ecosystem
   - Stateful Functions (NEW!)


From getting to share your experiences with fellow Flink enthusiasts and
expanding your network to receiving special swag - the perks of being a
Flink Forward speaker make it a worthwhile experience! Your talk will also
be recorded and promoted on YouTube [2] and Twitter [3] and you will get a
chance to interact with your audience in real-time.

Be sure to submit your exciting ideas by June 21, 11:59 pm PDT!

See you online!

Caito Scherr

Program Committee Chair for Flink Forward Global 2021

[1] https://www.flink-forward.org/global-2021/call-for-presentations

[2] https://www.youtube.com/channel/UCY8_lgiZLZErZPF47a2hXMA

[3] https://twitter.com/FlinkForward


-- 

Caito Scherr | Developer Advocate, USA

ca...@ververica.com



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference
Stream Processing | Event Driven | Real Time


Re: Re: Re: Re: Failed to cancel a job using the STOP rest API

2021-06-08 Thread Kezhu Wang
Could it be the same as FLINK-21028 [1] (titled “Streaming application didn’t
stop properly”, fixed in 1.11.4, 1.12.2, 1.13.0)?


[1]: https://issues.apache.org/jira/browse/FLINK-21028


Best,
Kezhu Wang

On June 8, 2021 at 22:54:10, Yun Gao (yungao...@aliyun.com) wrote:

Hi Thomas,

I tried but could not reproduce the exception yet. I have filed
an issue for the exception first [1].



[1] https://issues.apache.org/jira/browse/FLINK-22928


--Original Mail --
*Sender:*Thomas Wang 
*Send Date:*Tue Jun 8 07:45:52 2021
*Recipients:*Yun Gao 
*CC:*user 
*Subject:*Re: Re: Re: Failed to cancel a job using the STOP rest API

> This is actually a very simple job that reads from Kafka and writes to S3
> using the StreamingFileSink w/ Parquet format. I'm using only Flink's APIs
> and nothing custom.
>
> Thomas
>
> On Sun, Jun 6, 2021 at 6:43 PM Yun Gao  wrote:
>
>> Hi Thomas,
>>
>> Very thanks for reporting the exceptions, and it seems to not work as
>> expected to me...
>> Could you also show us the dag of the job ? And does some operators in
>> the source task
>> use multiple-threads to emit records?
>>
>> Best,
>> Yun
>>
>>
>> --Original Mail --
>> *Sender:*Thomas Wang 
>> *Send Date:*Sun Jun 6 04:02:20 2021
>> *Recipients:*Yun Gao 
>> *CC:*user 
>> *Subject:*Re: Re: Failed to cancel a job using the STOP rest API
>>
>>> One thing I noticed is that if I set drain = true, the job could be
>>> stopped correctly. Maybe that's because I'm using a Parquet file sink which
>>> is a bulk-encoded format and only writes to disk during checkpoints?
>>>
>>> Thomas
>>>
>>> On Sat, Jun 5, 2021 at 10:06 AM Thomas Wang  wrote:
>>>
 Hi Yun,

 Thanks for the tips. Yes, I do see some exceptions as copied below. I'm
 not quite sure what they mean though. Any hints?

 Thanks.

 Thomas

 ```
 2021-06-05 10:02:51
 java.util.concurrent.ExecutionException:
 org.apache.flink.streaming.runtime.tasks.
 ExceptionInChainedOperatorException: Could not forward element to next
 operator
 at java.util.concurrent.CompletableFuture.reportGet(
 CompletableFuture.java:357)
 at java.util.concurrent.CompletableFuture.get(CompletableFuture
 .java:1928)
 at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
 .quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
 at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
 .close(StreamOperatorWrapper.java:130)
 at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
 .close(StreamOperatorWrapper.java:134)
 at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
 .close(StreamOperatorWrapper.java:80)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain
 .closeOperators(OperatorChain.java:302)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(
 StreamTask.java:576)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
 StreamTask.java:544)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
 at java.lang.Thread.run(Thread.java:748)
 Caused by: org.apache.flink.streaming.runtime.tasks.
 ExceptionInChainedOperatorException: Could not forward element to next
 operator
 at org.apache.flink.streaming.runtime.tasks.
 OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
 at org.apache.flink.streaming.api.operators.CountingOutput
 .emitWatermark(CountingOutput.java:41)
 at org.apache.flink.streaming.runtime.operators.
 TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(
 TimestampsAndWatermarksOperator.java:165)
 at org.apache.flink.api.common.eventtime.
 BoundedOutOfOrdernessWatermarks.onPeriodicEmit(
 BoundedOutOfOrdernessWatermarks.java:69)
 at org.apache.flink.streaming.runtime.operators.
 TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator
 .java:125)
 at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
 .lambda$closeOperator$5(StreamOperatorWrapper.java:205)
 at org.apache.flink.streaming.runtime.tasks.
 StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor
 .runThrowing(StreamTaskActionExecutor.java:92)
 at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
 .closeOperator(StreamOperatorWrapper.java:203)
 at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
 .lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
 at org.apache.flink.streaming.runtime.tasks.
 StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor
 .runThrowing(StreamTaskActionExecutor.java:92)
 at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail

Re: Using s3 bucket for high availability

2021-06-08 Thread Kurtis Walker
Sorry, I fat-fingered the send before I finished writing….

Hello,
  I’m trying to set up my flink native Kubernetes cluster with High 
availability.  Here’s the relevant config:

kubernetes.service-account: flink-service-account
high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: 
s3://corvana-target-file-upload-k8s-usw2.dev.sugar.build/flink/recovery

I’m getting an error accessing the bucket.

2021-06-08 14:33:42,189 DEBUG com.amazonaws.services.s3.AmazonS3Client  
   [] - Bucket region cache doesn't have an entry for 
corvana-target-file-upload-k8s-usw2.dev.sugar.build. Trying to get bucket 
region from Amazon S3.
2021-06-08 14:33:42,193 DEBUG com.amazonaws.util.json.Jackson   
   [] - Failed to parse JSON string.
com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map 
due to end-of-input
at [Source: (String)""; line: 1, column: 0]
at 
com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
 ~[flink-s3-fs-presto-1.13.0.jar:1.13.0]

Is there an additional config I need for specifying the region for the bucket?  
I’ve been searching the doc and haven’t found anything like that.
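
One hedged suggestion, in case it helps: Flink's S3 filesystems accept an s3.endpoint option in flink-conf.yaml, which can pin the client to the bucket's region instead of relying on the region lookup. The exact endpoint host below is an assumption based on the bucket name looking like us-west-2; adjust it to your bucket's actual region.

```
# Hedged example only -- endpoint host assumed for us-west-2.
s3.endpoint: s3.us-west-2.amazonaws.com
```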


From: Kurtis Walker 
Date: Tuesday, June 8, 2021 at 10:55 AM
To: user 
Subject: Using s3 bucket for high availability
Hello,
  I’m trying to set up my flink native Kubernetes cluster with High 
availability.  Here’s the relevant config:


Re: Re: Multiple Exceptions during Load Test in State Access APIs with RocksDB

2021-06-08 Thread Yun Gao
Hi Chirag,

As far as I know, if you are running a single job, having all the pods share
the same state.checkpoints.dir configuration should be as expected, and it is
not necessary to configure the RocksDB local dir since Flink will choose a
default dir.

Regarding the latest exception, I think you might first check the key type
used; the key type should have a stable hashCode method.
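
As a hedged illustration of the point above, a key type with a deterministic hashCode/equals looks something like this (the class and field names are hypothetical); the "Key group X is not in KeyGroupRange" error can occur if the key's hashCode changes between the time a record is partitioned and the time state is accessed.

```
import java.util.Objects;

/**
 * Hypothetical key type: hashCode/equals depend only on an immutable field, so
 * the key always lands in the same key group across JVMs and restarts.
 */
public final class PlayerKey {
    private final String playerId;

    public PlayerKey(String playerId) {
        this.playerId = playerId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof PlayerKey)) {
            return false;
        }
        return Objects.equals(playerId, ((PlayerKey) o).playerId);
    }

    @Override
    public int hashCode() {
        // Deterministic: derived from the field value only, never from identity
        // (Object.hashCode) or from mutable/transient state.
        return Objects.hash(playerId);
    }
}
```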

Best,
Yun




 --Original Mail --
Sender:Chirag Dewan 
Send Date:Tue Jun 8 18:06:07 2021
Recipients:User , Yun Gao 
Subject:Re: Multiple Exceptions during Load Test in State Access APIs with 
RocksDB

Hi,

Although this looks like a problem to me, I still can't conclude it.

I tried reducing my TM replicas from 2 to 1, with 4 slots and 4 cores each. I
was hoping that with a single TM there would be no file write conflicts, but that
doesn't seem to help, as I still get the:


Caused by: org.apache.flink.util.SerializedThrowable: 
java.lang.IllegalArgumentException: Key group 2 is not in 
KeyGroupRange{startKeyGroup=64, endKeyGroup=95}.
I have checked that there's no concurrent access on the ValueState.

Any more leads?

Thanks,
Chirag


 On Monday, 7 June, 2021, 06:56:56 pm IST, Chirag Dewan 
 wrote: 


Hi,

I think I got my issue. It would help if someone could confirm it :)

I am using an NFS filesystem for storing my checkpoints, and my Flink cluster is
running on K8s with 2 TMs and 2 JMs.

All my pods share the NFS PVC for state.checkpoints.dir, and we also missed
setting the RocksDB local dir.

Does this lead to state corruption?

Thanks,
Chirag



 On Monday, 7 June, 2021, 08:54:39 am IST, Chirag Dewan 
 wrote: 


Thanks for the reply, Yun. Strangely, I don't see any nulls. And in fact this
exception comes on the first few records, and then the job starts processing
normally.

Also, I don't see any reason for concurrent access to the state in my code.
Could more CPU cores than task slots on the Task Manager be the reason for it?

 On Saturday, 5 June, 2021, 06:40:27 pm IST, Yun Gao  
wrote: 


Hi Chirag,

If you are able to reproduce the exception, could you first add some logs to print
the values of valueState, valueState.value(), inEvent and
inEvent.getPrizeDelta()?
I think either object being null would cause a NullPointerException here.

For the second exception, I found a similar issue [1], caused by concurrent
access to the value state. Do we have a similar situation here?

Best,
Yun

[1] https://issues.apache.org/jira/browse/FLINK-18587

Best,
Yun



 --Original Mail --
Sender:Chirag Dewan 
Send Date:Sat Jun 5 20:29:37 2021
Recipients:User 
Subject:Multiple Exceptions during Load Test in State Access APIs with RocksDB

Hi,

I am getting multiple exceptions while trying to use RocksDB as a state backend.
I have 2 Task Managers with 2 task slots and 4 cores each.
Below is our setup:
Kafka (topic with 2 partitions) ---> FlinkKafkaConsumer (parallelism 2) --->
KeyedProcessFunction (parallelism 4) ---> FlinkKafkaProducer (parallelism 1)
---> Kafka topic
public class Aggregator_KeyedExpression extends KeyedProcessFunction {

    private ValueState<Integer> valueState;

    @Override
    public void open() throws Exception {
        ValueStateDescriptor<Integer> descriptor =
                new ValueStateDescriptor<>("totalPrize", Integer.class);
        valueState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(GameZoneInput inEvent, Context ctx, final List<GameZoneOutput> outEvents)
            throws Exception {
        if (valueState.value() == null) {
            valueState.update(0);
        }
        valueState.update(valueState.value() + inEvent.getPrizeDelta()); // -> NullPointerException on this line
        int sum = valueState.value();

        GameZoneOutput output = new GameZoneOutput();
        output.setPlayerId(inEvent.getPlayerId());
        output.setNetPrize(sum);
        outEvents.add(output);
    }

    @Override
    public void close() throws Exception {
        valueState.clear();
    }
}
While doing a load test, I get a NullPointerException in valueState.value(),
which seems strange as we would have updated the value state above.

Another strange thing is that this is observed only under load conditions and
works fine otherwise.

We also see some serialization exceptions:

Suppressed: java.lang.IllegalArgumentException: Position out of bounds.
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
at org.apache.flink.core.memory.DataOutputSerializer.setPosition(DataOutputSerializer.java:352)
at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.resetToKey(RocksDBSerializedCompositeKeyBuilder.java:185)
at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.buildCompositeKeyNamespace(RocksDBSerializedCompositeKeyBuilder.java:114)
at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeCurrentKeyWithGroupAndNamespace(AbstractRo

Using s3 bucket for high availability

2021-06-08 Thread Kurtis Walker
Hello,
  I’m trying to set up my Flink native Kubernetes cluster with high 
availability. Here’s the relevant config:


Re: Re: Re: Re: Failed to cancel a job using the STOP rest API

2021-06-08 Thread Yun Gao
Hi Thomas,

I tried but do not re-produce the exception yet. I have filed 
an issue for the exception first [1].



[1] https://issues.apache.org/jira/browse/FLINK-22928



 --Original Mail --
Sender:Thomas Wang 
Send Date:Tue Jun 8 07:45:52 2021
Recipients:Yun Gao 
CC:user 
Subject:Re: Re: Re: Failed to cancel a job using the STOP rest API

This is actually a very simple job that reads from Kafka and writes to S3 using 
the StreamingFileSink w/ Parquet format. I'm only using Flink's APIs and nothing 
custom.

Thomas
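
For context, a minimal sketch of the sink described above (the event class, input stream and bucket path are placeholders; bulk formats such as Parquet use the on-checkpoint rolling policy by default, so part files are only finalized on checkpoints):

```
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Bulk-encoded Parquet sink writing to S3; part files roll on every checkpoint.
StreamingFileSink<MyEvent> sink = StreamingFileSink
        .forBulkFormat(new Path("s3://my-bucket/output"),
                ParquetAvroWriters.forReflectRecord(MyEvent.class))
        .build();

kafkaStream.addSink(sink);
```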
On Sun, Jun 6, 2021 at 6:43 PM Yun Gao  wrote:

Hi Thomas,

Thanks a lot for reporting the exceptions; this does not seem to work as 
expected to me... 
Could you also show us the DAG of the job? And do some operators in the 
source task
use multiple threads to emit records?

Best,
Yun



 --Original Mail --
Sender:Thomas Wang 
Send Date:Sun Jun 6 04:02:20 2021
Recipients:Yun Gao 
CC:user 
Subject:Re: Re: Failed to cancel a job using the STOP rest API

One thing I noticed is that if I set drain = true, the job could be stopped 
correctly. Maybe that's because I'm using a Parquet file sink which is a 
bulk-encoded format and only writes to disk during checkpoints?

Thomas
On Sat, Jun 5, 2021 at 10:06 AM Thomas Wang  wrote:

Hi Yun,

Thanks for the tips. Yes, I do see some exceptions as copied below. I'm not 
quite sure what they mean though. Any hints?

Thanks.

Thomas

```
2021-06-05 10:02:51
java.util.concurrent.ExecutionException: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:80)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:302)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:576)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
at 
org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
at 
org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(TimestampsAndWatermarksOperator.java:165)
at 
org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks.onPeriodicEmit(BoundedOutOfOrdernessWatermarks.java:69)
at 
org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator.java:125)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:90)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
... 9 more
Caused by: java.lang.RuntimeException
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:123)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:762)
at 
org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
at 
org.apache.flink.streaming.api.operators.A

Re: State migration for sql job

2021-06-08 Thread aitozi
Thanks for JING's and Kurt's replies. I think we prefer option (a),
which does not take the historical data into account. 

IMO, if we want to process all the historical data, we have to store the
original data, which may be a big overhead for the backend. But if we only
aggregate from the point where the new function is added, we may just need a
state format transfer. Besides, the business logic we have met only needs the
new aggFunction to accumulate the new data.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
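
For the state format transfer mentioned above, the State Processor API route suggested earlier in this thread would look roughly like the sketch below. This is only an outline: the savepoint path, state backend and operator handling are placeholders, and the exact calls should be double-checked against the State Processor API docs for the Flink version in use.

```
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;

public class MigrateAggState {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 1. Load the savepoint taken from the old query (path and backend are illustrative).
        ExistingSavepoint old =
                Savepoint.load(env, "s3://bucket/savepoints/savepoint-xyz", new MemoryStateBackend());

        // 2. Read the accumulator state of the group-by operator from `old`, map it to the
        //    new accumulator layout (sum + max), and write a new savepoint via
        //    Savepoint.create(...).withOperator(...).write(...).

        // 3. Restore the new query from the rewritten savepoint.
    }
}
```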


Re: Query related to Minimum scrape interval for Prometheus and fetching metrics of all vertices in a job through Flink Rest API

2021-06-08 Thread Chesnay Schepler
There is no recommended scrape interval because it is largely dependent 
on your requirements.
For example, if you're fine with reacting to problems within an hour, 
then a 5s scrape interval doesn't make sense.


The lower the interval the more resources must of course be spent on 
serving the prometheus request; you will need to experiment whether this 
incurs an unacceptable performance impact.
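
For completeness, a minimal sketch of the reporter wiring in flink-conf.yaml (the port range is just an example):

```
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9250-9260
```

On the Prometheus side, the scrape_interval of the corresponding scrape_configs entry controls how often these endpoints are polled.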


On 6/8/2021 3:07 PM, Ashutosh Uttam wrote:

Thanks Matthias.

We are using Prometheus for fetching metrics. Is there any recommended 
scrape interval ?

Also is there any impact if lower scrape intervals are used?

Regards,
Ashutosh

On Fri, May 28, 2021 at 7:17 PM Matthias Pohl wrote:


Hi Ashutosh,
you can set the metrics update interval
through metrics.fetcher.update-interval [1]. Unfortunately, there
is no single endpoint to collect all the metrics in a more
efficient way other than the metrics endpoints provided in [2].

I hope that helps.
Best,
Matthias

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#metrics-fetcher-update-interval


[2]
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/rest_api/


On Wed, May 26, 2021 at 2:01 PM Ashutosh Uttam wrote:

Hi team,

I have two queries as mentioned below:

*Query1:*
I am using PrometheusReporter to expose metrics to Prometheus
Server.
What should be the minimum recommended scrape interval to be
defined on Prometheus server?
Is there any interval in which Flink reports metrics?

*Query2:*
Is there any way I can fetch the metrics of all vertices
(including subtasks) of a job through a single Monitoring Rest
API of Flink.

As of now what I have tried is first finding the vertices and
then querying individual vertex for metrics as below:

*Step 1:* Finding jobId (http://:/jobs)
*Step 2:* Finding vertices Id (http://:/jobs/)
*Step 3:* Finding aggregated metrics (including parallelism)
of a vertex

(http://:/jobs//vertices//subtasks/metrics?get=,)


So like wise I have to invoke multiple rest apis for each
vertex id . Is there any optimised way to get metrics of all
vertices?


Thanks & Regards,
Ashutosh





Re: Allow setting job name when using StatementSet

2021-06-08 Thread Yuval Itzchakov
Yup, that worked. Thank you guys for pointing it out!
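
For readers of the archive, the approach that worked here is, roughly, setting pipeline.name on the table configuration before executing the StatementSet (Java Table API shown; the job name below is made up):

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());

// The value of pipeline.name is used as the job name when the StatementSet is executed.
tEnv.getConfig().getConfiguration().setString("pipeline.name", "my-statement-set-job");

StatementSet stmtSet = tEnv.createStatementSet();
// stmtSet.addInsertSql("INSERT INTO ...");
// stmtSet.execute();
```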

On Tue, Jun 8, 2021, 09:33 JING ZHANG  wrote:

> I agree with Nico, I just add the link of pipeline.name here.
>
> Nicolaus Weidner  于2021年6月7日周一
> 下午11:46写道:
>
>> Hi Yuval,
>>
>> I am not familiar with the Table API, but in the fragment you posted, the
>> generated job name is only used as default if configuration option
>> pipeline.name is not set. Can't you just set that to the name you want
>> to have?
>>
>> Best wishes,
>> Nico
>>
>> On Mon, Jun 7, 2021 at 10:09 AM Yuval Itzchakov 
>> wrote:
>>
>>> Hi,
>>>
>>> Currently when using StatementSet, the name of the job is autogenerated
>>> by the runtime:
>>>
>>> [image: image.png]
>>>
>>> Is there any reason why there shouldn't be an overload that allows the
>>> user to explicitly specify the name of the running job?
>>> --
>>> Best Regards,
>>> Yuval Itzchakov.
>>>
>>


Re: Query related to Minimum scrape interval for Prometheus and fetching metrics of all vertices in a job through Flink Rest API

2021-06-08 Thread Ashutosh Uttam
Thanks Matthias.

We are using Prometheus for fetching metrics. Is there any recommended
scrape interval ?
Also is there any impact if lower scrape intervals are used?

Regards,
Ashutosh

On Fri, May 28, 2021 at 7:17 PM Matthias Pohl 
wrote:

> Hi Ashutosh,
> you can set the metrics update interval
> through metrics.fetcher.update-interval [1]. Unfortunately, there is no
> single endpoint to collect all the metrics in a more efficient way other
> than the metrics endpoints provided in [2].
>
> I hope that helps.
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#metrics-fetcher-update-interval
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/rest_api/
>
> On Wed, May 26, 2021 at 2:01 PM Ashutosh Uttam 
> wrote:
>
>> Hi team,
>>
>> I have two queries as mentioned below:
>>
>> *Query1:*
>> I am using PrometheusReporter to expose metrics to Prometheus Server.
>> What should be the minimum recommended scrape interval to be defined on
>> Prometheus server?
>> Is there any interval in which Flink reports metrics?
>>
>> *Query2:*
>> Is there any way I can fetch the metrics of all vertices (including
>> subtasks) of a job through a single Monitoring Rest API of Flink.
>>
>> As of now what I have tried is first finding the vertices and then
>> querying individual vertex for metrics as below:
>>
>> *Step 1:* Finding jobId (http://:/jobs)
>> *Step 2:* Finding vertices Id (http://:/jobs/)
>> *Step 3:* Finding aggregated metrics (including parallelism) of a
>> vertex  
>> (http://:/jobs//vertices//subtasks/metrics?get=,)
>>
>>
>> So like wise I have to invoke multiple rest apis for each vertex id . Is
>> there any optimised way to get metrics of all vertices?
>>
>>
>> Thanks & Regards,
>> Ashutosh
>>
>
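
As a stop-gap until a single endpoint exists, the three-step lookup described above can be scripted. A rough sketch (host/port, job id and metric names are placeholders; it simply walks the documented endpoints once per vertex):

```
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class VertexMetricsDump {
    private static final String BASE = "http://localhost:8081"; // JobManager REST address

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        String jobId = args[0]; // e.g. taken from GET /jobs

        // Step 2: list the vertices of the job (vertex ids are 32-char hex strings).
        String jobDetails = get(client, BASE + "/jobs/" + jobId);
        Matcher m = Pattern.compile("\"id\"\\s*:\\s*\"([0-9a-f]{32})\"").matcher(jobDetails);

        // Step 3: fetch aggregated subtask metrics for every vertex.
        while (m.find()) {
            String vertexId = m.group(1);
            String metrics = get(client, BASE + "/jobs/" + jobId + "/vertices/" + vertexId
                    + "/subtasks/metrics?get=numRecordsIn,numRecordsOut");
            System.out.println(vertexId + " -> " + metrics);
        }
    }

    private static String get(HttpClient client, String url) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }
}
```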


Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2021-06-08 Thread Till Rohrmann
Great :-)

On Tue, Jun 8, 2021 at 1:11 PM Yingjie Cao  wrote:

> Hi Till,
>
> Thanks for the suggestion. The blog post is already on the way.
>
> Best,
> Yingjie
>
> Till Rohrmann  于2021年6月8日周二 下午5:30写道:
>
>> Thanks for the update Yingjie. Would it make sense to write a short blog
>> post about this feature including some performance improvement numbers? I
>> think this could be interesting to our users.
>>
>> Cheers,
>> Till
>>
>> On Mon, Jun 7, 2021 at 4:49 AM Jingsong Li 
>> wrote:
>>
>>> Thanks Yingjie for the great effort!
>>>
>>> This is really helpful to Flink Batch users!
>>>
>>> Best,
>>> Jingsong
>>>
>>> On Mon, Jun 7, 2021 at 10:11 AM Yingjie Cao 
>>> wrote:
>>>
>>> > Hi devs & users,
>>> >
>>> > The FLIP-148[1] has been released with Flink 1.13 and the final
>>> > implementation has some differences compared with the initial proposal
>>> in
>>> > the FLIP document. To avoid potential misunderstandings, I have
>>> updated the
>>> > FLIP document[1] accordingly and I also drafted another document[2]
>>> which
>>> > contains more implementation details.  FYI.
>>> >
>>> > [1]
>>> >
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
>>> > [2]
>>> >
>>> https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing
>>> >
>>> > Best,
>>> > Yingjie
>>> >
>>> > Yingjie Cao  于2020年10月15日周四 上午11:02写道:
>>> >
>>> >> Hi devs,
>>> >>
>>> >> Currently, Flink adopts a hash-style blocking shuffle implementation
>>> >> which writes data sent to different reducer tasks into separate files
>>> >> concurrently. Compared to sort-merge based approach writes those data
>>> >> together into a single file and merges those small files into bigger
>>> ones,
>>> >> hash-based approach has several weak points when it comes to running
>>> large
>>> >> scale batch jobs:
>>> >>
>>> >>1. *Stability*: For high parallelism (tens of thousands) batch job,
>>> >>current hash-based blocking shuffle implementation writes too many
>>> files
>>> >>concurrently which gives high pressure to the file system, for
>>> example,
>>> >>maintenance of too many file metas, exhaustion of inodes or file
>>> >>descriptors. All of these can be potential stability issues.
>>> Sort-Merge
>>> >>based blocking shuffle don’t have the problem because for one
>>> result
>>> >>partition, only one file is written at the same time.
>>> >>2. *Performance*: Large amounts of small shuffle files and random
>>> IO
>>> >>can influence shuffle performance a lot especially for hdd (for
>>> ssd,
>>> >>sequential read is also important because of read ahead and
>>> cache). For
>>> >>batch jobs processing massive data, small amount of data per
>>> subpartition
>>> >>is common because of high parallelism. Besides, data skew is
>>> another cause
>>> >>of small subpartition files. By merging data of all subpartitions
>>> together
>>> >>in one file, more sequential read can be achieved.
>>> >>3. *Resource*: For current hash-based implementation, each
>>> >>subpartition needs at least one buffer. For large scale batch
>>> shuffles, the
>>> >>memory consumption can be huge. For example, we need at least 320M
>>> network
>>> >>memory per result partition if parallelism is set to 1 and
>>> because of
>>> >>the huge network consumption, it is hard to config the network
>>> memory for
>>> >>large scale batch job and  sometimes parallelism can not be
>>> increased just
>>> >>because of insufficient network memory  which leads to bad user
>>> experience.
>>> >>
>>> >> To improve Flink’s capability of running large scale batch jobs, we
>>> would
>>> >> like to introduce sort-merge based blocking shuffle to Flink[1]. Any
>>> >> feedback is appreciated.
>>> >>
>>> >> [1]
>>> >>
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
>>> >>
>>> >> Best,
>>> >> Yingjie
>>> >>
>>> >
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>


Jupyter PyFlink Web UI

2021-06-08 Thread maverick
Hi,
I've got a question. I'm running PyFlink code from Jupyter Notebook starting
TableEnvironment with following code:

env_settings =
EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = TableEnvironment.create(env_settings)

How can I enable Web UI in this code?

Regards,
Maciek



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: How to unsubscribe?

2021-06-08 Thread Leonard Xu
Hi, Morgan

Just send an email with any content to user-unsubscr...@flink.apache.org 
and you will be unsubscribed from the Flink user mailing list.
Likewise, send an email with any content to dev-unsubscr...@flink.apache.org 
to unsubscribe from the Flink dev mailing list.

Please make sure you’ve sent the email to the correct address.

Best,
Leonard

Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2021-06-08 Thread Yingjie Cao
Hi Till,

Thanks for the suggestion. The blog post is already on the way.

Best,
Yingjie
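
For users who want to try the sort-based blocking shuffle discussed in this thread on Flink 1.13, the relevant knobs are, to the best of my knowledge, the ones sketched below; please verify the exact keys against the 1.13 configuration docs before relying on them:

```
# flink-conf.yaml
# Use sort-merge based blocking shuffle for all blocking result partitions.
taskmanager.network.sort-shuffle.min-parallelism: 1
# Optionally compress shuffle data to reduce disk and network IO.
taskmanager.network.blocking-shuffle.compression.enabled: true
```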

Till Rohrmann  于2021年6月8日周二 下午5:30写道:

> Thanks for the update Yingjie. Would it make sense to write a short blog
> post about this feature including some performance improvement numbers? I
> think this could be interesting to our users.
>
> Cheers,
> Till
>
> On Mon, Jun 7, 2021 at 4:49 AM Jingsong Li  wrote:
>
>> Thanks Yingjie for the great effort!
>>
>> This is really helpful to Flink Batch users!
>>
>> Best,
>> Jingsong
>>
>> On Mon, Jun 7, 2021 at 10:11 AM Yingjie Cao 
>> wrote:
>>
>> > Hi devs & users,
>> >
>> > The FLIP-148[1] has been released with Flink 1.13 and the final
>> > implementation has some differences compared with the initial proposal
>> in
>> > the FLIP document. To avoid potential misunderstandings, I have updated
>> the
>> > FLIP document[1] accordingly and I also drafted another document[2]
>> which
>> > contains more implementation details.  FYI.
>> >
>> > [1]
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
>> > [2]
>> >
>> https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing
>> >
>> > Best,
>> > Yingjie
>> >
>> > Yingjie Cao  于2020年10月15日周四 上午11:02写道:
>> >
>> >> Hi devs,
>> >>
>> >> Currently, Flink adopts a hash-style blocking shuffle implementation
>> >> which writes data sent to different reducer tasks into separate files
>> >> concurrently. Compared to sort-merge based approach writes those data
>> >> together into a single file and merges those small files into bigger
>> ones,
>> >> hash-based approach has several weak points when it comes to running
>> large
>> >> scale batch jobs:
>> >>
>> >>1. *Stability*: For high parallelism (tens of thousands) batch job,
>> >>current hash-based blocking shuffle implementation writes too many
>> files
>> >>concurrently which gives high pressure to the file system, for
>> example,
>> >>maintenance of too many file metas, exhaustion of inodes or file
>> >>descriptors. All of these can be potential stability issues.
>> Sort-Merge
>> >>based blocking shuffle don’t have the problem because for one result
>> >>partition, only one file is written at the same time.
>> >>2. *Performance*: Large amounts of small shuffle files and random IO
>> >>can influence shuffle performance a lot especially for hdd (for ssd,
>> >>sequential read is also important because of read ahead and cache).
>> For
>> >>batch jobs processing massive data, small amount of data per
>> subpartition
>> >>is common because of high parallelism. Besides, data skew is
>> another cause
>> >>of small subpartition files. By merging data of all subpartitions
>> together
>> >>in one file, more sequential read can be achieved.
>> >>3. *Resource*: For current hash-based implementation, each
>> >>subpartition needs at least one buffer. For large scale batch
>> shuffles, the
>> >>memory consumption can be huge. For example, we need at least 320M
>> network
>> >>memory per result partition if parallelism is set to 1 and
>> because of
>> >>the huge network consumption, it is hard to config the network
>> memory for
>> >>large scale batch job and  sometimes parallelism can not be
>> increased just
>> >>because of insufficient network memory  which leads to bad user
>> experience.
>> >>
>> >> To improve Flink’s capability of running large scale batch jobs, we
>> would
>> >> like to introduce sort-merge based blocking shuffle to Flink[1]. Any
>> >> feedback is appreciated.
>> >>
>> >> [1]
>> >>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
>> >>
>> >> Best,
>> >> Yingjie
>> >>
>> >
>>
>> --
>> Best, Jingsong Lee
>>
>


How to unsubscribe?

2021-06-08 Thread Geldenhuys , Morgan Karl
How can I unsubscribe from this mailing list? The volume of mail is just getting too 
much at the moment. Following the steps described on the website 
(https://flink.apache.org/community.html) did not appear to do anything.

Sorry for the spam and thanks in advance.


Re: Multiple Exceptions during Load Test in State Access APIs with RocksDB

2021-06-08 Thread Chirag Dewan
Hi,

Although this looks like a problem to me, I still can't conclude it.
I tried reducing my TM replicas from 2 to 1 with 4 slots and 4 cores each. I 
was hoping that with a single TM there would be no file write conflicts. But that 
doesn't seem to be the case, as I still get:

Caused by: org.apache.flink.util.SerializedThrowable: 
java.lang.IllegalArgumentException: Key group 2 is not in 
KeyGroupRange{startKeyGroup=64, endKeyGroup=95}.

I have checked that there's no concurrent access on the ValueState.

Any more leads?

Thanks,
Chirag

On Monday, 7 June, 2021, 06:56:56 pm IST, Chirag Dewan 
 wrote:  
 
Hi,

I think I got my issue. It would help if someone can confirm it :)

I am using an NFS filesystem for storing my checkpoints and my Flink cluster is 
running on K8s with 2 TMs and 2 JMs.

All my pods share the NFS PVC via state.checkpoints.dir and we also missed 
setting the RocksDB local dir.

Does this lead to state corruption?

Thanks,
Chirag


On Monday, 7 June, 2021, 08:54:39 am IST, Chirag Dewan 
 wrote:  
 
Thanks for the reply Yun. I strangely don't see any nulls. And in fact this 
exception comes on the first few records and then the job starts processing 
normally.

Also, I don't see any reason for concurrent access to the state in my code. 
Could more CPU cores than task slots on the Task Manager be the reason for it?
On Saturday, 5 June, 2021, 06:40:27 pm IST, Yun Gao  
wrote:  
 
Hi Chirag,

If you are able to reproduce the exception, could you first add some logs to print the 
value of valueState, valueState.value(), inEvent and inEvent.getPrizeDelta()?
Any of these objects being null would cause the NullPointerException here.

For the second exception, I found a similar issue [1], caused by concurrent 
access to the value state. Do we have a similar situation here?

Best,
Yun

[1] https://issues.apache.org/jira/browse/FLINK-18587


 --Original Mail --
Sender:Chirag Dewan 
Send Date:Sat Jun 5 20:29:37 2021
Recipients:User 
Subject:Multiple Exceptions during Load Test in State Access APIs with RocksDB

Hi,

I am getting multiple exceptions while trying to use RocksDB as a state backend. 
I have 2 Task Managers with 2 task slots and 4 cores each. 
Below is our setup:

Kafka (topic with 2 partitions) ---> FlinkKafkaConsumer (2 parallelism) ---> 
KeyedProcessFunction (4 parallelism) ---> FlinkKafkaProducer (1 parallelism) 
---> Kafka topic

public class Aggregator_KeyedExpression extends KeyedProcessFunction {

    private ValueState<Integer> valueState;

    @Override
    public void open() throws Exception {
        ValueStateDescriptor<Integer> descriptor =
                new ValueStateDescriptor<>("totalPrize", Integer.class);
        valueState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(GameZoneInput inEvent, Context ctx, final List<GameZoneOutput> outEvents)
            throws Exception {
        if (valueState.value() == null) {
            valueState.update(0);
        }

        valueState.update(valueState.value() + inEvent.getPrizeDelta()); // -> NullPointerException on this line

        int sum = valueState.value();

        GameZoneOutput output = new GameZoneOutput();
        output.setPlayerId(inEvent.getPlayerId());
        output.setNetPrize(sum);
        outEvents.add(output);
    }

    @Override
    public void close() throws Exception {
        valueState.clear();
    }
}
While doing a load test, I get a NullPointerException in valueState.value(), 
which seems strange as we would have updated the value state above.

Another strange thing is that this is observed only under load conditions and 
works fine otherwise.

We also see some serialization exceptions:

Suppressed: java.lang.IllegalArgumentException: Position out of bounds.
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
at org.apache.flink.core.memory.DataOutputSerializer.setPosition(DataOutputSerializer.java:352)
at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.resetToKey(RocksDBSerializedCompositeKeyBuilder.java:185)
at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.buildCompositeKeyNamespace(RocksDBSerializedCompositeKeyBuilder.java:114)
at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeCurrentKeyWithGroupAndNamespace(AbstractRocksDBState.java:163)
at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.clear(AbstractRocksDBState.java:113)

Any leads would be appreciated. Thanks
Chirag



  

Re: State migration for sql job

2021-06-08 Thread Kurt Young
What kind of expectation do you have after you add the "max(a)" aggregation:

a. Keep summing a and start to calculate max(a) only after you add it. In other
words, max(a) won't take the historical data into account.
b. First process all the historical data to get a result of max(a), and
then start to compute sum(a) and max(a) together for the real-time data.

Best,
Kurt


On Tue, Jun 8, 2021 at 2:11 PM JING ZHANG  wrote:

> Hi aitozi,
> This is a popular demand that many users mentioned, which appears in user
> mail list for several times.
> Unfortunately, it is not supported by Flink SQL yet, maybe would be solved
> in the future. BTW, a few company try to solve the problem in some
> specified user cases on their internal Flink version[2].
> Currently, you may try use `State Processor API`[1] as temporary solution.
> 1. Do a savepoint
> 2. Generate an updated savepoint based on the State Processor API
> 3. Recover from the new savepoint.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/libs/state_processor_api/
> [2] https://developer.aliyun.com/article/781455
>
> Best regards,
> JING ZHANG
>
> aitozi  于2021年6月8日周二 下午1:54写道:
>
>> When use flink sql, we encounter a big problem to deal with sql state
>> compatibility. Think we have a group agg sql like ```sql select sum(`a`)
>> from source_t group by `uid` ``` But if i want to add a new agg column to
>> ```sql select sum(`a`), max(`a`) from source_t group by `uid` ``` Then sql
>> state will not be compatible. Is there any on-going work/thoughts to
>> improve this situation?
>> --
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive
>> 
>> at Nabble.com.
>>
>


Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2021-06-08 Thread Till Rohrmann
Thanks for the update Yingjie. Would it make sense to write a short blog
post about this feature including some performance improvement numbers? I
think this could be interesting to our users.

Cheers,
Till

On Mon, Jun 7, 2021 at 4:49 AM Jingsong Li  wrote:

> Thanks Yingjie for the great effort!
>
> This is really helpful to Flink Batch users!
>
> Best,
> Jingsong
>
> On Mon, Jun 7, 2021 at 10:11 AM Yingjie Cao 
> wrote:
>
> > Hi devs & users,
> >
> > The FLIP-148[1] has been released with Flink 1.13 and the final
> > implementation has some differences compared with the initial proposal in
> > the FLIP document. To avoid potential misunderstandings, I have updated
> the
> > FLIP document[1] accordingly and I also drafted another document[2] which
> > contains more implementation details.  FYI.
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
> > [2]
> >
> https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing
> >
> > Best,
> > Yingjie
> >
> > Yingjie Cao  于2020年10月15日周四 上午11:02写道:
> >
> >> Hi devs,
> >>
> >> Currently, Flink adopts a hash-style blocking shuffle implementation
> >> which writes data sent to different reducer tasks into separate files
> >> concurrently. Compared to sort-merge based approach writes those data
> >> together into a single file and merges those small files into bigger
> ones,
> >> hash-based approach has several weak points when it comes to running
> large
> >> scale batch jobs:
> >>
> >>1. *Stability*: For high parallelism (tens of thousands) batch job,
> >>current hash-based blocking shuffle implementation writes too many
> files
> >>concurrently which gives high pressure to the file system, for
> example,
> >>maintenance of too many file metas, exhaustion of inodes or file
> >>descriptors. All of these can be potential stability issues.
> Sort-Merge
> >>based blocking shuffle don’t have the problem because for one result
> >>partition, only one file is written at the same time.
> >>2. *Performance*: Large amounts of small shuffle files and random IO
> >>can influence shuffle performance a lot especially for hdd (for ssd,
> >>sequential read is also important because of read ahead and cache).
> For
> >>batch jobs processing massive data, small amount of data per
> subpartition
> >>is common because of high parallelism. Besides, data skew is another
> cause
> >>of small subpartition files. By merging data of all subpartitions
> together
> >>in one file, more sequential read can be achieved.
> >>3. *Resource*: For current hash-based implementation, each
> >>subpartition needs at least one buffer. For large scale batch
> shuffles, the
> >>memory consumption can be huge. For example, we need at least 320M
> network
> >>memory per result partition if parallelism is set to 1 and
> because of
> >>the huge network consumption, it is hard to config the network
> memory for
> >>large scale batch job and  sometimes parallelism can not be
> increased just
> >>because of insufficient network memory  which leads to bad user
> experience.
> >>
> >> To improve Flink’s capability of running large scale batch jobs, we
> would
> >> like to introduce sort-merge based blocking shuffle to Flink[1]. Any
> >> feedback is appreciated.
> >>
> >> [1]
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
> >>
> >> Best,
> >> Yingjie
> >>
> >
>
> --
> Best, Jingsong Lee
>


Re: ByteSerializationSchema in PyFlink

2021-06-08 Thread Wouter Zorgdrager
Hi Dian, all,

The way I resolved right now, is to write my own custom serializer which
only maps from bytes to bytes. See the code below:
public class KafkaBytesSerializer implements SerializationSchema<byte[]>,
        DeserializationSchema<byte[]> {

    @Override
    public byte[] deserialize(byte[] bytes) throws IOException {
        return bytes;
    }

    @Override
    public boolean isEndOfStream(byte[] bytes) {
        return false;
    }

    @Override
    public byte[] serialize(byte[] bytes) {
        return bytes;
    }

    @Override
    public TypeInformation<byte[]> getProducedType() {
        return TypeInformation.of(byte[].class);
    }
}

This code is packaged in a jar and uploaded through env.add_jars. That
works like a charm!

Thanks for the help!
Wouter

On Fri, 4 Jun 2021 at 14:40, Wouter Zorgdrager 
wrote:

> Hi Dian, all,
>
> Thanks for your suggestion. Unfortunately, it does not seem to work. I get
> the following exception:
>
> Caused by: java.lang.NegativeArraySizeException: -2147183315
> at
> org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:81)
> at
> org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:31)
> at
> org.apache.flink.api.common.serialization.TypeInformationSerializationSchema.deserialize(TypeInformationSerializationSchema.java:92)
> at
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
> at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58)
> at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
> at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
>
> To be more precise, the messages in my Kafka topic are pickled Python
> objects. Maybe that is the reason for the exception, I also tried using 
> Types.PICKLED_BYTE_ARRAY().get_java_type_info()
> but I think that has the same serializer because I get the same exception.
>
> Any suggestions? Thanks for your help!
>
> Regards,
> Wouter
>
> On Fri, 4 Jun 2021 at 08:24, Dian Fu  wrote:
>
>> Hi Wouter,
>>
>> E   org.apache.flink.api.python.shaded.py4j.Py4JException: 
>> Constructor 
>> org.apache.flink.api.common.serialization.TypeInformationSerializationSchema([class
>>  org.apache.flink.api.common.typeinfo.IntegerTypeInfo, class 
>> org.apache.flink.configuration.Configuration]) does not exist
>>
>>
>> As the exception indicate, the constructor doesn’t exists.
>>
>>
>>
>> Could you try with the following:
>>
>> ```
>> j_type_info= Types.PRIMITIVE_ARRAY(Types.BYTE()).get_java_type_info()
>> j_type_serializer=
>>  
>> j_type_info.createSerializer(gate_way.jvm.org.apache.flink.api.common.ExecutionConfig())
>>
>> j_byte_string_schema = 
>> gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema(j_type_info,
>>  j_type_serializer)
>>
>> ```
>>
>> Regards,
>> Dian
>>
>> 2021年6月3日 下午8:51,Wouter Zorgdrager  写道:
>>
>> Hi all,
>>
>> I have a PyFlink job connected to a KafkaConsumer and Producer. I want to
>> directly work with the bytes from and to Kafka because I want to
>> serialize/deserialize in my Python code rather than the JVM environment.
>> Therefore, I can't use the SimpleStringSchema for (de)serialization (the
>> messages aren't strings anyways). I've tried to create a
>> TypeInformationSerializer with Types.BYTE(), see the code snippet below:
>>
>> class ByteSerializer(SerializationSchema, DeserializationSchema):
>> def __init__(self, execution_environment):
>> gate_way = get_gateway()
>>
>> j_byte_string_schema = 
>> gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema(
>> Types.BYTE().get_java_type_info(),
>> get_j_env_configuration(execution_environment),
>> )
>> SerializationSchema.__init__(self, 
>> j_serialization_schema=j_byte_string_schema)
>> DeserializationSchema.__init__(
>> self, j_deserialization_schema=j_byte_string_schema
>> )The ByteSerializer is used like this:
>>
>>
>> return FlinkKafkaConsumer(
>> ["client_request", "internal"],
>> ByteSerializer(self.env._j_stream_execution_environment),
>> {
>> "bootstrap.servers": "localhost:9092",
>> "auto.offset.reset": "latest",

Re: Re: Add control mode for flink

2021-06-08 Thread 刘建刚
Thanks for the reply. It is a good question. There are multiple choices, as
follows:

   1. We can persist control signals in HighAvailabilityServices and replay
   them after failover.
   2. Only tell the users that the control signals take effect after they
   are checkpointed.


Steven Wu [via Apache Flink User Mailing List archive.] <
ml+s2336050n44278...@n4.nabble.com> 于2021年6月8日周二 下午2:15写道:

>
> I can see the benefits of control flow. E.g., it might help the old (and
> inactive) FLIP-17 side input. I would suggest that we add more details of
> some of the potential use cases.
>
> Here is one mismatch with using control flow for dynamic config. Dynamic
> config is typically targeted/loaded by one specific operator. Control flow
> will propagate the dynamic config to all operators. not a problem per se
>
> Regarding using the REST api (to jobmanager) for accepting control
> signals from external system, where are we going to persist/checkpoint the
> signal? jobmanager can die before the control signal is propagated and
> checkpointed. Did we lose the control signal in this case?
>
>
> On Mon, Jun 7, 2021 at 11:05 PM Xintong Song <[hidden email]
> > wrote:
>
>> +1 on separating the effort into two steps:
>>
>>1. Introduce a common control flow framework, with flexible
>>interfaces for generating / reacting to control messages for various
>>purposes.
>>2. Features that leverating the control flow can be worked on
>>concurrently
>>
>> Meantime, keeping collecting potential features that may leverage the
>> control flow should be helpful. It provides good inputs for the control
>> flow framework design, to make the framework common enough to cover the
>> potential use cases.
>>
>> My suggestions on the next steps:
>>
>>1. Allow more time for opinions to be heard and potential use cases
>>to be collected
>>2. Draft a FLIP with the scope of common control flow framework
>>3. We probably need a poc implementation to make sure the framework
>>covers at least the following scenarios
>>   1. Produce control events from arbitrary operators
>>   2. Produce control events from JobMaster
>>   3. Consume control events from arbitrary operators downstream
>>   where the events are produced
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Tue, Jun 8, 2021 at 1:37 PM Yun Gao <[hidden email]
>> > wrote:
>>
>>> Very thanks Jiangang for bringing this up and very thanks for the
>>> discussion!
>>>
>>> I also agree with the summarization by Xintong and Jing that control
>>> flow seems to be
>>> a common buidling block for many functionalities and dynamic
>>> configuration framework
>>> is a representative application that frequently required by users.
>>> Regarding the control flow,
>>> currently we are also considering the design of iteration for the
>>> flink-ml, and as Xintong has pointed
>>> out, it also required the control flow in cases like detection global
>>> termination inside the iteration
>>>  (in this case we need to broadcast an event through the iteration body
>>> to detect if there are still
>>> records reside in the iteration body). And regarding  whether to
>>> implement the dynamic configuration
>>> framework, I also agree with Xintong that the consistency guarantee
>>> would be a point to consider, we
>>> might consider if we need to ensure every operator could receive the
>>> dynamic configuration.
>>>
>>> Best,
>>> Yun
>>>
>>>
>>>
>>> --
>>> Sender:kai wang<[hidden email]
>>> >
>>> Date:2021/06/08 11:52:12
>>> Recipient:JING ZHANG<[hidden email]
>>> >
>>> Cc:刘建刚<[hidden email]
>>> >; Xintong Song
>>> [via Apache Flink User Mailing List archive.]<[hidden email]
>>> >; user<[hidden
>>> email] >; dev<[hidden
>>> email] >
>>> Theme:Re: Add control mode for flink
>>>
>>>
>>>
>>> I'm big +1 for this feature.
>>>
>>>1. Limit the input qps.
>>>2. Change log level for debug.
>>>
>>> in my team, the two examples above are needed
>>>
>>> JING ZHANG <[hidden email]
>>> > 于2021年6月8日周二
>>> 上午11:18写道:
>>>
 Thanks Jiangang for bringing this up.
 As mentioned in Jiangang's email, `dynamic configuration framework`
 provides many useful functions in Kuaishou, because it could update job
 behavior without relaunching the job. The functions are very popular in
 Kuaishou, we also see similar demands in maillist [1].

 I'm big +1 for this feature.

 Thanks Xintong and Yun for deep thoughts about 

Re: Corrupted unaligned checkpoints in Flink 1.11.1

2021-06-08 Thread Piotr Nowojski
Re-adding user mailing list

Hey Alex,

In that case I can see two scenarios that could lead to missing files. Keep
in mind that incremental checkpoints are referencing previous checkpoints
in order to minimise the size of the checkpoint (roughly speaking only
changes since the previous checkpoint are being
persisted/uploaded/written). Checkpoint number 42, can reference an
arbitrary number of previous checkpoints. I suspect that somehow, some of
those previously referenced checkpoints got deleted and removed. Also keep
in mind that savepoints (as of now) are never incremental, they are always
full checkpoints. However externalised checkpoints can be incremental. Back
to the scenarios:
1. You might have accidentally removed some older checkpoints from your
Job2, maybe thinking they are no longer needed. Maybe you have just kept
this single externalised checkpoint directory from steps T3 or T4,
disregarding that this externalised checkpoint might be referencing
previous checkpoints of Job2?
2. As I mentioned, Flink is automatically maintaining reference counts of
the used files and deletes them when they are no longer used/referenced.
However this works only within a single job/cluster. For example if between
steps T3 and T4, you restarted Job2 and let it run for a bit, it could take
more checkpoints that would subsume files that were still part of the
externalised checkpoint that you previously used to start Job3/Job4. Job2
would have no idea that Job3/Job4 exist, let alone that they are
referencing some files from Job2, and those files could have been deleted
as soon as Job2 was no longer using/referencing them.

Could one of those happen in your case?

Best, Piotrek

pon., 7 cze 2021 o 20:01 Alexander Filipchik 
napisał(a):

> Yes, we do use incremental checkpoints.
>
> Alex
>
> On Mon, Jun 7, 2021 at 3:12 AM Piotr Nowojski 
> wrote:
>
>> Hi Alex,
>>
>> A quick question. Are you using incremental checkpoints?
>>
>> Best, Piotrek
>>
>> sob., 5 cze 2021 o 21:23  napisał(a):
>>
>>> Small correction: in T4 and T5 I mean Job2, not Job1 (as Job1 was
>>> savepointed).
>>>
>>> Thank you,
>>> Alex
>>>
>>> On Jun 4, 2021, at 3:07 PM, Alexander Filipchik 
>>> wrote:
>>>
>>> 
>>> Looked through the logs and didn't see anything fishy that indicated an
>>> exception during checkpointing.
>>> To make it clearer, here is the timeline (we use unaligned checkpoints,
>>> and state size around 300Gb):
>>>
>>> T1: Job1 was running
>>> T2: Job1 was savepointed, brought down and replaced with Job2.
>>> T3: Attempts to savepoint Job2 failed (timed out). Job2 was cancelled,
>>> brought down and replaced by Job3 that was restored from the externalized
>>> checkpoint of Job2
>>> T3: Attempts to savepoint Job3 failed (timed out). Job3 was cancelled,
>>> brought down and replaced by Job4 that was restored from the externalized
>>> checkpoint of Job3
>>> T4: We realized that jobs were timing out to savepoint due to local disk
>>> throttling. We provisioned disk with more throughput and IO. Job4 was
>>> cancelled, Job4 was deployed and restored from the externalized checkpoint of
>>> Job3, but failed as it couldn't find some files in the folder that belongs
>>> to the checkpoint of *Job1*
>>> T5: We tried to redeploy and restore from checkpoints of Job3 and Job2,
>>> but all the attempts failed on reading files from the *folder that
>>> belongs to the checkpoint of Job1*
>>>
>>> We checked the content of the folder containing checkpoints of Job1, and
>>> it has files. Not sure what is pointing to the missing files and what could've
>>> removed them.
>>>
>>> Any way we can figure out what could've happened? Is there a tool that
>>> can read the checkpoint and check whether it is valid?
>>>
>>> Alex
>>>
>>> On Thu, Jun 3, 2021 at 2:12 PM Alexander Filipchik 
>>> wrote:
>>>
 On the checkpoints -> what kind of issues should I check for? I was
 looking for metrics and it looks like they were reporting successful
 checkpoints. It looks like some files were removed in the shared folder,
 but I'm not sure how to check for what caused it.

 Savepoints were failing due to savepoint timeouts. Based on
 metrics, our attached disks were not fast enough (GCS regional disks are
 network disks and were throttled). The team cancelled the savepoint and
 just killed the kubernetes cluster. I assume some checkpoints were
 interrupted as the job triggers them one after another.

 Is there a known issue with termination during running checkpoint?

 Btw, we use the Flink Kube operator from Lyft.

 Alex

 On Thu, Jun 3, 2021 at 1:24 AM Chesnay Schepler 
 wrote:

> Is there anything in the Flink logs indicating issues with writing the
> checkpoint data?
> When the savepoint could not be created, was anything logged from
> Flink? How did you shut down the cluster?
>
> On 6/3/2021 5:56 AM, Alexander Filipchik wrote:
>
> Hi,
>
> Trying to figure ou