Re: [DISCUSS] Contribute Pulsar Flink connector back to Flink

2019-09-04 Thread Becket Qin
Hi Sijie and Yijie,

Thanks for sharing your thoughts.

Just want to give an update on FLIP-27. Although the FLIP wiki and
discussion thread have been quiet for some time, a few committers /
contributors in the Flink community have actually been prototyping the entire
thing. We have made some good progress there but want to update the FLIP wiki
only after the entire thing is verified to work, in case there are some
last-minute surprises in the implementation. I don't have an exact ETA yet,
but I guess it is going to be within a month or so.

I am happy to review the current Flink Pulsar connector and see if it would
fit in FLIP-27. It would be good to avoid the case where we check in the
Pulsar connector after some review effort, only to have the new Source
interface become ready shortly afterwards.

Thanks,

Jiangjie (Becket) Qin

On Thu, Sep 5, 2019 at 8:39 AM Yijie Shen  wrote:

> Thanks for all the feedback and suggestions!
>
> As Sijie said, the goal of the connector has always been to provide
> users with the latest features of both systems as soon as possible. We
> propose to contribute the connector to Flink and hope to get more
> suggestions and feedback from Flink experts to ensure the high quality
> of the connector.
>
> For FLIP-27, we noticed its existence at the beginning of reworking
> the connector implementation based on Flink 1.9; we also wanted to
> build a connector that supports both batch and stream computing based
> on it.
> However, it has been inactive for some time, so we decided to provide
> a connector with most of the new features, such as the new type system
> and the new catalog API first. We will pay attention to the progress
> of FLIP-27 continually and incorporate it with the connector as soon
> as possible.
>
> Regarding the test status of the connector, we are following the other
> connectors' tests in the Flink repository and aim to provide tests as
> thorough as we can. We are also happy to hear suggestions and
> supervision from the Flink community to improve the stability and
> performance of the connector continuously.
>
> Best,
> Yijie
>
> On Thu, Sep 5, 2019 at 5:59 AM Sijie Guo  wrote:
> >
> > Thanks everyone for the comments and feedback.
> >
> > It seems to me that the main question here is about - "how can the Flink
> > community maintain the connector?".
> >
> > Here are two thoughts from myself.
> >
> > 1) I think how and where to host this integration is kind of less
> important
> > here. I believe there can be many ways to achieve it.
> > As part of the contribution, what we are looking for here is how these
> two
> > communities can build the collaboration relationship on developing
> > the integration between Pulsar and Flink. Even we can try our best to
> catch
> > up all the updates in Flink community. We are still
> > facing the fact that we have less experiences in Flink than folks in
> Flink
> > community. In order to make sure we maintain and deliver
> > a high-quality pulsar-flink integration to the users who use both
> > technologies, we need some help from the experts from Flink community.
> >
> > 2) We have been following FLIP-27 for a while. Originally we were
> thinking
> > of contributing the connectors back after integrating with the
> > new API introduced in FLIP-27. But we decided to initiate the
> conversation
> > as early as possible. Because we believe there are more benefits doing
> > it now rather than later. As part of contribution, it can help Flink
> > community understand more about Pulsar and the potential integration
> points.
> > Also we can also help Flink community verify the new connector API as
> well
> > as other new API (e.g. catalog API).
> >
> > Thanks,
> > Sijie
> >
> > On Wed, Sep 4, 2019 at 5:24 AM Becket Qin  wrote:
> >
> > > Hi Yijie,
> > >
> > > Thanks for the interest in contributing the Pulsar connector.
> > >
> > > In general, I think having Pulsar connector with strong support is a
> > > valuable addition to Flink. So I am happy to shepherd this effort.
> > > Meanwhile, I would also like to provide some context and recent
> efforts on
> > > the Flink connectors ecosystem.
> > >
> > > The current way Flink maintains its connector has hit the scalability
> bar.
> > > With more and more connectors coming into Flink repo, we are facing a
> few
> > > problems such as long build and testing time. To address this problem,
> we
> > > have attempted to do the following:
> > > 1. Split out the connectors into a separate repository. This is
> temporarily
> > > on hold due to potential solution to shorten the build time.
> > > 2. Encourage the connectors to stay as ecosystem project while Flink
> tries
> > > to provide good support for functionality and compatibility tests.
> Robert
> > > has driven to create a Flink Ecosystem project website and it is going
> > > through some final approval process.
> > >
> > > Given the above efforts, it would be great to first see if we can have
> > > Pulsar connector as an ecosystem project with great support. It

Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-09-04 Thread Becket Qin
Hi Dawid,

Thanks a lot for the clarification. Got it now. A few more thoughts:

1. Naming.
I agree that the name "Configurable" is a little misleading if we just want
to use it to save POJOs. It would probably help to just name it something
like "ConfigPojo".

2. Flat config map v.s. structured config map.
From a user's perspective, I personally find a flat config map is usually
easier to understand than a structured config format. But it is just my
personal opinion and up for debate.

Taking the Host and CachedFile as examples, personally I think the
following format is more concise and user friendly:

Host: 192.168.0.1:1234 (single config)
Hosts: 192.168.0.1:1234, 192.168.0.2:5678 (list of configs)

CachedFile: path:file:flag (single config)
CachedFile: path1:file1:flag1, path2:file2:flag2 (list config)

Maybe for complicated POJOs the full K-V pairs would be necessary, but it
looks like we are trying to avoid such complicated POJOs to begin with. Even
if a full K-V form is needed, a List<Map<String, String>> format would also
be almost equivalent to the current design.

3. The necessity of the POJO class in the Configuration / ConfigOption system.
I can see the convenience of having a POJO (or ConfigObject) type supported
in the Configuration / ConfigOption. However, one thing to notice is that,
API-wise, the ConfigurableFactory can return an arbitrary type of class
instead of just a POJO. This can easily be misused or abused in cases such as
plugins. And the current API design will force such plugins to implement
methods like toConfiguration(), which is a little awkward.

Given that 1) there will not be many such POJO classes and 2) these POJO
classes are defined by Flink, I am thinking that one alternative approach
is to just have the constructors take the configuration String (or list
of Strings) and parse it (see the sketch after the list below). This will
avoid a few complications in this FLIP:
  a) No need to have the ConfigurableFactory class
  b) No need to have the toConfiguration() implementation. So there is just
one way to set values in the Configuration instance.
  c) The Configuration / ConfigOption does not have to also deal with the
Object creation. Instead, they will simply focus on configuration itself.
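
To make the alternative concrete, here is a rough sketch of what I mean by
letting the constructor do the parsing. The Host class below is made up for
illustration and is not part of the FLIP:

import java.util.ArrayList;
import java.util.List;

// Hypothetical POJO that parses its own flat string form "address:port".
public final class Host {
    final String address;
    final int port;

    public Host(String configValue) {
        String[] parts = configValue.trim().split(":");
        this.address = parts[0];
        this.port = Integer.parseInt(parts[1]);
    }

    // "192.168.0.1:1234, 192.168.0.2:5678" -> list of Host
    public static List<Host> parseList(String configValue) {
        List<Host> hosts = new ArrayList<>();
        for (String part : configValue.split(",")) {
            hosts.add(new Host(part));
        }
        return hosts;
    }
}

This way the Configuration / ConfigOption layer only ever sees plain strings,
and the object creation stays entirely inside the POJO.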

Thanks for the patient discussion. I don't want to block this FLIP further,
so I am fine with going ahead with the current design, with the name of
Configurable changed to something like ConfigPojo in order to avoid misuse
as much as possible.

Thanks,

Jiangjie (Becket) Qin

On Wed, Sep 4, 2019 at 5:50 PM Dawid Wysakowicz 
wrote:

> Hi Becket,
>
> You are right, that what we had in mind for
> ExecutionConfig/CheckpointConfig etc. is the option b) from your email.
> In the context of the FLIP-54, those objects are not Configurable. What
> we understand as a Configurable in FLIP-54 is a simple POJO that
> is stored under a single key, such as the examples either from the ML
> thread (Host) or from the design doc (CacheFile). So when configuring
> the host user can provide a host like this:
>
> connector.host: address:localhost, port:1234
>
> rather than
>
> connector.host.address: localhost
>
> connector.host.port: 1234
>
> This is important especially if one wants to configure lists of such
> objects:
>
> connector.hosts: address:localhost,port:1234;address:localhost,port:4567
>
> The intention was definitely not to store whole complex objects, such as
> ExecutionConfig, CheckpointConfig etc. that contain multiple different
> options. Maybe it makes sense to call it ConfigObject as Aljoscha
> suggested? What do you think? Would that make it more understandable?
>
> For the initialization/configuration of objects such as ExecutionConfig,
> CheckpointConfig you may have a look at FLIP-59[1] where we suggest to
> add a configure method to those classes and we pretty much describe the
> process you outline in the last message.
>
> Best,
>
> Dawid
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-59%3A+Enable+execution+configuration+from+Configuration+object
>
> On 04/09/2019 03:37, Becket Qin wrote:
> > Hi Timo, Dawid and Aljoscha,
> >
> > Thanks for clarifying the goals. It is very helpful to understand the
> > motivation here. It would be great to add them to the FLIP wiki.
> >
> > I agree that the current FLIP design achieves the two goals it wants to
> > achieve. But what I am trying to see is whether the current approach is the most
> > reasonable approach.
> >
> > Please let me check if I understand this correctly. From end users'
> > perspective, they will do the following when they want to configure their
> > Flink Jobs.
> > 1. Create a Configuration instance, and call setters of Configuration
> with
> > the ConfigOptions defined in different components.
> > 2. The Configuration created in step 1 will be passed around, and each
> > component will just extract its own options from it.
> > 3. ExecutionConfig, CheckpointConfig (and other Config classes) will
> become
> > a Configurable, which is responsible for extracting the configuration
> > values fro

Re: [VOTE] FLIP-61 Simplify Flink's cluster level RestartStrategy configuration

2019-09-04 Thread vino yang
+1 (non-binding)

Zili Chen  于2019年9月5日周四 上午10:55写道:

> +1
>
>
> zhijiang  于2019年9月5日周四 上午12:36写道:
>
> > +1
> > --
> > From:Till Rohrmann 
> > Send Time:2019年9月4日(星期三) 13:39
> > To:dev 
> > Cc:Zhu Zhu 
> > Subject:Re: [VOTE] FLIP-61 Simplify Flink's cluster level RestartStrategy
> > configuration
> >
> > +1 (binding)
> >
> > On Wed, Sep 4, 2019 at 12:39 PM Chesnay Schepler 
> > wrote:
> >
> > > +1 (binding)
> > >
> > > On 04/09/2019 11:13, Zhu Zhu wrote:
> > > > +1 (non-binding)
> > > >
> > > > Thanks,
> > > > Zhu Zhu
> > > >
> > > > Till Rohrmann  于2019年9月4日周三 下午5:05写道:
> > > >
> > > >> Hi everyone,
> > > >>
> > > >> I would like to start the voting process for FLIP-61 [1], which is
> > > >> discussed and reached consensus in this thread [2].
> > > >>
> > > >> Since the change is rather small I'd like to shorten the voting
> period
> > > to
> > > >> 48 hours. Hence, I'll try to close it September 6th, 11:00 am CET,
> > > unless
> > > >> there is an objection or not enough votes.
> > > >>
> > > >> [1]
> > > >>
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-61+Simplify+Flink%27s+cluster+level+RestartStrategy+configuration
> > > >> [2]
> > > >>
> > > >>
> > >
> >
> https://lists.apache.org/thread.html/e206390127bcbd9b24d9c41a838faa75157e468e01552ad241e3e24b@%3Cdev.flink.apache.org%3E
> > > >>
> > > >> Cheers,
> > > >> Till
> > > >>
> > >
> > >
> >
> >
>


Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-04 Thread Dawid Wysakowicz
Hi Xuefu,

Just wanted to summarize my opinion on the one topic (temporary functions).

My preference would be to make temporary functions always 3-part
qualified (as a result, that would prohibit overriding built-in
functions). Having said that, if the community decides that it's better
to allow overriding built-in functions, I am fine with it and can commit
to that decision.

I wanted to ask if you could clarify a few points for me around that option.

 1. Would you enforce temporary functions to always be just a single
name (without db & cat) as Hive does, or would you also allow 3- or
even 2-part identifiers?
 2. Assuming 2/3-part paths, how would you register a function from the
following statement: CREATE TEMPORARY FUNCTION db.func? Would that
shadow all functions named 'func' in all databases named 'db' in all
catalogs? Or would you shadow only the function 'func' in database 'db'
in the current catalog?
 3. This point is still under discussion, but it was mentioned a few times
that maybe we want to enable the syntax cat.func for "external built-in
functions". How would that affect the statement from the previous point?
Would 'db.func' shadow the "external built-in function" in the 'db' catalog
or user functions as in point 2? Or maybe both?
 4. Lastly, to summarize the previous points: assuming 2/3-part
paths, would the function resolution actually be as follows?
 1. temporary functions (1-part path)
 2. built-in functions
 3. temporary functions (2-part path)
 4. 2-part catalog functions a.k.a. "external built-in functions"
(cat + func) - this is still under discussion, if we want that
in the other focal point
 5. temporary functions (3-part path)
 6. 3-part catalog functions a.k.a. user functions

I would be really grateful if you could clarify those questions for me, thanks.

BTW, Thank you all for a healthy discussion.

Best,

Dawid

On 04/09/2019 23:25, Xuefu Z wrote:
> Thanks all for sharing your thoughts. I think we have gathered some useful
> initial feedback from this long discussion with a couple of focal points
> sticking out.
>
>  We will go back to do more research and adapt our proposal. Once it's
> ready, we will ask for a new round of review. If there is any disagreement,
> we will start a new discussion thread on each rather than having a mega
> discussion like this.
>
> Thanks to everyone for participating.
>
> Regards,
> Xuefu
>
>
> On Thu, Sep 5, 2019 at 2:52 AM Bowen Li  wrote:
>
>> Let me try to summarize and conclude the long thread so far:
>>
>> 1. For order of temp function v.s. built-in function:
>>
>> I think Dawid's point that temp function should be of fully qualified path
>> is a better reasoning to back the newly proposed order, and i agree we
>> don't need to follow Hive/Spark.
>>
>> However, I'd rather not change fundamentals of temporary functions in this
>> FLIP. It belongs to a bigger story of how temporary objects should be
>> redefined and be handled uniformly - currently temporary tables and views
>> (those registered from TableEnv#registerTable()) behave different than what
>> Dawid propose for temp functions, and we need a FLIP to just unify their
>> APIs and behaviors.
>>
>> I agree that backward compatibility is not an issue w.r.t Jark's points.
>>
>> ***Seems we do have consensus that it's acceptable to prevent users
>> registering a temp function in the same name as a built-in function. To
>> help us move forward, I'd like to propose setting such a restraint on temp
>> functions in this FLIP to simplify the design and avoid disputes.*** It
>> will also leave rooms for improvements in the future.
>>
>>
>> 2. For Hive built-in function:
>>
>> Thanks Timo for providing the Presto and Postgres examples. I feel modular
>> built-in functions can be a good fit for the geo and ml example as a native
>> Flink extension, but not sure if it fits well with external integrations.
>> Anyway, I think modular built-in functions is a bigger story and can be on
>> its own thread too, and our proposal doesn't prevent Flink from doing that
>> in the future.
>>
>> ***Seems we have consensus that users should be able to use built-in
>> functions of Hive or other external systems in SQL explicitly and
>> deterministically regardless of Flink built-in functions and the potential
>> modular built-in functions, via some new syntax like "mycat::func"? If so,
>> I'd like to propose removing Hive built-in functions from ambiguous
>> function resolution order, and empower users with such a syntax. This way
>> we sacrifice a little convenience for certainty***
>>
>>
>> What do you think?
>>
>> On Wed, Sep 4, 2019 at 7:02 AM Dawid Wysakowicz 
>> wrote:
>>
>>> Hi,
>>>
>>> Regarding the Hive & Spark support of TEMPORARY FUNCTIONS. I've just
>>> performed some experiments (hive-2.3.2 & spark 2.4.4) and I think they
>> are
>>> very inconsistent in that manner (spark being way worse on that).
>>>
>>> Hive:
>>>
>>> You cannot overwrite all th

Re: Re: [DISCUSS] Support JSON functions in Flink SQL

2019-09-04 Thread TANG Wen-hui
+1 
I have done similar work before. 
Looking forward to discussing this feature.

Best
wenhui



winifred.wenhui.t...@gmail.com
 
From: Kurt Young
Date: 2019-09-05 14:00
To: dev
CC: Anyang Hu
Subject: Re: [DISCUSS] Support JSON functions in Flink SQL
+1 to add JSON support to Flink. We also see lots of requirements for JSON
related functions in our internal platform. Since these are already SQL
standard, I think it's a good time to add them to Flink.
 
Best,
Kurt
 
 
On Thu, Sep 5, 2019 at 10:37 AM Qi Luo  wrote:
 
> We also see strong demands from our SQL users for JSON/Date related
> functions.
>
> Also +Anyang Hu 
>
> On Wed, Sep 4, 2019 at 9:51 PM Jark Wu  wrote:
>
> > Hi Forward,
> >
> > Thanks for bringing this discussion and preparing the nice design.
> > I think it's nice to have the JSON functions in the next release.
> > We have received some requirements for this feature.
> >
> > I can help to shepherd this JSON functions effort and will leave comments
> >  in the design doc in the next days.
> >
> > Hi Danny,
> >
> > The newly introduced JSON functions are from SQL:2016, not from MySQL.
> > So no JSON type is needed. According to SQL:2016, the
> > representation of JSON data can be "character string" which is also
> > the current implementation in Calcite[1].
> >
> > Best,
> > Jark
> >
> >
> > [1]: https://calcite.apache.org/docs/reference.html#json-functions
> >
> >
> > On Wed, 4 Sep 2019 at 21:22, Xu Forward  wrote:
> >
> > > hi Danny Chan ,Thank you very much for your reply, your help can help
> me
> > > further improve this discussion.
> > > Best
> > > forward
> > >
> > > Danny Chan  于2019年9月4日周三 下午8:50写道:
> > >
> > > > Thanks Xu Forward for bring up this topic, I think the JSON functions
> > are
> > > > very useful especially for those MySQL users.
> > > >
> > > > I saw that you have done some work within the Apache Calcite, that’s
> a
> > > > good start, but this is one concern from me, Flink doesn’t support
> JSON
> > > > type internal, so how to represent a JSON object in Flink maybe a key
> > > point
> > > > we need to resolve. In Calcite, we use ANY type to represent as the
> > JSON,
> > > > but I don’t think it is the right way to go, maybe we can have a
> > > discussion
> > > > here.
> > > >
> > > > Best,
> > > > Danny Chan
> > > > 在 2019年9月4日 +0800 PM8:34,Xu Forward ,写道:
> > > > > Hi everybody,
> > > > >
> > > > > I'd like to kick off a discussion on Support JSON functions in
> Flink
> > > SQL.
> > > > >
> > > > > The entire plan is divided into two steps:
> > > > > 1. Implement Support SQL 2016-2017 JSON functions in Flink SQL[1].
> > > > > 2. Implement non-Support SQL 2016-2017 JSON functions in Flink SQL,
> > > such
> > > > as
> > > > > JSON_TYPE in Mysql, JSON_LENGTH, etc. Very useful JSON functions.
> > > > >
> > > > > Would love to hear your thoughts.
> > > > >
> > > > > [1]
> > > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/1JfaFYIFOAY8P2pFhOYNCQ9RTzwF4l85_bnTvImOLKMk/edit#heading=h.76mb88ca6yjp
> > > > >
> > > > > Best,
> > > > > ForwardXu
> > > >
> > >
> >
>


Re: [DISCUSS] Support JSON functions in Flink SQL

2019-09-04 Thread Kurt Young
+1 to add JSON support to Flink. We also see lots of requirements for JSON
related functions in our internal platform. Since these are already SQL
standard, I think it's a good time to add them to Flink.

Best,
Kurt


On Thu, Sep 5, 2019 at 10:37 AM Qi Luo  wrote:

> We also see strong demands from our SQL users for JSON/Date related
> functions.
>
> Also +Anyang Hu 
>
> On Wed, Sep 4, 2019 at 9:51 PM Jark Wu  wrote:
>
> > Hi Forward,
> >
> > Thanks for bringing this discussion and preparing the nice design.
> > I think it's nice to have the JSON functions in the next release.
> > We have received some requirements for this feature.
> >
> > I can help to shepherd this JSON functions effort and will leave comments
> >  in the design doc in the next days.
> >
> > Hi Danny,
> >
> > The newly introduced JSON functions are from SQL:2016, not from MySQL.
> > So no JSON type is needed. According to SQL:2016, the
> > representation of JSON data can be "character string" which is also
> > the current implementation in Calcite[1].
> >
> > Best,
> > Jark
> >
> >
> > [1]: https://calcite.apache.org/docs/reference.html#json-functions
> >
> >
> > On Wed, 4 Sep 2019 at 21:22, Xu Forward  wrote:
> >
> > > hi Danny Chan ,Thank you very much for your reply, your help can help
> me
> > > further improve this discussion.
> > > Best
> > > forward
> > >
> > > Danny Chan  于2019年9月4日周三 下午8:50写道:
> > >
> > > > Thanks Xu Forward for bring up this topic, I think the JSON functions
> > are
> > > > very useful especially for those MySQL users.
> > > >
> > > > I saw that you have done some work within the Apache Calcite, that’s
> a
> > > > good start, but this is one concern from me, Flink doesn’t support
> JSON
> > > > type internal, so how to represent a JSON object in Flink maybe a key
> > > point
> > > > we need to resolve. In Calcite, we use ANY type to represent as the
> > JSON,
> > > > but I don’t think it is the right way to go, maybe we can have a
> > > discussion
> > > > here.
> > > >
> > > > Best,
> > > > Danny Chan
> > > > 在 2019年9月4日 +0800 PM8:34,Xu Forward ,写道:
> > > > > Hi everybody,
> > > > >
> > > > > I'd like to kick off a discussion on Support JSON functions in
> Flink
> > > SQL.
> > > > >
> > > > > The entire plan is divided into two steps:
> > > > > 1. Implement Support SQL 2016-2017 JSON functions in Flink SQL[1].
> > > > > 2. Implement non-Support SQL 2016-2017 JSON functions in Flink SQL,
> > > such
> > > > as
> > > > > JSON_TYPE in Mysql, JSON_LENGTH, etc. Very useful JSON functions.
> > > > >
> > > > > Would love to hear your thoughts.
> > > > >
> > > > > [1]
> > > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/1JfaFYIFOAY8P2pFhOYNCQ9RTzwF4l85_bnTvImOLKMk/edit#heading=h.76mb88ca6yjp
> > > > >
> > > > > Best,
> > > > > ForwardXu
> > > >
> > >
> >
>


Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-04 Thread Bowen Li
Maybe Xuefu missed my email. Please let me know what your thoughts are on
the summary; if there's still major controversy, I can take time to
reevaluate that part.


On Wed, Sep 4, 2019 at 2:25 PM Xuefu Z  wrote:

> Thanks all for sharing your thoughts. I think we have gathered some useful
> initial feedback from this long discussion with a couple of focal points
> sticking out.
>
>  We will go back to do more research and adapt our proposal. Once it's
> ready, we will ask for a new round of review. If there is any disagreement,
> we will start a new discussion thread on each rather than having a mega
> discussion like this.
>
> Thanks to everyone for participating.
>
> Regards,
> Xuefu
>
>
> On Thu, Sep 5, 2019 at 2:52 AM Bowen Li  wrote:
>
> > Let me try to summarize and conclude the long thread so far:
> >
> > 1. For order of temp function v.s. built-in function:
> >
> > I think Dawid's point that temp function should be of fully qualified
> path
> > is a better reasoning to back the newly proposed order, and i agree we
> > don't need to follow Hive/Spark.
> >
> > However, I'd rather not change fundamentals of temporary functions in
> this
> > FLIP. It belongs to a bigger story of how temporary objects should be
> > redefined and be handled uniformly - currently temporary tables and views
> > (those registered from TableEnv#registerTable()) behave different than
> what
> > Dawid propose for temp functions, and we need a FLIP to just unify their
> > APIs and behaviors.
> >
> > I agree that backward compatibility is not an issue w.r.t Jark's points.
> >
> > ***Seems we do have consensus that it's acceptable to prevent users
> > registering a temp function in the same name as a built-in function. To
> > help us move forward, I'd like to propose setting such a restraint on
> temp
> > functions in this FLIP to simplify the design and avoid disputes.*** It
> > will also leave rooms for improvements in the future.
> >
> >
> > 2. For Hive built-in function:
> >
> > Thanks Timo for providing the Presto and Postgres examples. I feel
> modular
> > built-in functions can be a good fit for the geo and ml example as a
> native
> > Flink extension, but not sure if it fits well with external integrations.
> > Anyway, I think modular built-in functions is a bigger story and can be
> on
> > its own thread too, and our proposal doesn't prevent Flink from doing
> that
> > in the future.
> >
> > ***Seems we have consensus that users should be able to use built-in
> > functions of Hive or other external systems in SQL explicitly and
> > deterministically regardless of Flink built-in functions and the
> potential
> > modular built-in functions, via some new syntax like "mycat::func"? If
> so,
> > I'd like to propose removing Hive built-in functions from ambiguous
> > function resolution order, and empower users with such a syntax. This way
> > we sacrifice a little convenience for certainty***
> >
> >
> > What do you think?
> >
> > On Wed, Sep 4, 2019 at 7:02 AM Dawid Wysakowicz 
> > wrote:
> >
> > > Hi,
> > >
> > > Regarding the Hive & Spark support of TEMPORARY FUNCTIONS. I've just
> > > performed some experiments (hive-2.3.2 & spark 2.4.4) and I think they
> > are
> > > very inconsistent in that manner (spark being way worse on that).
> > >
> > > Hive:
> > >
> > > You cannot overwrite all the built-in functions. I could overwrite most
> > of
> > > the functions I tried e.g. length, e, pi, round, rtrim, but there are
> > > functions I cannot overwrite e.g. CAST, ARRAY I get:
> > >
> > >
> > > *ParseException line 1:29 cannot recognize input near 'array' 'AS'
> *
> > >
> > > What is interesting is that I cannot overwrite *array*, but I can
> overwrite
> > > *map* or *struct*. Though hive behaves reasonable well if I manage to
> > > overwrite a function. When I drop the temporary function the native
> > > function is still available.
> > >
> > > Spark:
> > >
> > > Spark's behavior imho is super bad.
> > >
> > > Theoretically I could overwrite all functions. I was able e.g. to
> > > overwrite CAST function. I had to use though CREATE OR REPLACE
> TEMPORARY
> > > FUNCTION syntax. Otherwise I get an exception that a function already
> > > exists. However when I used the CAST function in a query it used the
> > > native, built-in one.
> > >
> > > When I overwrote current_date() function, it was used in a query, but
> it
> > > completely replaces the built-in function and I can no longer use the
> > > native function in any way. I cannot also drop the temporary function.
> I
> > > get:
> > >
> > > *Error in query: Cannot drop native function 'current_date';*
> > >
> > > Additional note, both systems do not allow creating TEMPORARY FUNCTIONS
> > > with a database. Temporary functions are always represented as a single
> > > name.
> > >
> > > In my opinion neither of the systems have consistent behavior.
> Generally
> > > speaking I think overwriting any system provided functions is just
> > > dangerous.
> > >
> > > Regarding Ja

[jira] [Created] (FLINK-13965) Keep hasDeprecatedKeys in ConfigOption and mark it with @Deprecated annotation

2019-09-04 Thread vinoyang (Jira)
vinoyang created FLINK-13965:


 Summary: Keep hasDeprecatedKeys in ConfigOption and mark it with 
@Deprecated annotation
 Key: FLINK-13965
 URL: https://issues.apache.org/jira/browse/FLINK-13965
 Project: Flink
  Issue Type: Test
Affects Versions: 1.9.0
Reporter: vinoyang


In our program based on Flink 1.7.2, we used the method
{{ConfigOption#hasDeprecatedKeys}}. But this method was renamed to
{{hasFallbackKeys}} in FLINK-10436, so after we bumped our Flink version to
1.9.0, we hit a compile error.

 
It seems the deprecated key was replaced with an entity {{FallbackKey}}.
However, I still see the method {{withDeprecatedKeys}}. Since we keep the
method {{withDeprecatedKeys}}, why not also keep the method
{{hasDeprecatedKeys}}? Although this public API was not marked with the
{{Public}} annotation, {{ConfigOption}} is hosted in the flink-core module and
many users use it, so IMHO we should maintain compatibility as far as possible.
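
For illustration, a minimal sketch of the rename we hit (the option name and
default value below are made up):

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class FallbackKeyExample {

    // An option that still declares a deprecated key via withDeprecatedKeys.
    static final ConfigOption<Integer> OPTION = ConfigOptions
            .key("new.key")
            .defaultValue(42)
            .withDeprecatedKeys("old.key");

    public static void main(String[] args) {
        // Compiled fine against Flink 1.7.2:
        //   boolean b = OPTION.hasDeprecatedKeys();
        // Against Flink 1.9.0 the equivalent check is:
        boolean b = OPTION.hasFallbackKeys();
        System.out.println(b); // true, the deprecated key is kept as a fallback key
    }
}
{code}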



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: Storing offsets in Kafka

2019-09-04 Thread Becket Qin
Hi Dominik,

There has not been any change to the offset committing logic in
KafkaConsumer for a while. But the logic is a little complicated. The
offset commit to Kafka is only enabled in the following two cases:

1. Flink checkpoint is enabled AND commitOffsetsOnCheckpoint is true
(default value is true)
2. Flink checkpoint is disabled AND the vanilla KafkaConsumer has a)
enable.auto.commit=true (default value is true); b) auto.commit.interval.ms>0
(default value is 5000).

Note that in case 1, if the job exits before the first checkpoint takes
place, then there will be no offset committed.

Can you check if your setting falls in one of the two cases?
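
For reference, here is a minimal sketch of case 1 (the topic, group id and
bootstrap servers are made up, and I am assuming the universal
FlinkKafkaConsumer from flink-connector-kafka):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class GroupOffsetExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // case 1 requires checkpointing to be enabled

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "my-group");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
        consumer.setCommitOffsetsOnCheckpoints(true); // default value, shown for clarity
        consumer.setStartFromGroupOffsets();          // resume from the committed group offsets

        env.addSource(consumer).print();
        env.execute("group-offset-example");
    }
}

With this setup the group offsets are committed back to Kafka on each
completed checkpoint, so a fresh restart without a savepoint can pick them up.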

Thanks,

Jiangjie (Becket) Qin




On Wed, Sep 4, 2019 at 9:03 PM Dominik Wosiński  wrote:

> Hey,
> I was wondering whether something has changed for KafkaConsumer, since I am
> using Kafka 2.0.0 with Flink and I wanted to use group offsets but there
> seems to be no change in the topic where Kafka stores its offsets, after
> restart Flink uses the `auto.offset.reset` so it seems that there is no
> offsets commit happening. The checkpoints are properly configured and I am
> able to restore with Savepoint. But the group offsets are not working
> properly. Is there anything that has changed in this manner?
>
> Best Regards,
> Dom.
>


Re: [DISCUSS] Flink Python User-Defined Function for Table API

2019-09-04 Thread jincheng sun
Hi Aljoscha,

That's a good point. So far, most of the code will live in the flink-python
module, the rules and RelNodes will be put into both the Blink and Flink
planner modules, and some of the common interfaces required by the planners
will be placed in flink-table-common. I think you are right that we should try
to ensure the changes for this feature are minimal. In more detail, we will
follow this principle when reviewing the PRs.

Many thanks for your questions and reminder!

Best,
Jincheng


Aljoscha Krettek  于2019年9月4日周三 下午8:58写道:

> Hi,
>
> Things looks interesting so far!
>
> I had one question: Where will most of the support code for this live?
> Will this add the required code to flink-table-common or the different
> runners? Can we implement this in such a way that only a minimal amount of
> support code is required in the parts of the Table API (and Table API
> runners) that  are not python specific?
>
> Best,
> Aljoscha
>
> > On 4. Sep 2019, at 14:14, Timo Walther  wrote:
> >
> > Hi Jincheng,
> >
> > 2. Serializability of functions: "#2 is very convenient for users" means
> only until they have the first backwards-compatibility issue, after that
> they will find it not so convinient anymore and will ask why the framework
> allowed storing such objects in a persistent storage. I don't want to be
> picky about it, but wanted to raise awareness that sometimes it is ok to
> limit use cases to guide users for developing backwards-compatible programs.
> >
> > Thanks for the explanation of the remaining items. It sounds reasonable
> to me. Regarding the example with `getKind()`, I actually meant
> `org.apache.flink.table.functions.ScalarFunction#getKind` we don't allow
> users to override this property. And I think we should do something similar
> for the getLanguage property.
> >
> > Thanks,
> > Timo
> >
> > On 03.09.19 15:01, jincheng sun wrote:
> >> Hi Timo,
> >>
> >> Thanks for the quick reply ! :)
> >> I have added more example for #3 and #5 to the FLIP. That are great
> >> suggestions !
> >>
> >> Regarding 2:
> >>
> >> There are two kind Serialization for CloudPickle(Which is different from
> >> Java):
> >>  1) For class and function which can be imported, CloudPickle only
> >> serialize the full path of the class and function (just like java class
> >> name).
> >>  2) For the class and function which can not be imported, CloudPickle
> will
> >> serialize the full content of the class and function.
> >> For #2, It means that we can not just store the full path of the class
> and
> >> function.
> >>
> >> The above serialization is recursive.
> >>
> >> However, there is indeed an problem of backwards compatibility when the
> >> module path of the parent class changed. But I think this is an rare
> case
> >> and acceptable. i.e., For Flink framework we never change the user
> >> interface module path if we want to keep backwards compatibility. For
> user
> >> code, if they change the interface of UDF's parent, they should
> re-register
> >> their functions.
> >>
> >> If we do not want support #2, we can store the full path of class and
> >> function, in that case we have no backwards compatibility problem. But I
> >> think the #2 is very convenient for users.
> >>
> >> What do you think?
> >>
> >> Regarding 4:
> >> As I mentioned earlier, there may be built-in Python functions and I
> think
> >> language is a "function" concept. Function and Language are orthogonal
> >> concepts.
> >> We may have R, GO and other language functions in the future, not only
> >> user-defined, but also built-in functions.
> >>
> >> You are right that users will not set this method and for Python
> functions,
> >> it will be set in the code-generated Java function by the framework.
> So, I
> >> think we should declare the getLanguage() in FunctionDefinition for now.
> >> (I'm not pretty sure what do you mean by saying that getKind() is final
> in
> >> UserDefinedFunction?)
> >>
> >> Best,
> >> Jincheng
> >>
> >> Timo Walther  于2019年9月3日周二 下午6:01写道:
> >>
> >>> Hi Jincheng,
> >>>
> >>> thanks for your response.
> >>>
> >>> 2. Serializability of functions: Using some arbitrary serialization
> >>> format for shipping a function to worker sounds fine to me. But once we
> >>> store functions a the catalog we need to think about backwards
> >>> compatibility and evolution of interfaces etc. I'm not sure if
> >>> CloudPickle is the right long-term storage format for this. If we don't
> >>> think about this in advance, we are basically violating our code
> quality
> >>> guide [1] of never use Java Serialization but in the Python-way. We are
> >>> using the RPC serialization for persistence.
> >>>
> >>> 3. TableEnvironment: Can you add some example to the FLIP? Because API
> >>> code like the following is not covered there:
> >>>
> >>> self.t_env.register_function("add_one", udf(lambda i: i + 1,
> >>> DataTypes.BIGINT(),
> >>>  DataTypes.BIGINT()))
> >>> self.t_env.register_function("subtract_one"

Re: [VOTE] FLIP-61 Simplify Flink's cluster level RestartStrategy configuration

2019-09-04 Thread Zili Chen
+1


zhijiang  于2019年9月5日周四 上午12:36写道:

> +1
> --
> From:Till Rohrmann 
> Send Time:2019年9月4日(星期三) 13:39
> To:dev 
> Cc:Zhu Zhu 
> Subject:Re: [VOTE] FLIP-61 Simplify Flink's cluster level RestartStrategy
> configuration
>
> +1 (binding)
>
> On Wed, Sep 4, 2019 at 12:39 PM Chesnay Schepler 
> wrote:
>
> > +1 (binding)
> >
> > On 04/09/2019 11:13, Zhu Zhu wrote:
> > > +1 (non-binding)
> > >
> > > Thanks,
> > > Zhu Zhu
> > >
> > > Till Rohrmann  于2019年9月4日周三 下午5:05写道:
> > >
> > >> Hi everyone,
> > >>
> > >> I would like to start the voting process for FLIP-61 [1], which is
> > >> discussed and reached consensus in this thread [2].
> > >>
> > >> Since the change is rather small I'd like to shorten the voting period
> > to
> > >> 48 hours. Hence, I'll try to close it September 6th, 11:00 am CET,
> > unless
> > >> there is an objection or not enough votes.
> > >>
> > >> [1]
> > >>
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-61+Simplify+Flink%27s+cluster+level+RestartStrategy+configuration
> > >> [2]
> > >>
> > >>
> >
> https://lists.apache.org/thread.html/e206390127bcbd9b24d9c41a838faa75157e468e01552ad241e3e24b@%3Cdev.flink.apache.org%3E
> > >>
> > >> Cheers,
> > >> Till
> > >>
> >
> >
>
>


Re: [DISCUSS] Flink Python User-Defined Function for Table API

2019-09-04 Thread jincheng sun
Hi Timo,

Yes, I think convenience is a trade-off. For now, weighing convenience
against always doing the right thing, I prefer adding some limitations to
ensure users never run into issues. So for #2 we are on the same page now.

Best,
Jincheng

Aljoscha Krettek  于2019年9月4日周三 下午8:58写道:

> Hi,
>
> Things looks interesting so far!
>
> I had one question: Where will most of the support code for this live?
> Will this add the required code to flink-table-common or the different
> runners? Can we implement this in such a way that only a minimal amount of
> support code is required in the parts of the Table API (and Table API
> runners) that  are not python specific?
>
> Best,
> Aljoscha
>
> > On 4. Sep 2019, at 14:14, Timo Walther  wrote:
> >
> > Hi Jincheng,
> >
> > 2. Serializability of functions: "#2 is very convenient for users" means
> only until they have the first backwards-compatibility issue, after that
> they will find it not so convenient anymore and will ask why the framework
> allowed storing such objects in a persistent storage. I don't want to be
> picky about it, but wanted to raise awareness that sometimes it is ok to
> limit use cases to guide users for developing backwards-compatible programs.
> >
> > Thanks for the explanation of the remaining items. It sounds reasonable
> to me. Regarding the example with `getKind()`, I actually meant
> `org.apache.flink.table.functions.ScalarFunction#getKind` we don't allow
> users to override this property. And I think we should do something similar
> for the getLanguage property.
> >
> > Thanks,
> > Timo
> >
> > On 03.09.19 15:01, jincheng sun wrote:
> >> Hi Timo,
> >>
> >> Thanks for the quick reply ! :)
> >> I have added more example for #3 and #5 to the FLIP. That are great
> >> suggestions !
> >>
> >> Regarding 2:
> >>
> >> There are two kind Serialization for CloudPickle(Which is different from
> >> Java):
> >>  1) For class and function which can be imported, CloudPickle only
> >> serialize the full path of the class and function (just like java class
> >> name).
> >>  2) For the class and function which can not be imported, CloudPickle
> will
> >> serialize the full content of the class and function.
> >> For #2, It means that we can not just store the full path of the class
> and
> >> function.
> >>
> >> The above serialization is recursive.
> >>
> >> However, there is indeed an problem of backwards compatibility when the
> >> module path of the parent class changed. But I think this is an rare
> case
> >> and acceptable. i.e., For Flink framework we never change the user
> >> interface module path if we want to keep backwards compatibility. For
> user
> >> code, if they change the interface of UDF's parent, they should
> re-register
> >> their functions.
> >>
> >> If we do not want support #2, we can store the full path of class and
> >> function, in that case we have no backwards compatibility problem. But I
> >> think the #2 is very convenient for users.
> >>
> >> What do you think?
> >>
> >> Regarding 4:
> >> As I mentioned earlier, there may be built-in Python functions and I
> think
> >> language is a "function" concept. Function and Language are orthogonal
> >> concepts.
> >> We may have R, GO and other language functions in the future, not only
> >> user-defined, but also built-in functions.
> >>
> >> You are right that users will not set this method and for Python
> functions,
> >> it will be set in the code-generated Java function by the framework.
> So, I
> >> think we should declare the getLanguage() in FunctionDefinition for now.
> >> (I'm not pretty sure what do you mean by saying that getKind() is final
> in
> >> UserDefinedFunction?)
> >>
> >> Best,
> >> Jincheng
> >>
> >> Timo Walther  于2019年9月3日周二 下午6:01写道:
> >>
> >>> Hi Jincheng,
> >>>
> >>> thanks for your response.
> >>>
> >>> 2. Serializability of functions: Using some arbitrary serialization
> >>> format for shipping a function to worker sounds fine to me. But once we
> >>> store functions a the catalog we need to think about backwards
> >>> compatibility and evolution of interfaces etc. I'm not sure if
> >>> CloudPickle is the right long-term storage format for this. If we don't
> >>> think about this in advance, we are basically violating our code
> quality
> >>> guide [1] of never use Java Serialization but in the Python-way. We are
> >>> using the RPC serialization for persistence.
> >>>
> >>> 3. TableEnvironment: Can you add some example to the FLIP? Because API
> >>> code like the following is not covered there:
> >>>
> >>> self.t_env.register_function("add_one", udf(lambda i: i + 1,
> >>> DataTypes.BIGINT(),
> >>>  DataTypes.BIGINT()))
> >>> self.t_env.register_function("subtract_one", udf(SubtractOne(),
> >>> DataTypes.BIGINT(),
> >>> DataTypes.BIGINT()))
> >>> self.t_env.register_function("add", add)
> >>>
> >>> 4. FunctionDefinition: Your response still doesn't answer my question
> >>> entirely. Why do we need FunctionDefinition.getL

Re: instable checkpointing after migration to flink 1.8

2019-09-04 Thread Congxian Qiu
Some additional information from our private emails:

there are ALWAYS Kafka AbstractCoordinator logs about a lost connection to
Kafka at the same time the checkpoints are confirmed. Bekir checked the
Kafka broker log, but did not find anything interesting there.

Best,
Congxian


Congxian Qiu  于2019年9月5日周四 上午10:26写道:

> Hi Bekir,
>
> If it is the storage place for timers, for RocksDBStateBackend, timers can
> be stored in Heap or RocksDB[1][2]
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#tuning-rocksdb
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#rocksdb-state-backend-config-options
>
> Best,
> Congxian
>
>
> Bekir Oguz  于2019年9月4日周三 下午11:38写道:
>
>> Hi Stephan,
>> sorry for late response.
>> We indeed use timers inside a KeyedProcessFunction but the triggers of
>> the timers are kinda evenly distributed, so not causing a firing storm.
>> We have a custom ttl logic which is used by the deletion timer to decide
>> whether delete a record from inmemory state or not.
>> Can you maybe give some links to those changes in the RocksDB?
>>
>> Thanks in advance,
>> Bekir Oguz
>>
>> On Fri, 30 Aug 2019 at 14:56, Stephan Ewen  wrote:
>>
>>> Hi all!
>>>
>>> A thought would be that this has something to do with timers. Does the
>>> task with that behavior use timers (windows, or process function)?
>>>
>>> If that is the case, some theories to check:
>>>   - Could it be a timer firing storm coinciding with a checkpoint?
>>> Currently, that storm synchronously fires, checkpoints cannot preempt that,
>>> which should change in 1.10 with the new mailbox model.
>>>   - Could the timer-async checkpointing changes have something to do
>>> with that? Does some of the usually small "preparation work" (happening
>>> synchronously) lead to an unwanted effect?
>>>   - Are you using TTL for state in that operator?
>>>   - There were some changes made to support timers in RocksDB recently.
>>> Could that have something to do with it?
>>>
>>> Best,
>>> Stephan
>>>
>>>
>>> On Fri, Aug 30, 2019 at 2:45 PM Congxian Qiu 
>>> wrote:
>>>
 CC flink dev mail list
 Update for those who may be interested in this issue, we'are still
 diagnosing this problem currently.

 Best,
 Congxian


 Congxian Qiu  于2019年8月29日周四 下午8:58写道:

 > Hi Bekir
 >
 > Currently, from what we have diagnosed, there is some task complete
 its
 > checkpoint too late (maybe 15 mins), but we checked the kafka broker
 log
 > and did not find any interesting things there. could we run another
 job,
 > that did not commit offset to kafka, this wants to check if it is the
 > "commit offset to kafka" step consumes too much time.
 >
 > Best,
 > Congxian
 >
 >
 > Bekir Oguz  于2019年8月28日周三 下午4:19写道:
 >
 >> Hi Congxian,
 >> sorry for the late reply, but no progress on this issue yet. I
 checked
 >> also the kafka broker logs, found nothing interesting there.
 >> And we still have 15 min duration checkpoints quite often. Any more
 ideas
 >> on where to look at?
 >>
 >> Regards,
 >> Bekir
 >>
 >> On Fri, 23 Aug 2019 at 12:12, Congxian Qiu 
 >> wrote:
 >>
 >>> Hi Bekir
 >>>
 >>> Do you come back to work now, does there any more findings of this
 >>> problem?
 >>>
 >>> Best,
 >>> Congxian
 >>>
 >>>
 >>> Bekir Oguz  于2019年8月13日周二 下午4:39写道:
 >>>
  Hi Congxian,
  Thanks for following up this issue. It is still unresolved and I
 am on
  vacation at the moment.  Hopefully my collegues Niels and Vlad can
 spare
  some time to look into this.
 
  @Niels, @Vlad: do you guys also think that this issue might be
 Kafka
  related? We could also check kafka broker logs at the time of long
  checkpointing.
 
  Thanks,
  Bekir
 
  Sent from my iPhone
 
  Op 12 aug. 2019 om 15:18 heeft Congxian Qiu <
 qcx978132...@gmail.com>
  het volgende geschreven:
 
  Hi Bekir
 
  Is there any progress about this problem?
 
  Best,
  Congxian
 
 
  Congxian Qiu  于2019年8月8日周四 下午11:17写道:
 
 > hi Bekir
 > Thanks for the information.
 >
 > - Source's checkpoint was triggered by RPC calls, so it has the
 > "Trigger checkpoint xxx" log,
 > - other task's checkpoint was triggered after received all the
 barrier
 > of upstreams, it didn't log the "Trigger checkpoint XXX" :(
 >
 > Your diagnose makes sense to me, we need to check the Kafka log.
 > I also find out that we always have a log like
 > "org.apache.kafka.clients.consumer.internals.AbstractCoordinator
 Marking
 > the coordinator 

Re: [DISCUSS] Support JSON functions in Flink SQL

2019-09-04 Thread Qi Luo
We also see strong demands from our SQL users for JSON/Date related
functions.

Also +Anyang Hu 

On Wed, Sep 4, 2019 at 9:51 PM Jark Wu  wrote:

> Hi Forward,
>
> Thanks for bringing this discussion and preparing the nice design.
> I think it's nice to have the JSON functions in the next release.
> We have received some requirements for this feature.
>
> I can help to shepherd this JSON functions effort and will leave comments
>  in the design doc in the next days.
>
> Hi Danny,
>
> The newly introduced JSON functions are from SQL:2016, not from MySQL.
> So no JSON type is needed. According to SQL:2016, the
> representation of JSON data can be "character string" which is also
> the current implementation in Calcite[1].
>
> Best,
> Jark
>
>
> [1]: https://calcite.apache.org/docs/reference.html#json-functions
>
>
> On Wed, 4 Sep 2019 at 21:22, Xu Forward  wrote:
>
> > hi Danny Chan ,Thank you very much for your reply, your help can help me
> > further improve this discussion.
> > Best
> > forward
> >
> > Danny Chan  于2019年9月4日周三 下午8:50写道:
> >
> > > Thanks Xu Forward for bring up this topic, I think the JSON functions
> are
> > > very useful especially for those MySQL users.
> > >
> > > I saw that you have done some work within the Apache Calcite, that’s a
> > > good start, but this is one concern from me, Flink doesn’t support JSON
> > > type internal, so how to represent a JSON object in Flink maybe a key
> > point
> > > we need to resolve. In Calcite, we use ANY type to represent as the
> JSON,
> > > but I don’t think it is the right way to go, maybe we can have a
> > discussion
> > > here.
> > >
> > > Best,
> > > Danny Chan
> > > 在 2019年9月4日 +0800 PM8:34,Xu Forward ,写道:
> > > > Hi everybody,
> > > >
> > > > I'd like to kick off a discussion on Support JSON functions in Flink
> > SQL.
> > > >
> > > > The entire plan is divided into two steps:
> > > > 1. Implement Support SQL 2016-2017 JSON functions in Flink SQL[1].
> > > > 2. Implement non-Support SQL 2016-2017 JSON functions in Flink SQL,
> > such
> > > as
> > > > JSON_TYPE in Mysql, JSON_LENGTH, etc. Very useful JSON functions.
> > > >
> > > > Would love to hear your thoughts.
> > > >
> > > > [1]
> > > >
> > >
> >
> https://docs.google.com/document/d/1JfaFYIFOAY8P2pFhOYNCQ9RTzwF4l85_bnTvImOLKMk/edit#heading=h.76mb88ca6yjp
> > > >
> > > > Best,
> > > > ForwardXu
> > >
> >
>


Re: instable checkpointing after migration to flink 1.8

2019-09-04 Thread Congxian Qiu
Hi Bekir,

If it is about the storage place for timers: for RocksDBStateBackend, timers
can be stored on the heap or in RocksDB [1][2].
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#tuning-rocksdb
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#rocksdb-state-backend-config-options
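
For example, a rough sketch of keeping timers on the heap (assuming the
RocksDBStateBackend API of 1.8/1.9, and a made-up checkpoint URI; the
flink-conf.yaml equivalent is the state.backend.rocksdb.timer-service.factory
option described in [2]):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HeapTimersExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");
        // Keep timer state on the heap instead of inside RocksDB
        // (method name per the 1.8/1.9 API, please double check).
        backend.setPriorityQueueStateType(RocksDBStateBackend.PriorityQueueStateType.HEAP);
        env.setStateBackend(backend);
    }
}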

Best,
Congxian


Bekir Oguz  于2019年9月4日周三 下午11:38写道:

> Hi Stephan,
> sorry for late response.
> We indeed use timers inside a KeyedProcessFunction but the triggers of the
> timers are kinda evenly distributed, so not causing a firing storm.
> We have a custom ttl logic which is used by the deletion timer to decide
> whether delete a record from inmemory state or not.
> Can you maybe give some links to those changes in the RocksDB?
>
> Thanks in advance,
> Bekir Oguz
>
> On Fri, 30 Aug 2019 at 14:56, Stephan Ewen  wrote:
>
>> Hi all!
>>
>> A thought would be that this has something to do with timers. Does the
>> task with that behavior use timers (windows, or process function)?
>>
>> If that is the case, some theories to check:
>>   - Could it be a timer firing storm coinciding with a checkpoint?
>> Currently, that storm synchronously fires, checkpoints cannot preempt that,
>> which should change in 1.10 with the new mailbox model.
>>   - Could the timer-async checkpointing changes have something to do with
>> that? Does some of the usually small "preparation work" (happening
>> synchronously) lead to an unwanted effect?
>>   - Are you using TTL for state in that operator?
>>   - There were some changes made to support timers in RocksDB recently.
>> Could that have something to do with it?
>>
>> Best,
>> Stephan
>>
>>
>> On Fri, Aug 30, 2019 at 2:45 PM Congxian Qiu 
>> wrote:
>>
>>> CC flink dev mail list
>>> Update for those who may be interested in this issue, we'are still
>>> diagnosing this problem currently.
>>>
>>> Best,
>>> Congxian
>>>
>>>
>>> Congxian Qiu  于2019年8月29日周四 下午8:58写道:
>>>
>>> > Hi Bekir
>>> >
>>> > Currently, from what we have diagnosed, there is some task complete its
>>> > checkpoint too late (maybe 15 mins), but we checked the kafka broker
>>> log
>>> > and did not find any interesting things there. could we run another
>>> job,
>>> > that did not commit offset to kafka, this wants to check if it is the
>>> > "commit offset to kafka" step consumes too much time.
>>> >
>>> > Best,
>>> > Congxian
>>> >
>>> >
>>> > Bekir Oguz  于2019年8月28日周三 下午4:19写道:
>>> >
>>> >> Hi Congxian,
>>> >> sorry for the late reply, but no progress on this issue yet. I checked
>>> >> also the kafka broker logs, found nothing interesting there.
>>> >> And we still have 15 min duration checkpoints quite often. Any more
>>> ideas
>>> >> on where to look at?
>>> >>
>>> >> Regards,
>>> >> Bekir
>>> >>
>>> >> On Fri, 23 Aug 2019 at 12:12, Congxian Qiu 
>>> >> wrote:
>>> >>
>>> >>> Hi Bekir
>>> >>>
>>> >>> Do you come back to work now, does there any more findings of this
>>> >>> problem?
>>> >>>
>>> >>> Best,
>>> >>> Congxian
>>> >>>
>>> >>>
>>> >>> Bekir Oguz  于2019年8月13日周二 下午4:39写道:
>>> >>>
>>>  Hi Congxian,
>>>  Thanks for following up this issue. It is still unresolved and I am
>>> on
>>>  vacation at the moment.  Hopefully my collegues Niels and Vlad can
>>> spare
>>>  some time to look into this.
>>> 
>>>  @Niels, @Vlad: do you guys also think that this issue might be Kafka
>>>  related? We could also check kafka broker logs at the time of long
>>>  checkpointing.
>>> 
>>>  Thanks,
>>>  Bekir
>>> 
>>>  Sent from my iPhone
>>> 
>>>  Op 12 aug. 2019 om 15:18 heeft Congxian Qiu >> >
>>>  het volgende geschreven:
>>> 
>>>  Hi Bekir
>>> 
>>>  Is there any progress about this problem?
>>> 
>>>  Best,
>>>  Congxian
>>> 
>>> 
>>>  Congxian Qiu  于2019年8月8日周四 下午11:17写道:
>>> 
>>> > hi Bekir
>>> > Thanks for the information.
>>> >
>>> > - Source's checkpoint was triggered by RPC calls, so it has the
>>> > "Trigger checkpoint xxx" log,
>>> > - other task's checkpoint was triggered after received all the
>>> barrier
>>> > of upstreams, it didn't log the "Trigger checkpoint XXX" :(
>>> >
>>> > Your diagnose makes sense to me, we need to check the Kafka log.
>>> > I also find out that we always have a log like
>>> > "org.apache.kafka.clients.consumer.internals.AbstractCoordinator
>>> Marking
>>> > the coordinator 172.19.200.73:9092 (id: 2147483646 rack: null)
>>> dead
>>> > for group userprofileaggregator
>>> > 2019-08-06 13:58:49,872 DEBUG
>>> > org.apache.flink.streaming.runtime.tasks.StreamTask   -
>>> Notifica",
>>> >
>>> > I checked the doc of kafka[1], only find that the default of `
>>> > transaction.max.timeout.ms` is 15 min
>>> >
>>> > Please let me know there you have any finds. thanks
>>> >
>>> > PS: maybe you can also checkpoint the log for task
>>> > `

Re: [DISCUSS] Contribute Pulsar Flink connector back to Flink

2019-09-04 Thread Yijie Shen
Thanks for all the feedback and suggestions!

As Sijie said, the goal of the connector has always been to provide
users with the latest features of both systems as soon as possible. We
propose to contribute the connector to Flink and hope to get more
suggestions and feedback from Flink experts to ensure the high quality
of the connector.

For FLIP-27, we noticed its existence at the beginning of reworking
the connector implementation based on Flink 1.9; we also wanted to
build a connector that supports both batch and stream computing based
on it.
However, it has been inactive for some time, so we decided to first provide
a connector with most of the new features, such as the new type system
and the new catalog API. We will continue to follow the progress
of FLIP-27 and incorporate it into the connector as soon
as possible.

Regarding the test status of the connector, we are following the other
connectors' tests in the Flink repository and aim to provide tests as
thorough as we can. We are also happy to hear suggestions and
supervision from the Flink community to improve the stability and
performance of the connector continuously.

Best,
Yijie

On Thu, Sep 5, 2019 at 5:59 AM Sijie Guo  wrote:
>
> Thanks everyone for the comments and feedback.
>
> It seems to me that the main question here is about - "how can the Flink
> community maintain the connector?".
>
> Here are two thoughts from myself.
>
> 1) I think how and where to host this integration is kind of less important
> here. I believe there can be many ways to achieve it.
> As part of the contribution, what we are looking for here is how these two
> communities can build the collaboration relationship on developing
> the integration between Pulsar and Flink. Even we can try our best to catch
> up all the updates in Flink community. We are still
> facing the fact that we have less experiences in Flink than folks in Flink
> community. In order to make sure we maintain and deliver
> a high-quality pulsar-flink integration to the users who use both
> technologies, we need some help from the experts from Flink community.
>
> 2) We have been following FLIP-27 for a while. Originally we were thinking
> of contributing the connectors back after integrating with the
> new API introduced in FLIP-27. But we decided to initiate the conversation
> as early as possible. Because we believe there are more benefits doing
> it now rather than later. As part of contribution, it can help Flink
> community understand more about Pulsar and the potential integration points.
> Also we can also help Flink community verify the new connector API as well
> as other new API (e.g. catalog API).
>
> Thanks,
> Sijie
>
> On Wed, Sep 4, 2019 at 5:24 AM Becket Qin  wrote:
>
> > Hi Yijie,
> >
> > Thanks for the interest in contributing the Pulsar connector.
> >
> > In general, I think having Pulsar connector with strong support is a
> > valuable addition to Flink. So I am happy to shepherd this effort.
> > Meanwhile, I would also like to provide some context and recent efforts on
> > the Flink connectors ecosystem.
> >
> > The current way Flink maintains its connector has hit the scalability bar.
> > With more and more connectors coming into Flink repo, we are facing a few
> > problems such as long build and testing time. To address this problem, we
> > have attempted to do the following:
> > 1. Split the connectors out into a separate repository. This is temporarily
> > on hold due to a potential solution that shortens the build time.
> > 2. Encourage the connectors to stay as ecosystem projects while Flink tries
> > to provide good support for functionality and compatibility tests. Robert
> > has been driving the creation of a Flink Ecosystem project website, and it is
> > going through a final approval process.
> >
> > Given the above efforts, it would be great to first see if we can have
> > Pulsar connector as an ecosystem project with great support. It would be
> > good to hear how the Flink Pulsar connector is tested currently to see if
> > we can learn something to maintain it as an ecosystem project with good
> > quality and test coverage. If the quality as an ecosystem project is hard
> > to guarantee, we may as well adopt it into the main repo.
> >
> > BTW, another ongoing effort is FLIP-27 where we are making changes to the
> > Flink source connector architecture and interface. This change will likely
> > land in 1.10. Therefore, timing-wise, if we are going to have the Pulsar
> > connector in the main repo, I am wondering if we should hold off a little and
> > let the Pulsar connector adapt to the new interface to avoid work that would
> > shortly be deprecated?
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Wed, Sep 4, 2019 at 4:32 PM Chesnay Schepler 
> > wrote:
> >
> > > I'm quite worried that we may end up repeating history.
> > >
> > > There were already 2 attempts at contributing a pulsar connector, both
> > > of which failed because no committer was getting involved, despite the
> > 

Re: [DISCUSS] Contribute Pulsar Flink connector back to Flink

2019-09-04 Thread Sijie Guo
Thanks everyone for the comments and feedback.

It seems to me that the main question here is about - "how can the Flink
community maintain the connector?".

Here are two thoughts from myself.

1) I think how and where to host this integration is kind of less important
here. I believe there can be many ways to achieve it.
As part of the contribution, what we are looking for here is how these two
communities can build the collaboration relationship on developing
the integration between Pulsar and Flink. Even we can try our best to catch
up all the updates in Flink community. We are still
facing the fact that we have less experiences in Flink than folks in Flink
community. In order to make sure we maintain and deliver
a high-quality pulsar-flink integration to the users who use both
technologies, we need some help from the experts from Flink community.

2) We have been following FLIP-27 for a while. Originally we were thinking
of contributing the connectors back after integrating with the
new API introduced in FLIP-27. But we decided to initiate the conversation
as early as possible. Because we believe there are more benefits doing
it now rather than later. As part of contribution, it can help Flink
community understand more about Pulsar and the potential integration points.
We can also help the Flink community verify the new connector API as well
as other new APIs (e.g. the catalog API).

Thanks,
Sijie

On Wed, Sep 4, 2019 at 5:24 AM Becket Qin  wrote:

> Hi Yijie,
>
> Thanks for the interest in contributing the Pulsar connector.
>
> In general, I think having a Pulsar connector with strong support is a
> valuable addition to Flink, so I am happy to shepherd this effort.
> Meanwhile, I would also like to provide some context and recent efforts on
> the Flink connectors ecosystem.
>
> The current way Flink maintains its connectors has hit a scalability limit.
> With more and more connectors coming into Flink repo, we are facing a few
> problems such as long build and testing time. To address this problem, we
> have attempted to do the following:
> 1. Split the connectors out into a separate repository. This is temporarily
> on hold due to a potential solution that shortens the build time.
> 2. Encourage the connectors to stay as ecosystem projects while Flink tries
> to provide good support for functionality and compatibility tests. Robert
> has been driving the creation of a Flink Ecosystem project website, and it is
> going through a final approval process.
>
> Given the above efforts, it would be great to first see if we can have
> Pulsar connector as an ecosystem project with great support. It would be
> good to hear how the Flink Pulsar connector is tested currently to see if
> we can learn something to maintain it as an ecosystem project with good
> quality and test coverage. If the quality as an ecosystem project is hard
> to guarantee, we may as well adopt it into the main repo.
>
> BTW, another ongoing effort is FLIP-27 where we are making changes to the
> Flink source connector architecture and interface. This change will likely
> land in 1.10. Therefore, timing-wise, if we are going to have the Pulsar
> connector in the main repo, I am wondering if we should hold off a little and
> let the Pulsar connector adapt to the new interface to avoid work that would
> shortly be deprecated?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, Sep 4, 2019 at 4:32 PM Chesnay Schepler 
> wrote:
>
> > I'm quite worried that we may end up repeating history.
> >
> > There were already 2 attempts at contributing a pulsar connector, both
> > of which failed because no committer was getting involved, despite the
> > contributor opening a dedicated discussion thread about the contribution
> > beforehand and getting several +1's from committers.
> >
> > We should really make sure that if we welcome/approve such a
> > contribution it will actually get the attention it deserves.
> >
> > As such, I'm inclined to recommend maintaining the connector outside of
> > Flink. We could link to it from the documentation to give it more
> exposure.
> > With the upcoming page for sharing artifacts among the community (what's
> > the state of that anyway?), this may be a better option.
> >
> > On 04/09/2019 10:16, Till Rohrmann wrote:
> > > Hi everyone,
> > >
> > > thanks a lot for starting this discussion Yijie. I think the Pulsar
> > > connector would be a very valuable addition since Pulsar becomes more
> and
> > > more popular and it would further expand Flink's interoperability. Also
> > > from a project perspective it makes sense for me to place the connector
> > in
> > > the downstream project.
> > >
> > > My main concern/question is how can the Flink community maintain the
> > > connector? We have seen in the past that connectors are some of the
> most
> > > actively developed components because they need to be kept in sync with
> > the
> > > external system and with Flink. Given that the Pulsar community is
> > willing
> > > to help with maintaining, improving and e

Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-04 Thread Xuefu Z
Thanks all for sharing your thoughts. I think we have gathered some useful
initial feedback from this long discussion, with a couple of focal points
sticking out.

 We will go back to do more research and adapt our proposal. Once it's
ready, we will ask for a new round of review. If there is any disagreement,
we will start a new discussion thread on each rather than having a mega
discussion like this.

Thanks to everyone for participating.

Regards,
Xuefu


On Thu, Sep 5, 2019 at 2:52 AM Bowen Li  wrote:

> Let me try to summarize and conclude the long thread so far:
>
> 1. For order of temp function v.s. built-in function:
>
> I think Dawid's point that a temp function should have a fully qualified path
> is better reasoning to back the newly proposed order, and I agree we
> don't need to follow Hive/Spark.
>
> However, I'd rather not change fundamentals of temporary functions in this
> FLIP. It belongs to a bigger story of how temporary objects should be
> redefined and be handled uniformly - currently temporary tables and views
> (those registered from TableEnv#registerTable()) behave differently from what
> Dawid proposes for temp functions, and we need a FLIP to just unify their
> APIs and behaviors.
>
> I agree that backward compatibility is not an issue w.r.t Jark's points.
>
> ***Seems we do have consensus that it's acceptable to prevent users
> registering a temp function in the same name as a built-in function. To
> help us move forward, I'd like to propose setting such a restraint on temp
> functions in this FLIP to simplify the design and avoid disputes.*** It
> will also leave room for improvement in the future.
>
>
> 2. For Hive built-in function:
>
> Thanks Timo for providing the Presto and Postgres examples. I feel modular
> built-in functions can be a good fit for the geo and ml example as a native
> Flink extension, but not sure if it fits well with external integrations.
> Anyway, I think modular built-in functions is a bigger story and can be on
> its own thread too, and our proposal doesn't prevent Flink from doing that
> in the future.
>
> ***Seems we have consensus that users should be able to use built-in
> functions of Hive or other external systems in SQL explicitly and
> deterministically regardless of Flink built-in functions and the potential
> modular built-in functions, via some new syntax like "mycat::func"? If so,
> I'd like to propose removing Hive built-in functions from ambiguous
> function resolution order, and empower users with such a syntax. This way
> we sacrifice a little convenience for certainty***
>
>
> What do you think?
>
> On Wed, Sep 4, 2019 at 7:02 AM Dawid Wysakowicz 
> wrote:
>
> > Hi,
> >
> > Regarding the Hive & Spark support of TEMPORARY FUNCTIONS. I've just
> > performed some experiments (hive-2.3.2 & spark 2.4.4) and I think they
> are
> > very inconsistent in that manner (spark being way worse on that).
> >
> > Hive:
> >
> > You cannot overwrite all the built-in functions. I could overwrite most
> of
> > the functions I tried e.g. length, e, pi, round, rtrim, but there are
> > functions I cannot overwrite e.g. CAST, ARRAY I get:
> >
> >
> > *ParseException line 1:29 cannot recognize input near 'array' 'AS' *
> >
> > What is interesting is that I cannot overwrite *array*, but I can overwrite
> > *map* or *struct*. Though Hive behaves reasonably well if I manage to
> > overwrite a function. When I drop the temporary function the native
> > function is still available.
> >
> > Spark:
> >
> > Spark's behavior imho is super bad.
> >
> > Theoretically I could overwrite all functions. I was able e.g. to
> > overwrite CAST function. I had to use though CREATE OR REPLACE TEMPORARY
> > FUNCTION syntax. Otherwise I get an exception that a function already
> > exists. However when I used the CAST function in a query it used the
> > native, built-in one.
> >
> > When I overwrote current_date() function, it was used in a query, but it
> > completely replaces the built-in function and I can no longer use the
> > native function in any way. I cannot also drop the temporary function. I
> > get:
> >
> > *Error in query: Cannot drop native function 'current_date';*
> >
> > Additional note, both systems do not allow creating TEMPORARY FUNCTIONS
> > with a database. Temporary functions are always represented as a single
> > name.
> >
> > In my opinion neither of the systems have consistent behavior. Generally
> > speaking I think overwriting any system provided functions is just
> > dangerous.
> >
> > Regarding Jark's concerns. Such functions would be registered in a
> current
> > catalog/database schema, so a user could still use its own function, but
> > would have to fully qualify the function (because built-in functions take
> > precedence). Moreover users would have the same problem with permanent
> > functions. Imagine a user has a permanent function 'cat.db.explode'. In
> > 1.9 the user could use just the 'explode' function as long as the 'cat' &
> > 'db' were 

Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-04 Thread Xuefu Z
Hi Dawid,

Thanks for sharing the findings about temporary functions. Because of
strong inconsistency observed in Spark, we can probably ignore it for now.
For Hive, I understand one may not be able to overwrite everything, but the
capability is being offered.

Whether we offer this capability is to be determined, but I don't see the
"danger" you mentioned. It's the user's action, which only impacts the user's
current session. Other users are not impacted. (That's one of the benefits
of temporary objects.) We cannot always prevent users from making mistakes.
If we think overwriting is useful to the user, then we can allow it. To
make it more consistent, we can also restrict it further by
blacklisting built-in functions that may not be overwritten.

In my past experience, I did see users needing to overwrite a built-in
function in Hive. Without this capability, a user has to create a permanent
function and modify all the queries referencing the function with its
fully-qualified name. This is equivalent to creating a new, user-defined
function. This can work, but the usability is bad.

(Jark's "explode" example is actually important because forcing user to
modify query is of bad experience after an upgrade.)

In short, we can theoretically disallow function overwriting since there is
no standard. However, I don't see strong reasons for doing so, especially
since such a capability is useful to some users.

Thanks,
Xuefu







On Wed, Sep 4, 2019 at 10:02 PM Dawid Wysakowicz 
wrote:

> Hi,
>
> Regarding the Hive & Spark support of TEMPORARY FUNCTIONS. I've just
> performed some experiments (hive-2.3.2 & spark 2.4.4) and I think they are
> very inconsistent in that manner (spark being way worse on that).
>
> Hive:
>
> You cannot overwrite all the built-in functions. I could overwrite most of
> the functions I tried e.g. length, e, pi, round, rtrim, but there are
> functions I cannot overwrite e.g. CAST, ARRAY I get:
>
>
> *ParseException line 1:29 cannot recognize input near 'array' 'AS' *
>
> What is interesting is that I cannot overwrite *array*, but I can overwrite
> *map* or *struct*. Though Hive behaves reasonably well if I manage to
> overwrite a function. When I drop the temporary function the native
> function is still available.
>
> Spark:
>
> Spark's behavior imho is super bad.
>
> Theoretically I could overwrite all functions. I was able e.g. to
> overwrite CAST function. I had to use though CREATE OR REPLACE TEMPORARY
> FUNCTION syntax. Otherwise I get an exception that a function already
> exists. However when I used the CAST function in a query it used the
> native, built-in one.
>
> When I overwrote current_date() function, it was used in a query, but it
> completely replaces the built-in function and I can no longer use the
> native function in any way. I cannot also drop the temporary function. I
> get:
>
> *Error in query: Cannot drop native function 'current_date';*
>
> Additional note, both systems do not allow creating TEMPORARY FUNCTIONS
> with a database. Temporary functions are always represented as a single
> name.
>
> In my opinion neither of the systems have consistent behavior. Generally
> speaking I think overwriting any system provided functions is just
> dangerous.
>
> Regarding Jark's concerns. Such functions would be registered in a current
> catalog/database schema, so a user could still use its own function, but
> would have to fully qualify the function (because built-in functions take
> precedence). Moreover users would have the same problem with permanent
> functions. Imagine a user has a permanent function 'cat.db.explode'. In
> 1.9 the user could use just the 'explode' function as long as the 'cat' &
> 'db' were the default catalog & database. If we introduce 'explode'
> built-in function in 1.10, the user has to fully qualify the function.
>
> Best,
>
> Dawid
> On 04/09/2019 15:19, Timo Walther wrote:
>
> Hi all,
>
> thanks for the healthy discussion. It is already a very long discussion
> with a lot of text. So I will just post my opinion to a couple of
> statements:
>
> > Hive built-in functions are not part of Flink built-in functions, they
> are catalog functions
>
> That is not entirely true. Correct me if I'm wrong but I think Hive
> built-in functions are also not catalog functions. They are not stored in
> every Hive metastore catalog that is freshly created but are a set of
> functions that are listed somewhere and made available.
>
> > ambiguous functions reference just shouldn't be resolved to a different
> catalog
>
> I agree. They should not be resolved to a different catalog. That's why I
> am suggesting to split the concept of built-in functions and catalog lookup
> semantics.
>
> > I don't know if any other databases handle built-in functions like that
>
> What I called "module" is:
> - Extension in Postgres [1]
> - Plugin in Presto [2]
>
> Btw. Presto even mentions example modules that are similar to the ones
> that we will introduce in the near future bot

Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-04 Thread Bowen Li
Let me try to summarize and conclude the long thread so far:

1. For order of temp function v.s. built-in function:

I think Dawid's point that a temp function should have a fully qualified path
is better reasoning to back the newly proposed order, and I agree we
don't need to follow Hive/Spark.

However, I'd rather not change fundamentals of temporary functions in this
FLIP. It belongs to a bigger story of how temporary objects should be
redefined and be handled uniformly - currently temporary tables and views
(those registered from TableEnv#registerTable()) behave differently from what
Dawid proposes for temp functions, and we need a FLIP to just unify their
APIs and behaviors.

I agree that backward compatibility is not an issue w.r.t Jark's points.

***Seems we do have consensus that it's acceptable to prevent users
registering a temp function in the same name as a built-in function. To
help us move forward, I'd like to propose setting such a restraint on temp
functions in this FLIP to simplify the design and avoid disputes.*** It
will also leave room for improvement in the future.


2. For Hive built-in function:

Thanks Timo for providing the Presto and Postgres examples. I feel modular
built-in functions can be a good fit for the geo and ml example as a native
Flink extension, but not sure if it fits well with external integrations.
Anyway, I think modular built-in functions is a bigger story and can be on
its own thread too, and our proposal doesn't prevent Flink from doing that
in the future.

***Seems we have consensus that users should be able to use built-in
functions of Hive or other external systems in SQL explicitly and
deterministically regardless of Flink built-in functions and the potential
modular built-in functions, via some new syntax like "mycat::func"? If so,
I'd like to propose removing Hive built-in functions from ambiguous
function resolution order, and empower users with such a syntax. This way
we sacrifice a little convenience for certainty***


What do you think?
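For illustration, the explicit referencing proposed above could look as follows. The "::" qualifier is only a strawman syntax floated in this thread and is not valid Flink SQL; the table, column, and catalog names are made up:

// Illustration only: "myhive::explode" uses the strawman syntax proposed above
// for pinning a function to an external system's built-ins; it is NOT
// implemented in Flink. Table, column, and catalog names are hypothetical.
public class ExplicitBuiltinReference {

    // Ambiguous reference: resolved through the function resolution order,
    // so a Flink built-in named "explode" would win once one exists.
    static final String AMBIGUOUS = "SELECT explode(arr) FROM t";

    // Explicit and deterministic: always use the external (e.g. Hive) built-in,
    // regardless of Flink's own built-ins or any future modular built-ins.
    static final String EXPLICIT = "SELECT myhive::explode(arr) FROM t";
}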

On Wed, Sep 4, 2019 at 7:02 AM Dawid Wysakowicz 
wrote:

> Hi,
>
> Regarding the Hive & Spark support of TEMPORARY FUNCTIONS. I've just
> performed some experiments (hive-2.3.2 & spark 2.4.4) and I think they are
> very inconsistent in that manner (spark being way worse on that).
>
> Hive:
>
> You cannot overwrite all the built-in functions. I could overwrite most of
> the functions I tried e.g. length, e, pi, round, rtrim, but there are
> functions I cannot overwrite e.g. CAST, ARRAY I get:
>
>
> *ParseException line 1:29 cannot recognize input near 'array' 'AS' *
>
> What is interesting is that I cannot overwrite *array*, but I can overwrite
> *map* or *struct*. Though Hive behaves reasonably well if I manage to
> overwrite a function. When I drop the temporary function the native
> function is still available.
>
> Spark:
>
> Spark's behavior imho is super bad.
>
> Theoretically I could overwrite all functions. I was able e.g. to
> overwrite CAST function. I had to use though CREATE OR REPLACE TEMPORARY
> FUNCTION syntax. Otherwise I get an exception that a function already
> exists. However when I used the CAST function in a query it used the
> native, built-in one.
>
> When I overwrote current_date() function, it was used in a query, but it
> completely replaces the built-in function and I can no longer use the
> native function in any way. I cannot also drop the temporary function. I
> get:
>
> *Error in query: Cannot drop native function 'current_date';*
>
> Additional note, both systems do not allow creating TEMPORARY FUNCTIONS
> with a database. Temporary functions are always represented as a single
> name.
>
> In my opinion neither of the systems have consistent behavior. Generally
> speaking I think overwriting any system provided functions is just
> dangerous.
>
> Regarding Jark's concerns. Such functions would be registered in a current
> catalog/database schema, so a user could still use its own function, but
> would have to fully qualify the function (because built-in functions take
> precedence). Moreover users would have the same problem with permanent
> functions. Imagine a user has a permanent function 'cat.db.explode'. In
> 1.9 the user could use just the 'explode' function as long as the 'cat' &
> 'db' were the default catalog & database. If we introduce 'explode'
> built-in function in 1.10, the user has to fully qualify the function.
>
> Best,
>
> Dawid
> On 04/09/2019 15:19, Timo Walther wrote:
>
> Hi all,
>
> thanks for the healthy discussion. It is already a very long discussion
> with a lot of text. So I will just post my opinion to a couple of
> statements:
>
> > Hive built-in functions are not part of Flink built-in functions, they
> are catalog functions
>
> That is not entirely true. Correct me if I'm wrong but I think Hive
> built-in functions are also not catalog functions. They are not stored in
> every Hive metastore catalog that is freshly created but are a set of
> functions th

Re: [VOTE] FLIP-49: Unified Memory Configuration for TaskExecutors

2019-09-04 Thread Stephan Ewen
Let's not block on config key names, just go ahead and we figure this out
concurrently or on the PR later.
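Since most of the context sits in the quoted exchange below, a simplified sketch of the -XX:MaxDirectMemorySize composition being discussed may help readers. The component names come from the thread; the authoritative formula is defined by FLIP-49, not by this sketch:

// Simplified sketch only; FLIP-49 defines the real formula. Per the thread,
// task off-heap memory covers both direct and native allocations from user
// code, and JVM overhead is intentionally NOT included in the flag.
public final class MaxDirectMemorySketch {

    static String maxDirectMemoryFlag(long taskOffHeapBytes, long networkBytes) {
        // Any further direct-memory pools defined by the FLIP would be added here too.
        long maxDirect = taskOffHeapBytes + networkBytes;
        return "-XX:MaxDirectMemorySize=" + maxDirect;
    }

    public static void main(String[] args) {
        // Hypothetical budgets: 128 MiB task off-heap + 1 GiB network buffers.
        System.out.println(maxDirectMemoryFlag(128L << 20, 1L << 30));
    }
}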


On Wed, Sep 4, 2019 at 3:48 PM Stephan Ewen  wrote:

> Maybe to clear up confusion about my suggestion:
>
> I would vote to keep the name of the config parameter
> "taskmanager.memory.network" because it is the same key as currently (good
> to not break things unless good reason) and there currently is no case or
> even a concrete follow-up where we would actually differentiate between
> different types of network memory.
>
> I would suggest to not prematurely rename this because of something that
> might happen in the future. Experience shows that its better to do these
> things when the actual change comes.
>
> Side note: I am not sure "shuffle" is a good term in this context. I have
> so far only heard that in batch contexts, which is not what we do here. One
> more reason for me to not prematurely change names.
>
> On Wed, Sep 4, 2019 at 10:56 AM Xintong Song 
> wrote:
>
>> @till
>>
>> > Just to clarify Xintong, you suggest that Task off-heap memory
>> represents
>> > direct and native memory. Since we don't know how the user will allocate
>> > the memory we will add this value to -XX:MaxDirectMemorySize so that the
>> > process won't fail if the user allocates only direct memory and no
>> native
>> > memory. Is that correct?
>> >
>> Yes, this is what I mean.
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Wed, Sep 4, 2019 at 4:25 PM Till Rohrmann 
>> wrote:
>>
>> > Just to clarify Xintong, you suggest that Task off-heap memory
>> represents
>> > direct and native memory. Since we don't know how the user will allocate
>> > the memory we will add this value to -XX:MaxDirectMemorySize so that the
>> > process won't fail if the user allocates only direct memory and no
>> native
>> > memory. Is that correct?
>> >
>> > Cheers,
>> > Till
>> >
>> > On Wed, Sep 4, 2019 at 10:18 AM Xintong Song 
>> > wrote:
>> >
>> > > @Stephan
>> > > Not sure what do you mean by "just having this value". Are you
>> suggesting
>> > > that having "taskmanager.memory.network" refers to "shuffle" and
>> "other
>> > > network memory", or only "shuffle"?
>> > >
>> > > I guess what you mean is only "shuffle"? Because currently
>> > > "taskmanager.network.memory" refers to shuffle buffers only, which is
>> > "one
>> > > less config value to break".
>> > >
>> > > Thank you~
>> > >
>> > > Xintong Song
>> > >
>> > >
>> > >
>> > > On Wed, Sep 4, 2019 at 3:42 PM Stephan Ewen  wrote:
>> > >
>> > > > If we later split the network memory into "shuffle" and "other
>> network
>> > > > memory", I think it would make sense to split the option then.
>> > > >
>> > > > In that case "taskmanager.memory.network" would probably refer to
>> the
>> > > total
>> > > > network memory, which would most likely be what most users actually
>> > > > configure.
>> > > > My feeling is that for now just having this value is actually
>> easier,
>> > and
>> > > > it is one less config value to break (which is also good).
>> > > >
>> > > > On Wed, Sep 4, 2019 at 9:05 AM Xintong Song 
>> > > wrote:
>> > > >
>> > > > > Thanks for the voting and comments.
>> > > > >
>> > > > > @Stephan
>> > > > > - The '-XX:MaxDirectMemorySize' value should not include JVM
>> > Overhead.
>> > > > > Thanks for correction.
>> > > > - 'taskmanager.memory.framework.heap' is heap memory reserved for
>> > task
>> > > > > executor framework, which can not be allocated to task slots. I
>> think
>> > > > users
>> > > > > should be able to configure both how many total java heap memory a
>> > task
>> > > > > executor should have and how many of the total java heap memory
>> can
>> > be
>> > > > > allocated to task slots.
>> > > > > - Regarding network / shuffle memory, I'm with @Andrey. ATM, this
>> > > memory
>> > > > > pool (derived from
>> "taskmanager.network.memory.[min/max/fraction]")
>> > is
>> > > > only
>> > > > > used inside NettyShuffleEnvironment as network buffers. There
>> might
>> > be
>> > > > > other network memory usage outside the shuffle component (as
>> > @Zhijiang
>> > > > also
>> > > > > suggested), but that is not accounted by this memory pool. This is
>> > > > exactly
>> > > > > why I would suggest to change the name from 'network' to
>> 'shuffle'.
>> > > > > - I agree that we need very good documentation to explain the
>> memory
>> > > > pools
>> > > > > and config options, as well as WebUI to present the memory pool
>> > sizes.
>> > > I
>> > > > > would suggest to address these as follow-ups of all the three
>> > resource
>> > > > > management FLIPs, for better integration.
>> > > > >
>> > > > > @Andrey
>> > > > > - Agree with the 'shuffle' naming. See above.
>> > > > >
>> > > > > @Till
>> > > > > - My understanding is that Task Off-heap memory accounts for both
>> > > direct
>> > > > > and native memory used by the user code. I'm not sure whether we
>> need
>> > > > > another configure option to split it. Given that we only decided
>> (in
>> > > the
>> 

Re: [VOTE] FLIP-61 Simplify Flink's cluster level RestartStrategy configuration

2019-09-04 Thread zhijiang
+1 
--
From:Till Rohrmann 
Send Time: 2019-09-04 (Wednesday) 13:39
To:dev 
Cc:Zhu Zhu 
Subject:Re: [VOTE] FLIP-61 Simplify Flink's cluster level RestartStrategy 
configuration

+1 (binding)

On Wed, Sep 4, 2019 at 12:39 PM Chesnay Schepler  wrote:

> +1 (binding)
>
> On 04/09/2019 11:13, Zhu Zhu wrote:
> > +1 (non-binding)
> >
> > Thanks,
> > Zhu Zhu
> >
> >> Till Rohrmann  wrote on Wed, Sep 4, 2019 at 5:05 PM:
> >
> >> Hi everyone,
> >>
> >> I would like to start the voting process for FLIP-61 [1], which is
> >> discussed and reached consensus in this thread [2].
> >>
> >> Since the change is rather small I'd like to shorten the voting period
> to
> >> 48 hours. Hence, I'll try to close it September 6th, 11:00 am CET,
> unless
> >> there is an objection or not enough votes.
> >>
> >> [1]
> >>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-61+Simplify+Flink%27s+cluster+level+RestartStrategy+configuration
> >> [2]
> >>
> >>
> https://lists.apache.org/thread.html/e206390127bcbd9b24d9c41a838faa75157e468e01552ad241e3e24b@%3Cdev.flink.apache.org%3E
> >>
> >> Cheers,
> >> Till
> >>
>
>



Re: [VOTE] FLIP-62: Set default restart delay for FixedDelay- and FailureRateRestartStrategy to 1s

2019-09-04 Thread zhijiang
+1

Best,
Zhijiang
--
From:Jark Wu 
Send Time: 2019-09-04 (Wednesday) 13:45
To:dev 
Subject:Re: [VOTE] FLIP-62: Set default restart delay for FixedDelay- and 
FailureRateRestartStrategy to 1s

+1

Best,
Jark

> On Sep 4, 2019, at 19:43, David Morávek  wrote:
> 
> +1
> 
> On Wed, Sep 4, 2019 at 1:38 PM Till Rohrmann  wrote:
> 
>> +1 (binding)
>> 
>> On Wed, Sep 4, 2019 at 12:43 PM Chesnay Schepler 
>> wrote:
>> 
>>> +1 (binding)
>>> 
>>> On 04/09/2019 11:18, JingsongLee wrote:
 +1 (non-binding)
 
 default 0 is really not user production friendly.
 
 Best,
 Jingsong Lee
 
 
 --
 From:Zhu Zhu 
 Send Time: 2019-09-04 (Wednesday) 17:13
 To:dev 
 Subject:Re: [VOTE] FLIP-62: Set default restart delay for FixedDelay-
>>> and FailureRateRestartStrategy to 1s
 
 +1 (non-binding)
 
 Thanks,
 Zhu Zhu
 
 Till Rohrmann  wrote on Wed, Sep 4, 2019 at 5:06 PM:
 
> Hi everyone,
> 
> I would like to start the voting process for FLIP-62 [1], which
> is discussed and reached consensus in this thread [2].
> 
> Since the change is rather small I'd like to shorten the voting period
>>> to
> 48 hours. Hence, I'll try to close it September 6th, 11:00 am CET,
>>> unless
> there is an objection or not enough votes.
> 
> [1]
> 
> 
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-62%3A+Set+default+restart+delay+for+FixedDelay-+and+FailureRateRestartStrategy+to+1s
> [2]
> 
> 
>>> 
>> https://lists.apache.org/thread.html/9602b342602a0181fcb618581f3b12e692ed2fad98c59fd6c1caeabd@%3Cdev.flink.apache.org%3E
> 
> Cheers,
> Till
> 
>>> 
>>> 
>> 



Re: instable checkpointing after migration to flink 1.8

2019-09-04 Thread Bekir Oguz
Hi Stephan,
sorry for the late response.
We indeed use timers inside a KeyedProcessFunction, but the triggers of the
timers are fairly evenly distributed, so they do not cause a firing storm.
We have custom TTL logic which is used by the deletion timer to decide
whether to delete a record from the in-memory state or not.
Can you maybe give some links to those changes in RocksDB?

Thanks in advance,
Bekir Oguz
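For context, a minimal sketch of the kind of per-key deletion timer with custom TTL logic described above. It is simplified and illustrative; the state layout, field names, and TTL constant are assumptions, not the actual production operator:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Simplified sketch of a per-key deletion timer with custom TTL logic.
public class TtlCleanupFunction extends KeyedProcessFunction<String, String, String> {

    private static final long TTL_MS = 24 * 60 * 60 * 1000L; // hypothetical TTL of one day
    private transient ValueState<Long> lastSeen;

    @Override
    public void open(Configuration parameters) {
        lastSeen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastSeen", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        long now = ctx.timerService().currentProcessingTime();
        lastSeen.update(now);
        // One timer per element/key; timers for different keys fire at different
        // times, so they are spread out rather than firing in a storm.
        ctx.timerService().registerProcessingTimeTimer(now + TTL_MS);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        Long seen = lastSeen.value();
        // Custom TTL check: only clear the state if the key has really been idle.
        if (seen != null && timestamp >= seen + TTL_MS) {
            lastSeen.clear();
        }
    }
}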

On Fri, 30 Aug 2019 at 14:56, Stephan Ewen  wrote:

> Hi all!
>
> A thought would be that this has something to do with timers. Does the
> task with that behavior use timers (windows, or process function)?
>
> If that is the case, some theories to check:
>   - Could it be a timer firing storm coinciding with a checkpoint?
> Currently, that storm synchronously fires, checkpoints cannot preempt that,
> which should change in 1.10 with the new mailbox model.
>   - Could the timer-async checkpointing changes have something to do with
> that? Does some of the usually small "preparation work" (happening
> synchronously) lead to an unwanted effect?
>   - Are you using TTL for state in that operator?
>   - There were some changes made to support timers in RocksDB recently.
> Could that have something to do with it?
>
> Best,
> Stephan
>
>
> On Fri, Aug 30, 2019 at 2:45 PM Congxian Qiu 
> wrote:
>
>> CC flink dev mail list
>> An update for those who may be interested in this issue: we're still
>> diagnosing this problem.
>>
>> Best,
>> Congxian
>>
>>
>> Congxian Qiu  wrote on Thu, Aug 29, 2019 at 8:58 PM:
>>
>> > Hi Bekir
>> >
>> > Currently, from what we have diagnosed, there is some task that completes its
>> > checkpoint too late (maybe 15 mins), but we checked the Kafka broker log
>> > and did not find anything interesting there. Could we run another job
>> > that does not commit offsets to Kafka? This is to check whether the
>> > "commit offset to Kafka" step consumes too much time.
>> >
>> > Best,
>> > Congxian
>> >
>> >
>> > Bekir Oguz  wrote on Wed, Aug 28, 2019 at 4:19 PM:
>> >
>> >> Hi Congxian,
>> >> sorry for the late reply, but there is no progress on this issue yet. I also
>> >> checked the Kafka broker logs and found nothing interesting there.
>> >> And we still have 15 min duration checkpoints quite often. Any more
>> ideas
>> >> on where to look at?
>> >>
>> >> Regards,
>> >> Bekir
>> >>
>> >> On Fri, 23 Aug 2019 at 12:12, Congxian Qiu 
>> >> wrote:
>> >>
>> >>> Hi Bekir
>> >>>
>> >>> Are you back at work now? Are there any more findings on this
>> >>> problem?
>> >>>
>> >>> Best,
>> >>> Congxian
>> >>>
>> >>>
>> >>> Bekir Oguz  wrote on Tue, Aug 13, 2019 at 4:39 PM:
>> >>>
>>  Hi Congxian,
>>  Thanks for following up on this issue. It is still unresolved and I am
>> on
>>  vacation at the moment.  Hopefully my colleagues Niels and Vlad can
>> spare
>>  some time to look into this.
>> 
>>  @Niels, @Vlad: do you guys also think that this issue might be Kafka
>>  related? We could also check kafka broker logs at the time of long
>>  checkpointing.
>> 
>>  Thanks,
>>  Bekir
>> 
>>  Sent from my iPhone
>> 
>>  On 12 Aug 2019 at 15:18, Congxian Qiu 
>>  wrote the following:
>> 
>>  Hi Bekir
>> 
>>  Is there any progress about this problem?
>> 
>>  Best,
>>  Congxian
>> 
>> 
>>  Congxian Qiu  wrote on Thu, Aug 8, 2019 at 11:17 PM:
>> 
>> > hi Bekir
>> > Thanks for the information.
>> >
>> > - The source's checkpoint was triggered by RPC calls, so it has the
>> > "Trigger checkpoint xxx" log,
>> > - other tasks' checkpoints were triggered after receiving all the
>> barriers
>> > from upstream, so they didn't log "Trigger checkpoint XXX" :(
>> >
>> > Your diagnosis makes sense to me; we need to check the Kafka log.
>> > I also found out that we always have a log like
>> > "org.apache.kafka.clients.consumer.internals.AbstractCoordinator
>> Marking
>> > the coordinator 172.19.200.73:9092 (id: 2147483646 rack: null) dead
>> > for group userprofileaggregator
>> > 2019-08-06 13:58:49,872 DEBUG
>> > org.apache.flink.streaming.runtime.tasks.StreamTask   -
>> Notifica",
>> >
>> > I checked the Kafka docs [1] and only found that the default of `
>> > transaction.max.timeout.ms` is 15 min.
>> >
>> > Please let me know if you have any findings. Thanks.
>> >
>> > PS: maybe you can also check the log for task
>> > `d0aa98767c852c97ae8faf70a54241e3`, since the JM also received its ack message
>> late.
>> >
>> > [1] https://kafka.apache.org/documentation/
>> > Best,
>> > Congxian
>> >
>> >
>> > Bekir Oguz  wrote on Wed, Aug 7, 2019 at 6:48 PM:
>> >
>> >> Hi Congxian,
>> >> Thanks for checking the logs. What I see from the logs is:
>> >>
>> >> - For the tasks like "Source:
>> >> profileservice-snowplow-clean-events_kafka_source -> Filter” {17,
>> 27, 31,
>> >> 33, 34} / 70 : We have the ’Triggering checkpoint’ and also
>> ‘Confirm
>> >> checkpoint’

Re: [DISCUSS] FLIP-55: Introduction of a Table API Java Expression DSL

2019-09-04 Thread Timo Walther
Thanks for your feedback Rong. You are right, we can still have shorter 
names if the user feedback demands that. Adding additional shorter 
method names is always possible. So let's stick to lit() for now.
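As a quick illustration of the DSL shape being discussed, a small Java sketch is below. It assumes the statically importable $() and lit() methods land in an Expressions-style entry class as described in the FLIP; the table and column names are made up:

// Sketch of the proposed FLIP-55 expression DSL; assumes static imports for
// $() and lit() from the DSL's entry class and a TableEnvironment with a
// registered "Orders" table.
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class ExpressionDslExample {

    public static Table bigOrders(TableEnvironment tEnv) {
        return tEnv.from("Orders")
                .filter($("amount").isGreater(lit(10)))                  // column reference + literal
                .groupBy($("customerId"))
                .select($("customerId"), $("amount").sum().as("total"));
    }
}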


I converted the Google document into a wiki page:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-55%3A+Introduction+of+a+Table+API+Java+Expression+DSL

I will start a voting thread by tomorrow if there are no objections.

Thanks,
Timo


On 04.09.19 02:52, Rong Rong wrote:

Thanks for putting together the proposal @Timo and sorry for joining the
discussion thread late.

I also share the same thought as Fabian on the ease-of-use front. However,
I was wondering whether we need to start the expression design with them.
One thing I can think of is: is it possible to support "alias" later on in
the Expression once we collect enough feedback from the users?

IMO, it is always easier to expand APIs later than to reduce them.

Cheers,
Rong

On Mon, Sep 2, 2019 at 2:37 AM Timo Walther  wrote:


Hi all,

I see a majority votes for `lit(12)` so let's adopt that in the FLIP.
The `$("field")` would consider Fabian's concerns so I would vote for
keeping it like that.

One more question for native English speakers, is it acceptable to have
`isEqual` instead of `isEqualTo` and `isGreater` instead of
`isGreaterThan`?

If there are no more concerns, I will start a voting thread soon.

Thanks,
Timo


On 29.08.19 12:24, Fabian Hueske wrote:

Hi,

IMO, we should define what we would like to optimize for:
1) easy-to-get-started experience or
2) productivity and ease-of-use

While 1) is certainly important, I think we should put more emphasis on
goal 2).
That's why I favor as short as possible names for commonly used methods
like column references and literals/values.
These are used *many* times in *every* query.
Every user who uses the API for more than 30 mins will know what $() or

v()

(or whatever method names we come up with) are used for and everybody who
doesn't know can have a look at the JavaDocs or regular documentation.
Shorter method names are not only about increasing the speed to write a
query, but also reducing clutter that needs to be parsed to understand an
expression / query.

I'm OK with descriptive names for other expressions like call(),
isEqualTo() (although these could be the commonly used eq(), gte(),

etc.),

and so on but column references (and literals) should be as lightweight

as

possible, IMO.

Cheers,
Fabian

Am Do., 29. Aug. 2019 um 12:15 Uhr schrieb Timo Walther <

twal...@apache.org

:
I'm fine with `lit()`. Regarding `col()`, I initially suggested `ref()`
but I think Fabian and Dawid liked single char methods for the most
commonly used expressions.

Btw, what is your opinion on the names of commonly used methods such as
`isEqual`, `isGreaterOrEqual`? Are we fine with the current naming?
In theory we could make them shorter like `equals(), greaterOrEqual()`
or even shorter to `eq`, `gt`, `gte`?

Thanks,
Timo


On 29.08.19 11:51, Aljoscha Krettek wrote:

Overall, this is a very nice development that should also simplify the

code base once we deprecate the expression parser!

Regarding method names, I agree with Seth that values/literals should

use something like “lit()”. I also think that for column references we
could use “col()” to make it clear that it is a column reference. What

do

you think?

Aljoscha


On 28. Aug 2019, at 15:59, Seth Wiesman  wrote:

I would prefer ‘lit()’ over  ‘val()’ since val is a keyword in Scala.

Assuming the intention is to make the dsl ergonomic for Scala

developers.

Seth


On Aug 28, 2019, at 7:58 AM, Timo Walther 

wrote:

Hi David,

thanks for your feedback. I was also skeptical about 1 char method

names, I restored the `val()` method for now. If you read literature

such

as Wikipedia [1]: "literal is a notation for representing a fixed value

in

source code. Almost all programming languages have notations for atomic
values". So they are also talking about "values".

Alternatively, we could use `lit(12)` or `l(12)`, but I'm not convinced

that this is better.

Regards,
Timo

[1] https://en.wikipedia.org/wiki/Literal_(computer_programming)


On 27.08.19 22:10, David Anderson wrote:
TImo,

While it's not exactly pretty, I don't mind the $("field")

construct.

It's not particularly surprising. The v() method troubles me more;

it

looks mysterious. I think we would do better to have something more
explicit. val() isn't much better -- val("foo") could be interpreted
to mean the value of the "foo" column, or a literal string.

David


On Tue, Aug 27, 2019 at 5:45 PM Timo Walther 

wrote:

Hi David,

thanks for your feedback. With the current design, the DSL would be

free

of any ambiguity but it is definitely more verbose esp. around

defining

values.

I would be happy about further suggestions that make the DSL more
readable. I'm also not sure if we go for `$()` and `v()` instead of

more

readable `ref()` and `val()`. This could maybe make 

[jira] [Created] (FLINK-13964) Remove usage of deprecated methods from MiniCluster

2019-09-04 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-13964:
-

 Summary: Remove usage of deprecated methods from MiniCluster
 Key: FLINK-13964
 URL: https://issues.apache.org/jira/browse/FLINK-13964
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.10.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.10.0


With FLINK-13750 we deprecated 
{{HighAvailabilityServices#getWebMonitorRetrieverService}}. This method is 
still actively used by the {{MiniCluster}}. We should remove the usage in order 
to also support custom {{HighAvailabilityService}} implementations which no 
longer implement {{HighAvailabilityServices#getWebMonitorRetrieverService}}.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13963) Consolidate Hadoop file systems usage and Hadoop integration docs

2019-09-04 Thread Andrey Zagrebin (Jira)
Andrey Zagrebin created FLINK-13963:
---

 Summary: Consolidate Hadoop file systems usage and Hadoop 
integration docs
 Key: FLINK-13963
 URL: https://issues.apache.org/jira/browse/FLINK-13963
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / FileSystem, Connectors / Hadoop 
Compatibility, Documentation, FileSystems
Reporter: Andrey Zagrebin
Assignee: Andrey Zagrebin
 Fix For: 1.10.0


We have hadoop related docs in several places at the moment:
 * *dev/batch/connectors.md* (Hadoop FS implementations and setup)
 * *dev/batch/hadoop_compatibility.md* (the claim that Flink always has Hadoop 
types out of the box is no longer valid, as we do not build and provide Flink with 
Hadoop by default)
 * *ops/filesystems/index.md* (plugins, Hadoop FS implementations and setup 
revisited)
 * *ops/deployment/hadoop.md* (Hadoop classpath)
 * *ops/config.md* (deprecated way to provide Hadoop configuration in Flink 
conf)

We could consolidate all these pieces of docs into a consistent structure to 
help users to navigate through the docs to well-defined spots depending on 
which feature they are trying to use.

The places in docs which should contain the information about Hadoop:
 * *dev/batch/hadoop_compatibility.md* (only Dataset API specific stuff about 
integration with Hadoop)
 * *ops/filesystems/index.md* (Flink FS plugins and Hadoop FS implementations)
 * *ops/deployment/hadoop.md* (Hadoop configuration and classpath)

How to set up Hadoop itself should be covered only in *ops/deployment/hadoop.md*. All 
other places dealing with Hadoop/HDFS should contain only their related content 
and just reference 'how to configure Hadoop'. For example, all chapters about 
writing to file systems (batch connectors and streaming file sinks) should just 
reference the file systems docs.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13962) Execution#taskRestore leaks if task fails before deploying

2019-09-04 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-13962:
---

 Summary: Execution#taskRestore leaks if task fails before deploying
 Key: FLINK-13962
 URL: https://issues.apache.org/jira/browse/FLINK-13962
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.9.0, 1.10.0
Reporter: Zhu Zhu


Currently Execution#taskRestore is reset to null in task deployment stage.

The purpose of it is "allows the JobManagerTaskRestore instance to be garbage 
collected. Furthermore, it won't be archived along with the Execution in the 
ExecutionVertex in case of a restart. This is especially important when setting 
state.backend.fs.memory-threshold to larger values because every state below 
this threshold will be stored in the meta state files and, thus, also the 
JobManagerTaskRestore instances." (From FLINK-9693)

 

However, if a task fails before it comes to the deployment stage, the 
Execution#taskRestore will remain non-null and will be archived in prior 
executions. 

This may result in large JM heap cost in certain cases.

 

I think we should check Execution#taskRestore and make sure it is null when 
moving an execution to the prior executions.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13961) Remove obsolete abstraction JobExecutor(Service)

2019-09-04 Thread TisonKun (Jira)
TisonKun created FLINK-13961:


 Summary: Remove obsolete abstraction JobExecutor(Service) 
 Key: FLINK-13961
 URL: https://issues.apache.org/jira/browse/FLINK-13961
 Project: Flink
  Issue Type: Sub-task
  Components: Client / Job Submission
Affects Versions: 1.10.0
Reporter: TisonKun
 Fix For: 1.10.0


Refer to Till's comment

The JobExecutor and the JobExecutorService have been introduced to bridge 
between the Flip-6 MiniCluster and the legacy FlinkMiniCluster. They should be 
obsolete now and could be removed if needed.

Actually, we should ideally make use of {{MiniClusterClient}} for submission, 
but we have some tests based on MiniCluster in flink-runtime (or elsewhere) that 
don't have a dependency on flink-client, and it is unclear whether moving 
{{MiniClusterClient}} to flink-runtime is reasonable. Thus I'd prefer to keep 
{{executeJobBlocking}} for now and defer the possible refactoring.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[DISCUSS] FLIP-64: Support for Temporary Objects in Table module

2019-09-04 Thread Dawid Wysakowicz
Hi all,

As part of FLIP-30,
a Catalog API was introduced that enables storing table meta objects
permanently. At the same time, the majority of current APIs create
temporary objects that cannot be serialized. We should clarify the
creation of meta objects (tables, views, functions) in a unified way.

Another current problem in the API is that all the temporary objects are
stored in a special built-in catalog, which is not very intuitive for
many users, as they must be aware of that catalog to reference temporary
objects.

Lastly, different APIs have different ways of providing object paths:

  * String path…, 
  * String path, String pathContinued…
  * String name

We should choose one approach and unify it across all APIs.
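For illustration, the three styles roughly correspond to API shapes like the ones sketched below; the signatures are simplified, and the unified variant at the end is just one possible direction rather than the FLIP's final API:

// Simplified sketches of the current path styles and one possible unified form.
// These are illustrations, not Flink's actual or final signatures.
interface CurrentPathStyles {
    Object scan(String... tablePath);                                     // "String path..."
    void insertInto(Object table, String path, String... pathContinued);  // "String path, String pathContinued..."
    void registerTable(String name, Object table);                        // "String name"
}

interface UnifiedPathSketch {
    // One string parsed into catalog.database.object, defaulting to the current
    // catalog and database when parts are omitted, e.g. "cat.db.view1" or "view1".
    void createTemporaryView(String path, Object view);
}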

I suggest a FLIP to address the above issues.

Looking forward to your opinions.

FLIP link:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-64%3A+Support+for+Temporary+Objects+in+Table+module





[jira] [Created] (FLINK-13960) Provide default implementation for deprecated HighAvailabilityServices.getWebMonitorLeaderRetriever

2019-09-04 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-13960:
-

 Summary: Provide default implementation for deprecated 
HighAvailabilityServices.getWebMonitorLeaderRetriever
 Key: FLINK-13960
 URL: https://issues.apache.org/jira/browse/FLINK-13960
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.10.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.10.0


With FLINK-13750 we split the {{HighAvailabilityServices}} up into a client-side 
{{ClientHighAvailabilityServices}} and a cluster-side 
{{HighAvailabilityServices}}. As part of this split, the method 
{{HighAvailabilityServices.getWebMonitorLeaderRetriever}} was deprecated. In 
order not to have to implement this method anymore, I suggest providing a 
default implementation which throws an {{UnsupportedOperationException}} 
referring to the {{ClientHighAvailabilityServices}}.
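A sketch of what such a default implementation could look like (simplified excerpt; the real interface and exception message live in Flink's runtime code, and the return type is abbreviated here to keep the sketch self-contained):

// Simplified excerpt for illustration only; the real method returns Flink's
// LeaderRetrievalService and sits in the full HighAvailabilityServices interface.
interface HighAvailabilityServicesSketch {

    @Deprecated
    default Object getWebMonitorLeaderRetriever() {
        throw new UnsupportedOperationException(
                "getWebMonitorLeaderRetriever should no longer be used; "
                        + "retrieve the REST endpoint leader via ClientHighAvailabilityServices instead.");
    }
}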



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-04 Thread Dawid Wysakowicz
Hi,

Regarding the Hive & Spark support of TEMPORARY FUNCTIONS. I've just
performed some experiments (hive-2.3.2 & spark 2.4.4) and I think they
are very inconsistent in that manner (spark being way worse on that).

Hive:

You cannot overwrite all the built-in functions. I could overwrite most
of the functions I tried e.g. length, e, pi, round, rtrim, but there are
functions I cannot overwrite e.g. CAST, ARRAY I get:

/ParseException line 1:29 cannot recognize input near 'array' 'AS'/

What is interesting is that I cannot overwrite /array/, but I can
overwrite /map/ or /struct/. Though Hive behaves reasonably well if I
manage to overwrite a function. When I drop the temporary function the
native function is still available.

Spark:

Spark's behavior imho is super bad.

Theoretically I could overwrite all functions. I was able e.g. to
overwrite CAST function. I had to use though CREATE OR REPLACE TEMPORARY
FUNCTION syntax. Otherwise I get an exception that a function already
exists. However when I used the CAST function in a query it used the
native, built-in one.

When I overwrote current_date() function, it was used in a query, but it
completely replaces the built-in function and I can no longer use the
native function in any way. I cannot also drop the temporary function. I
get:

/    Error in query: Cannot drop native function 'current_date';/

Additional note, both systems do not allow creating TEMPORARY FUNCTIONS
with a database. Temporary functions are always represented as a single
name.

In my opinion, neither of the systems has consistent behavior. Generally
speaking, I think overwriting any system-provided functions is just
dangerous.
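For anyone who wants to reproduce the Spark side of the experiment, a rough sketch is below. The UDF class name is hypothetical and must exist on the classpath; the comments reflect the findings described above for Spark 2.4.x, not guaranteed semantics:

import org.apache.spark.sql.SparkSession;

// Rough reproduction sketch of the Spark experiment described above.
// 'com.example.MyCurrentDate' is a hypothetical Hive-style UDF class.
public class SparkTempFunctionExperiment {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[1]")
                .appName("temp-function-experiment")
                .enableHiveSupport() // assumes spark-hive is on the classpath
                .getOrCreate();

        // Plain CREATE TEMPORARY FUNCTION fails because the name already exists,
        // so OR REPLACE is needed to shadow a built-in.
        spark.sql("CREATE OR REPLACE TEMPORARY FUNCTION current_date AS 'com.example.MyCurrentDate'");

        // Per the findings above, this now resolves to the temporary function and
        // the native current_date() is no longer reachable.
        spark.sql("SELECT current_date()").show();

        // Per the findings above, this fails with:
        //   Cannot drop native function 'current_date';
        spark.sql("DROP TEMPORARY FUNCTION current_date");
    }
}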

Regarding Jark's concerns. Such functions would be registered in a
current catalog/database schema, so a user could still use its own
function, but would have to fully qualify the function (because built-in
functions take precedence). Moreover users would have the same problem
with permanent functions. Imagine a user has a permanent function
'cat.db.explode'. In 1.9 the user could use just the 'explode' function
as long as the 'cat' & 'db' were the default catalog & database. If we
introduce 'explode' built-in function in 1.10, the user has to fully
qualify the function.

Best,

Dawid
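To make the precedence described above concrete, here is a minimal, illustrative sketch of a resolution strategy in which unqualified names check built-in functions before the current catalog and database, while fully qualified names bypass built-ins entirely. It is not Flink's actual FunctionCatalog code; names and types are simplified:

import java.util.Map;
import java.util.Optional;

// Minimal sketch of the lookup order discussed in this thread; illustrative only.
public class FunctionResolutionSketch {

    private final Map<String, Object> builtInFunctions;   // e.g. "explode" once it is built in
    private final Map<String, Object> catalogFunctions;   // keyed by "cat.db.name"
    private final String currentCatalog;
    private final String currentDatabase;

    public FunctionResolutionSketch(Map<String, Object> builtIns, Map<String, Object> catalogFns,
                                    String currentCatalog, String currentDatabase) {
        this.builtInFunctions = builtIns;
        this.catalogFunctions = catalogFns;
        this.currentCatalog = currentCatalog;
        this.currentDatabase = currentDatabase;
    }

    public Optional<Object> resolve(String... nameParts) {
        if (nameParts.length == 3) {
            // Fully qualified "cat.db.func" never consults the built-in set.
            return Optional.ofNullable(catalogFunctions.get(String.join(".", nameParts)));
        }
        String name = nameParts[nameParts.length - 1];
        // Unqualified: built-ins take precedence over the current catalog/database,
        // which is why a new "explode" built-in would shadow cat.db.explode.
        Object builtIn = builtInFunctions.get(name);
        if (builtIn != null) {
            return Optional.of(builtIn);
        }
        return Optional.ofNullable(
                catalogFunctions.get(currentCatalog + "." + currentDatabase + "." + name));
    }
}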

On 04/09/2019 15:19, Timo Walther wrote:
> Hi all,
>
> thanks for the healthy discussion. It is already a very long
> discussion with a lot of text. So I will just post my opinion to a
> couple of statements:
>
> > Hive built-in functions are not part of Flink built-in functions,
> they are catalog functions
>
> That is not entirely true. Correct me if I'm wrong but I think Hive
> built-in functions are also not catalog functions. They are not stored
> in every Hive metastore catalog that is freshly created but are a set
> of functions that are listed somewhere and made available.
>
> > ambiguous functions reference just shouldn't be resolved to a
> different catalog
>
> I agree. They should not be resolved to a different catalog. That's
> why I am suggesting to split the concept of built-in functions and
> catalog lookup semantics.
>
> > I don't know if any other databases handle built-in functions like that
>
> What I called "module" is:
> - Extension in Postgres [1]
> - Plugin in Presto [2]
>
> Btw. Presto even mentions example modules that are similar to the ones
> that we will introduce in the near future both for ML and System XYZ
> compatibility:
> "See either the presto-ml module for machine learning functions or the
> presto-teradata-functions module for Teradata-compatible functions,
> both in the root of the Presto source."
>
> > functions should be either built-in already or just libraries
> functions, and library functions can be adapted to catalog APIs or of
> some other syntax to use
>
> Regarding "built-in already", of course we can add a lot of functions
> as built-ins but we will end-up in a dependency hell in the near
> future if we don't introduce a pluggable approach. Library functions
> is what you also suggest but storing them in a catalog means to always
> fully qualify them or modifying the existing catalog design that was
> inspired by the standard.
>
> I don't think "it brings in even more complicated scenarios to the
> design", it just does clear separation of concerns. Integrating the
> functionality into the current design makes the catalog API more
> complicated.
>
> > why would users name a temporary function the same as a built-in
> function then?
>
> Because you never know what users do. If they don't, my suggested
> resolution order should not be a problem, right?
>
> > I don't think hive functions deserves be a function module
>
> Our goal is not to create a Hive clone. We need to think forward and
> Hive is just one of many systems that we can support. Not every
> built-in function behaves and will behave exactly like Hive.
>
> > regarding temporary functions, there are few systems that support it
>
> IMHO Spark and Hive are not always the best

Re: [DISCUSS] Support JSON functions in Flink SQL

2019-09-04 Thread Jark Wu
Hi Forward,

Thanks for bringing this discussion and preparing the nice design.
I think it's nice to have the JSON functions in the next release.
We have received some requirements for this feature.

I can help shepherd this JSON functions effort and will leave comments
in the design doc in the coming days.

Hi Danny,

The newly introduced JSON functions are from SQL:2016, not from MySQL,
so no JSON type is needed. According to SQL:2016, the
representation of JSON data can be a "character string", which is also
the current implementation in Calcite [1].

Best,
Jark


[1]: https://calcite.apache.org/docs/reference.html#json-functions
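To make the scope concrete, a few of the SQL:2016 functions in question are shown below, written against a VARCHAR column that holds JSON. The table and column names are made up, and the queries follow the Calcite reference above; they will only run once the functions are wired into Flink SQL, which is what this proposal is about:

// Illustrative SQL:2016 JSON queries over a VARCHAR column named "payload".
public class JsonFunctionExamples {

    static final String VALUE_QUERY =
            "SELECT JSON_VALUE(payload, '$.user.id') AS user_id FROM events";

    static final String EXISTS_QUERY =
            "SELECT * FROM events WHERE JSON_EXISTS(payload, '$.error')";

    static final String OBJECT_QUERY =
            "SELECT JSON_OBJECT(KEY 'id' VALUE user_id, KEY 'cnt' VALUE cnt) FROM aggregated";
}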


On Wed, 4 Sep 2019 at 21:22, Xu Forward  wrote:

> Hi Danny Chan, thank you very much for your reply; your help can help me
> further improve this discussion.
> Best
> forward
>
Danny Chan  wrote on Wed, Sep 4, 2019 at 8:50 PM:
>
> > Thanks Xu Forward for bringing up this topic. I think the JSON functions are
> > very useful, especially for MySQL users.
> >
> > I saw that you have done some work within Apache Calcite; that's a
> > good start. But here is one concern from me: Flink doesn't support a JSON
> > type internally, so how to represent a JSON object in Flink may be a key
> point
> > we need to resolve. In Calcite, we use the ANY type to represent JSON,
> > but I don't think it is the right way to go; maybe we can have a
> discussion
> > here.
> >
> > Best,
> > Danny Chan
> > On Sep 4, 2019 at 8:34 PM +0800, Xu Forward wrote:
> > > Hi everybody,
> > >
> > > I'd like to kick off a discussion on supporting JSON functions in Flink
> SQL.
> > >
> > > The entire plan is divided into two steps:
> > > 1. Implement the JSON functions covered by SQL:2016 in Flink SQL [1].
> > > 2. Implement JSON functions not covered by SQL:2016 in Flink SQL,
> such
> > > as
> > > JSON_TYPE and JSON_LENGTH in MySQL, etc., which are very useful JSON functions.
> > >
> > > Would love to hear your thoughts.
> > >
> > > [1]
> > >
> >
> https://docs.google.com/document/d/1JfaFYIFOAY8P2pFhOYNCQ9RTzwF4l85_bnTvImOLKMk/edit#heading=h.76mb88ca6yjp
> > >
> > > Best,
> > > ForwardXu
> >
>


Re: [VOTE] FLIP-49: Unified Memory Configuration for TaskExecutors

2019-09-04 Thread Stephan Ewen
Maybe to clear up confusion about my suggestion:

I would vote to keep the name of the config parameter
"taskmanager.memory.network" because it is the same key as currently (good
to not break things unless good reason) and there currently is no case or
even a concrete follow-up where we would actually differentiate between
different types of network memory.

I would suggest to not prematurely rename this because of something that
might happen in the future. Experience shows that it's better to do these
things when the actual change comes.

Side note: I am not sure "shuffle" is a good term in this context. I have
so far only heard that in batch contexts, which is not what we do here. One
more reason for me to not prematurely change names.
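For readers not deep in the FLIP: the existing keys mentioned below ("taskmanager.network.memory.[min/max/fraction]") derive the pool roughly as a fraction of total memory clamped to a min/max range. A simplified sketch of that derivation, not the exact FLIP-49 formula or Flink's internal code:

// Simplified illustration of how a fraction/min/max triple is typically turned
// into a memory pool size; the example values are illustrative defaults.
public final class NetworkMemorySketch {

    static long deriveNetworkMemoryBytes(long totalBytes, double fraction, long minBytes, long maxBytes) {
        long fromFraction = (long) (totalBytes * fraction);
        return Math.max(minBytes, Math.min(maxBytes, fromFraction));
    }

    public static void main(String[] args) {
        long total = 4L << 30; // 4 GiB of total memory, hypothetical
        long network = deriveNetworkMemoryBytes(total, 0.1, 64L << 20, 1L << 30);
        System.out.println("network/shuffle pool: " + (network >> 20) + " MiB");
    }
}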

On Wed, Sep 4, 2019 at 10:56 AM Xintong Song  wrote:

> @till
>
> > Just to clarify Xintong, you suggest that Task off-heap memory represents
> > direct and native memory. Since we don't know how the user will allocate
> > the memory we will add this value to -XX:MaxDirectMemorySize so that the
> > process won't fail if the user allocates only direct memory and no native
> > memory. Is that correct?
> >
> Yes, this is what I mean.
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Sep 4, 2019 at 4:25 PM Till Rohrmann  wrote:
>
> > Just to clarify Xintong, you suggest that Task off-heap memory represents
> > direct and native memory. Since we don't know how the user will allocate
> > the memory we will add this value to -XX:MaxDirectMemorySize so that the
> > process won't fail if the user allocates only direct memory and no native
> > memory. Is that correct?
> >
> > Cheers,
> > Till
> >
> > On Wed, Sep 4, 2019 at 10:18 AM Xintong Song 
> > wrote:
> >
> > > @Stephan
> > > Not sure what do you mean by "just having this value". Are you
> suggesting
> > > that having "taskmanager.memory.network" refers to "shuffle" and "other
> > > network memory", or only "shuffle"?
> > >
> > > I guess what you mean is only "shuffle"? Because currently
> > > "taskmanager.network.memory" refers to shuffle buffers only, which is
> > "one
> > > less config value to break".
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Wed, Sep 4, 2019 at 3:42 PM Stephan Ewen  wrote:
> > >
> > > > If we later split the network memory into "shuffle" and "other
> network
> > > > memory", I think it would make sense to split the option then.
> > > >
> > > > In that case "taskmanager.memory.network" would probably refer to the
> > > total
> > > > network memory, which would most likely be what most users actually
> > > > configure.
> > > > My feeling is that for now just having this value is actually easier,
> > and
> > > > it is one less config value to break (which is also good).
> > > >
> > > > On Wed, Sep 4, 2019 at 9:05 AM Xintong Song 
> > > wrote:
> > > >
> > > > > Thanks for the voting and comments.
> > > > >
> > > > > @Stephan
> > > > > - The '-XX:MaxDirectMemorySize' value should not include JVM
> > Overhead.
> > > > > Thanks for correction.
> > > > > - 'taskmanager.memory.framework.heap' is heap memory reserved for the
> > > > > task executor framework, which cannot be allocated to task slots. I
> > > > > think users should be able to configure both how much total Java heap
> > > > > memory a task executor should have and how much of that total Java
> > > > > heap memory can be allocated to task slots.
> > > > > - Regarding network / shuffle memory, I'm with @Andrey. ATM, this
> > > memory
> > > > > pool (derived from "taskmanager.network.memory.[min/max/fraction]")
> > is
> > > > only
> > > > > used inside NettyShuffleEnvironment as network buffers. There might
> > be
> > > > > other network memory usage outside the shuffle component (as
> > @Zhijiang
> > > > also
> > > > > suggested), but that is not accounted by this memory pool. This is
> > > > exactly
> > > > > why I would suggest to change the name from 'network' to 'shuffle'.
> > > > > - I agree that we need very good documentation to explain the
> memory
> > > > pools
> > > > > and config options, as well as WebUI to present the memory pool
> > sizes.
> > > I
> > > > > would suggest to address these as follow-ups of all the three
> > resource
> > > > > management FLIPs, for better integration.
> > > > >
> > > > > @Andrey
> > > > > - Agree with the 'shuffle' naming. See above.
> > > > >
> > > > > @Till
> > > > > - My understanding is that Task Off-heap memory accounts for both
> > > direct
> > > > > and native memory used by the user code. I'm not sure whether we
> need
> > > > > another config option to split it. Given that we only decided
> (in
> > > the
> > > > > discussion thread) to try it out the way we set
> > > '-XX:MaxDirectMemorySize'
> > > > > in current design and may switch to other alternatives if it
> doesn't
> > > work
> > > > > out well, I would suggest the same for this one. I suggest that we
> > > first
> > > > > try it without the splitting config option, and we can easily add
> 

Re: [DISCUSS] Support JSON functions in Flink SQL

2019-09-04 Thread Xu Forward
Hi Danny Chan, thank you very much for your reply; your feedback helps me
further improve this discussion.
Best,
Forward

Danny Chan wrote on Wed, Sep 4, 2019 at 8:50 PM:

> Thanks Xu Forward for bringing up this topic. I think the JSON functions are
> very useful, especially for MySQL users.
>
> I saw that you have done some work within Apache Calcite; that's a good
> start. But one concern from me: Flink doesn't support a JSON type
> internally, so how to represent a JSON object in Flink may be a key point
> we need to resolve. In Calcite, the ANY type is used to represent JSON,
> but I don't think that is the right way to go; maybe we can have a discussion
> here.
>
> Best,
> Danny Chan
> On Sep 4, 2019, 8:34 PM (+0800), Xu Forward wrote:
> > Hi everybody,
> >
> > I'd like to kick off a discussion on Support JSON functions in Flink SQL.
> >
> > The entire plan is divided into two steps:
> > 1. Implement the SQL 2016-2017 standard JSON functions in Flink SQL [1].
> > 2. Implement JSON functions beyond the SQL 2016-2017 standard in Flink SQL,
> > such as MySQL's JSON_TYPE, JSON_LENGTH, and other very useful JSON functions.
> >
> > Would love to hear your thoughts.
> >
> > [1]
> >
> https://docs.google.com/document/d/1JfaFYIFOAY8P2pFhOYNCQ9RTzwF4l85_bnTvImOLKMk/edit#heading=h.76mb88ca6yjp
> >
> > Best,
> > ForwardXu
>


Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-04 Thread Timo Walther

Hi all,

thanks for the healthy discussion. It is already a very long discussion 
with a lot of text, so I will just post my opinion on a couple of 
statements:


> Hive built-in functions are not part of Flink built-in functions, 
they are catalog functions


That is not entirely true. Correct me if I'm wrong but I think Hive 
built-in functions are also not catalog functions. They are not stored 
in every Hive metastore catalog that is freshly created but are a set of 
functions that are listed somewhere and made available.


> ambiguous functions reference just shouldn't be resolved to a 
different catalog


I agree. They should not be resolved to a different catalog. That's why 
I am suggesting to split the concept of built-in functions and catalog 
lookup semantics.


> I don't know if any other databases handle built-in functions like that

What I called "module" is:
- Extension in Postgres [1]
- Plugin in Presto [2]

Btw. Presto even mentions example modules that are similar to the ones 
that we will introduce in the near future both for ML and System XYZ 
compatibility:
"See either the presto-ml module for machine learning functions or the 
presto-teradata-functions module for Teradata-compatible functions, both 
in the root of the Presto source."


> functions should be either built-in already or just libraries 
functions, and library functions can be adapted to catalog APIs or of 
some other syntax to use


Regarding "built-in already", of course we can add a lot of functions as 
built-ins but we will end-up in a dependency hell in the near future if 
we don't introduce a pluggable approach. Library functions is what you 
also suggest but storing them in a catalog means to always fully qualify 
them or modifying the existing catalog design that was inspired by the 
standard.


I don't think "it brings in even more complicated scenarios to the 
design", it just does clear separation of concerns. Integrating the 
functionality into the current design makes the catalog API more 
complicated.


> why would users name a temporary function the same as a built-in 
function then?


Because you never know what users do. If they don't, my suggested 
resolution order should not be a problem, right?


> I don't think hive functions deserves be a function module

Our goal is not to create a Hive clone. We need to think forward and 
Hive is just one of many systems that we can support. Not every built-in 
function behaves and will behave exactly like Hive.


> regarding temporary functions, there are few systems that support it

IMHO Spark and Hive are not always the best examples for consistent 
design. Systems like Postgres, Presto, or SQL Server should be used as a 
reference. I don't think that a user can overwrite a built-in function 
there.


Regards,
Timo

[1] https://www.postgresql.org/docs/10/extend-extensions.html
[2] https://prestodb.github.io/docs/current/develop/functions.html
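
To make the "module" idea above more concrete (and the SPI-based discovery
Jark mentions in the quoted mail below), here is a minimal sketch of what a
pluggable function module could look like. The interface and its name are
made up for illustration; only FunctionDefinition is an existing Flink
interface.

import java.util.Optional;
import java.util.Set;
import org.apache.flink.table.functions.FunctionDefinition;

// Hypothetical extension point, in the spirit of Postgres extensions / Presto plugins.
public interface FunctionModule {
    String name();                                    // e.g. "core", "hive", "geo", "ml"
    Set<String> listFunctions();                      // names this module contributes
    Optional<FunctionDefinition> getFunction(String name);
}

// Modules could then be discovered via the standard SPI mechanism, e.g.
//   ServiceLoader<FunctionModule> modules = ServiceLoader.load(FunctionModule.class);
// and consulted during function resolution in a user-configured order.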


On 04.09.19 13:44, Jark Wu wrote:

Hi all,

Regarding #1, temp function <> built-in function and naming.
I'm fine with temp functions preceding built-in functions and being able to
override built-in functions (we already support overriding built-in
functions in 1.9).
If we don't allow the same name as a built-in function, I'm afraid we will
have compatibility issues in the future.
Say users register a user defined function named "explode" in 1.9, and we
support a built-in "explode" function in 1.10.
Then the user's jobs which call the registered "explode" function in 1.9
will all fail in 1.10 because of naming conflict.

Regarding #2 "External" built-in functions.
I think if we store external built-in functions in catalog, then
"hive1::sqrt" is a good way to go.
However, I would prefer to support a discovery mechanism (e.g. SPI) for
built-in functions as Timo suggested above.
This gives us the flexibility to add Hive or MySQL or Geo or whatever
function set as built-in functions in an easy way.

Best,
Jark

On Wed, 4 Sep 2019 at 17:47, Xuefu Z  wrote:


Hi David,

Thank you for sharing your findings. It seems to me that there is no SQL
standard regarding temporary functions, and only a few systems support it.
Here is what I have found:

1. Hive: no DB qualifier allowed. Can overwrite built-in.
2. Spark: basically follows Hive (

https://docs.databricks.com/spark/latest/spark-sql/language-manual/create-function.html
)
3. SAP SQL Anywhere Server: can have owner (db?). Not sure of overwriting
behavior. (
http://dcx.sap.com/sqla170/en/html/816bdf316ce210148d3acbebf6d39b18.html)

Because of the lack of a standard, it's perfectly fine for Flink to define
whatever it sees appropriate. Thus, your proposal (no overwriting and must
have a DB as holder) is one option. The advantage is simplicity; the downside
is the deviation from Hive, which is popular and the de facto standard in the
big data world.

However, I don't think we have to follow Hive. More importantly, we need a
consensus. I have no objection if your proposal is generally agreed upon.


Storing offsets in Kafka

2019-09-04 Thread Dominik Wosiński
Hey,
I was wondering whether something has changed for the KafkaConsumer. I am
using Kafka 2.0.0 with Flink and I wanted to use group offsets, but there
seems to be no change in the topic where Kafka stores its offsets; after a
restart Flink falls back to `auto.offset.reset`, so it seems that no offset
commit is happening. The checkpoints are properly configured and I am
able to restore from a savepoint, but the group offsets are not working
properly. Is there anything that has changed in this regard?

Best Regards,
Dom.
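
Not a diagnosis of the problem above, but for reference, a minimal sketch of
the pieces that usually have to line up for group offsets to be committed
back to Kafka when checkpointing is enabled (topic, group id and servers are
placeholders):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class GroupOffsetsSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "my-consumer-group");       // required for committed group offsets

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // offsets are committed when a checkpoint completes

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
        consumer.setStartFromGroupOffsets();          // use committed offsets, else fall back to auto.offset.reset
        consumer.setCommitOffsetsOnCheckpoints(true); // the default, but worth double-checking

        env.addSource(consumer).print();
        env.execute("kafka-group-offsets-sketch");
    }
}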


Re: [DISCUSS] Flink Python User-Defined Function for Table API

2019-09-04 Thread Aljoscha Krettek
Hi,

Things looks interesting so far!

I had one question: Where will most of the support code for this live? Will 
this add the required code to flink-table-common or the different runners? Can 
we implement this in such a way that only a minimal amount of support code is 
required in the parts of the Table API (and Table API runners) that  are not 
python specific?

Best,
Aljoscha

> On 4. Sep 2019, at 14:14, Timo Walther  wrote:
> 
> Hi Jincheng,
> 
> 2. Serializability of functions: "#2 is very convenient for users" means only 
> until they have the first backwards-compatibility issue, after that they will 
> find it not so convenient anymore and will ask why the framework allowed 
> storing such objects in a persistent storage. I don't want to be picky about 
> it, but wanted to raise awareness that sometimes it is ok to limit use cases 
> to guide users towards developing backwards-compatible programs.
> 
> Thanks for the explanation of the remaining items. It sounds reasonable to 
> me. Regarding the example with `getKind()`, I actually meant 
> `org.apache.flink.table.functions.ScalarFunction#getKind` we don't allow 
> users to override this property. And I think we should do something similar 
> for the getLanguage property.
> 
> Thanks,
> Timo
> 
> On 03.09.19 15:01, jincheng sun wrote:
>> Hi Timo,
>> 
>> Thanks for the quick reply ! :)
>> I have added more example for #3 and #5 to the FLIP. That are great
>> suggestions !
>> 
>> Regarding 2:
>> 
>> There are two kinds of serialization in CloudPickle (which is different from
>> Java):
>>  1) For a class or function which can be imported, CloudPickle only
>> serializes the full path of the class or function (just like a Java class
>> name).
>>  2) For a class or function which cannot be imported, CloudPickle will
>> serialize the full content of the class or function.
>> For #2, it means that we cannot just store the full path of the class and
>> function.
>> 
>> The above serialization is recursive.
>> 
>> However, there is indeed a problem of backwards compatibility when the
>> module path of the parent class changes. But I think this is a rare case
>> and acceptable, i.e., in the Flink framework we never change the user
>> interface module path if we want to keep backwards compatibility. For user
>> code, if they change the interface of a UDF's parent, they should re-register
>> their functions.
>> 
>> If we do not want support #2, we can store the full path of class and
>> function, in that case we have no backwards compatibility problem. But I
>> think the #2 is very convenient for users.
>> 
>> What do you think?
>> 
>> Regarding 4:
>> As I mentioned earlier, there may be built-in Python functions and I think
>> language is a "function" concept. Function and Language are orthogonal
>> concepts.
>> We may have R, GO and other language functions in the future, not only
>> user-defined, but also built-in functions.
>> 
>> You are right that users will not set this method and for Python functions,
>> it will be set in the code-generated Java function by the framework. So, I
>> think we should declare the getLanguage() in FunctionDefinition for now.
>> (I'm not quite sure what you mean by saying that getKind() is final in
>> UserDefinedFunction.)
>> 
>> Best,
>> Jincheng
>> 
>> Timo Walther wrote on Tue, Sep 3, 2019 at 6:01 PM:
>> 
>>> Hi Jincheng,
>>> 
>>> thanks for your response.
>>> 
>>> 2. Serializability of functions: Using some arbitrary serialization
>>> format for shipping a function to worker sounds fine to me. But once we
>>> store functions a the catalog we need to think about backwards
>>> compatibility and evolution of interfaces etc. I'm not sure if
>>> CloudPickle is the right long-term storage format for this. If we don't
>>> think about this in advance, we are basically violating our code quality
>>> guide [1] of never use Java Serialization but in the Python-way. We are
>>> using the RPC serialization for persistence.
>>> 
>>> 3. TableEnvironment: Can you add some example to the FLIP? Because API
>>> code like the following is not covered there:
>>> 
>>> self.t_env.register_function("add_one", udf(lambda i: i + 1,
>>> DataTypes.BIGINT(),
>>>  DataTypes.BIGINT()))
>>> self.t_env.register_function("subtract_one", udf(SubtractOne(),
>>> DataTypes.BIGINT(),
>>> DataTypes.BIGINT()))
>>> self.t_env.register_function("add", add)
>>> 
>>> 4. FunctionDefinition: Your response still doesn't answer my question
>>> entirely. Why do we need FunctionDefinition.getLanguage() if this is a
>>> "user-defined function" concept and not a "function" concept. In any
>>> case, all users should not be able to set this method. So it must be
>>> final in UserDefinedFunction similar to getKind().
>>> 
>>> 5. Function characteristics: If UserDefinedFunction is defined in
>>> Python, why is it not used in your example in FLIP-58? Could you
>>> extend the example to show how to specify these attributes in the FLIP?
>>> 
>>> Rega

Re: [DISCUSS] Support JSON functions in Flink SQL

2019-09-04 Thread Danny Chan
Thanks Xu Forward for bringing up this topic. I think the JSON functions are very 
useful, especially for MySQL users.

I saw that you have done some work within Apache Calcite; that's a good start. 
But one concern from me: Flink doesn't support a JSON type internally, so how to 
represent a JSON object in Flink may be a key point we need to resolve. In 
Calcite, the ANY type is used to represent JSON, but I don't think that is the 
right way to go; maybe we can have a discussion here.

Best,
Danny Chan
On Sep 4, 2019, 8:34 PM (+0800), Xu Forward wrote:
> Hi everybody,
>
> I'd like to kick off a discussion on Support JSON functions in Flink SQL.
>
> The entire plan is divided into two steps:
> 1. Implement the SQL 2016-2017 standard JSON functions in Flink SQL [1].
> 2. Implement JSON functions beyond the SQL 2016-2017 standard in Flink SQL,
> such as MySQL's JSON_TYPE, JSON_LENGTH, and other very useful JSON functions.
>
> Would love to hear your thoughts.
>
> [1]
> https://docs.google.com/document/d/1JfaFYIFOAY8P2pFhOYNCQ9RTzwF4l85_bnTvImOLKMk/edit#heading=h.76mb88ca6yjp
>
> Best,
> ForwardXu


[DISCUSS] Support JSON functions in Flink SQL

2019-09-04 Thread Xu Forward
Hi everybody,

I'd like to kick off a discussion on Support JSON functions in Flink SQL.

The entire plan is divided into two steps:
1. Implement the SQL 2016-2017 standard JSON functions in Flink SQL [1].
2. Implement JSON functions beyond the SQL 2016-2017 standard in Flink SQL,
such as MySQL's JSON_TYPE, JSON_LENGTH, and other very useful JSON functions.

Would love to hear your thoughts.

[1]
https://docs.google.com/document/d/1JfaFYIFOAY8P2pFhOYNCQ9RTzwF4l85_bnTvImOLKMk/edit#heading=h.76mb88ca6yjp

Best,
ForwardXu
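
For illustration, a sketch of how the proposed standard JSON functions could
look from a query once implemented. JSON_EXISTS and JSON_VALUE are the
SQL-standard names (as in Calcite); whether and when they exist in Flink is
exactly what this proposal is about, so this is a sketch of the goal, not of
existing functionality. Table and column names are made up.

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

final class JsonFunctionsSketch {
    // 'orders' and its 'payload' column are made-up names.
    static Table example(TableEnvironment tableEnv) {
        return tableEnv.sqlQuery(
            "SELECT "
                + "  JSON_EXISTS(payload, '$.customer.id') AS has_customer, "   // path predicate
                + "  JSON_VALUE(payload, '$.customer.name') AS customer_name "  // scalar extraction
                + "FROM orders");
    }
}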


Re: FLIP-63: Rework table partition support

2019-09-04 Thread Timo Walther

Hi Jingsong,

thanks for your proposal. Could you repost this email with the subject:

"[DISCUSS] FLIP-63: Rework table partition support"

Some people have filters for [DISCUSS] threads and it also makes 
important emails more prominent visually.


Thanks,
Timo

On 04.09.19 09:11, JingsongLee wrote:

Hi everyone,

We would like to start a discussion thread on "FLIP-63: Rework table
partition support" (design doc: [1]), where we describe how to support
partitions in Flink and how to integrate with Hive partitions.

This FLIP addresses:
- Introduce the whole story of partition support.
- Introduce and discuss DDL for partition support.
- Introduce static and dynamic partition insert (see the sketch after this message).
- Introduce partition pruning.
- Introduce the dynamic partition implementation.

Details can be seen in the design document.
Looking forward to your feedback. Thank you.

[1] 
https://docs.google.com/document/d/15R3vZ1R_pAHcvJkRx_CWleXgl08WL3k_ZpnWSdzP7GY/edit?usp=sharing

Best,
Jingsong Lee
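
As referenced in the list above, a rough sketch of static vs. dynamic
partition inserts, borrowed from Hive-style SQL. Table and column names are
made up, and the actual Flink syntax is whatever the FLIP finally specifies.

import org.apache.flink.table.api.TableEnvironment;

final class PartitionInsertSketch {
    // 'logs(line, dt, country)' and 'staging_logs' are made-up table names.
    static void examples(TableEnvironment tEnv) {
        // Static partition insert: all partition values are fixed in the statement.
        tEnv.sqlUpdate(
            "INSERT INTO logs PARTITION (dt='2019-09-04', country='us') "
                + "SELECT line FROM staging_logs");

        // Dynamic partition insert: the remaining partition value comes from each row.
        tEnv.sqlUpdate(
            "INSERT INTO logs PARTITION (dt='2019-09-04') "
                + "SELECT line, country FROM staging_logs");
    }
}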





Re: [DISCUSS] FLIP-53: Fine Grained Resource Management

2019-09-04 Thread Andrey Zagrebin
Thanks for updating the FLIP Xintong. It looks good to me. I would be ok to
start a vote for it.

Best,
Andrey
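
For readers of the summary quoted below, a very rough sketch of the
fraction-based idea (point 4): operators with a known ResourceSpec get a
share proportional to what they requested, while operators with unknown
requirements split the remainder evenly. Method and parameter names are made
up; this is one plausible illustration, not FLIP-53's actual API or policy.

final class FractionSketch {
    static double managedMemoryFraction(
            long requestedBytes,               // < 0 means UNKNOWN
            long totalKnownRequestedBytes,     // sum of known requests in the slot sharing group
            int numUnknownOperatorsInGroup,
            double fractionReservedForKnown) { // derived while generating the stream graph
        if (requestedBytes >= 0) {
            // known request: proportional share of the part reserved for known requests
            return fractionReservedForKnown * requestedBytes / (double) totalKnownRequestedBytes;
        }
        // unknown request: fair share of whatever is left in the slot sharing group
        return (1.0 - fractionReservedForKnown) / numUnknownOperatorsInGroup;
    }
}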

On Wed, Sep 4, 2019 at 10:03 AM Xintong Song  wrote:

> @all
>
> The FLIP document [1] has been updated.
>
> Thank you~
>
> Xintong Song
>
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management
>
> On Tue, Sep 3, 2019 at 7:20 PM Zhu Zhu  wrote:
>
> > Thanks Xintong for the explanation.
> >
> > For question #1, I think it's good as long as DataSet job behavior
> > remains the same.
> >
> > For question #2, agreed that the resource difference is small enough (at
> > most 1 edge diff) in current supported point-wise execution edge
> connection
> > patterns.
> >
> > Thanks,
> > Zhu Zhu
> >
> > Xintong Song wrote on Tue, Sep 3, 2019 at 6:58 PM:
> >
> > >  Thanks for the comments, Zhu & Kurt.
> > >
> > > Andrey and I also had some discussions offline, and I would like to
> first
> > > post a summary of our discussion:
> > >
> > >1. The motivation of the fraction based approach is to unify
> resource
> > >management for both operators with specified and unknown resource
> > >requirements.
> > >2. The fraction based approach proposed in this FLIP should only
> > affect
> > >streaming jobs (both bounded and unbounded). For DataSet jobs, there
> > are
> > >already some fraction based approach (in TaskConfig and
> > ChainedDriver),
> > > and
> > >we do not make any change to the existing approach.
> > >3. The scope of this FLIP does not include discussion of how to set
> > >ResourceSpec for operators.
> > >   1. For blink jobs, the optimizer can set operator resources for
> the
> > >   users, according to their configurations (default: unknown)
> > >   2. For DataStream jobs, there are no method / interface to set
> > >   operator resources at the moment (1.10). We can have in the
> future.
> > >   3. For DataSet jobs, there are existing user interfaces to set
> > >   operator resources.
> > >4. The FLIP should explain more about how ResourceSpecs works
> > >   1. PhysicalTransformations (deployed with operators into the
> > >   StreamTasks) get ResourceSpec: unknown by default or known (e.g.
> > > from the
> > >   Blink planner)
> > >   2. While generating stream graph, calculate fractions and set to
> > >   StreamConfig
> > >   3. While scheduling, convert ResourceSpec to ResourceProfile
> > >   (ResourceSpec + network memory), and deploy to slots / TMs
> matching
> > > the
> > >   resources
> > >   4. While starting Task in TM, each operator gets fraction
> converted
> > >   back to the original absolute value requested by user or fair
> > > unknown share
> > >   of the slot
> > >   5. We should not set `allSourcesInSamePipelinedRegion` to `false`
> > for
> > >DataSet jobs. Behaviors of DataSet jobs should not be changed.
> > >6. The FLIP document should differentiate works planed in this FLIP
> > and
> > >the future follow-ups more clearly, by put the follow-ups in a
> > separate
> > >section
> > >7. Another limitation of the rejected alternative setting fractions
> at
> > >scheduling time is that, the scheduler implementation does not know
> > > which
> > >tasks will be deployed into the same slot in advance.
> > >
> > > Andrey, Please bring it up if there is anything I missed.
> > >
> > > Zhu, regarding your comments:
> > >
> > >1. If we do not set `allSourcesInSamePipelinedRegion` to `false` for
> > >DataSet jobs (point 5 in the discussion summary above), then there
> > >shouldn't be any regression right?
> > >2. I think it makes sense to set the max possible network memory for
> > the
> > >JobVertex. When you say parallel instances of the same JobVertex may
> > >need different network memory, I guess you mean the rescale
> scenarios
> > > where
> > >parallelisms of upstream / downstream vertex cannot be exactly
> divided
> > > by
> > >parallelism of downstream / upstream vertex? I would say it's
> > > acceptable to
> > >have slight difference between actually needed and allocated network
> > > memory.
> > >3. Yes, by numOpsUseOnHeapManagedMemory I mean
> > >numOpsUseOnHeapManagedMemoryInTheSameSharedGroup. I'll update the
> doc.
> > >4. Yes, it should be StreamingJobGraphGenerator. Thanks for the
> > >correction.
> > >
> > >
> > > Kurt, regarding your comments:
> > >
> > >1. I think we don't have network memory in ResourceSpec, which is
> the
> > >user facing API. We only have network memory in ResourceProfile,
> which
> > > is
> > >used internally for scheduling. The reason we do not expose network
> > > memory
> > >to the user is that, currently how many network buffers each task
> > needs
> > > is
> > >decided by the topology of execution graph (how many input / output
> > >channels it has).
> > >2. In the section "Operato

Re: [DISCUSS] Contribute Pulsar Flink connector back to Flink

2019-09-04 Thread Becket Qin
Hi Yijie,

Thanks for the interest in contributing the Pulsar connector.

In general, I think having Pulsar connector with strong support is a
valuable addition to Flink. So I am happy to shepherd this effort.
Meanwhile, I would also like to provide some context and recent efforts on
the Flink connectors ecosystem.

The current way Flink maintains its connectors has hit a scalability barrier.
With more and more connectors coming into Flink repo, we are facing a few
problems such as long build and testing time. To address this problem, we
have attempted to do the following:
1. Split out the connectors into a separate repository. This is temporarily
on hold due to potential solution to shorten the build time.
2. Encourage the connectors to stay as ecosystem projects while Flink tries
to provide good support for functionality and compatibility tests. Robert
has driven to create a Flink Ecosystem project website and it is going
through some final approval process.

Given the above efforts, it would be great to first see if we can have
Pulsar connector as an ecosystem project with great support. It would be
good to hear how the Flink Pulsar connector is tested currently to see if
we can learn something to maintain it as an ecosystem project with good
quality and test coverage. If the quality as an ecosystem project is hard
to guarantee, we may as well adopt it into the main repo.

BTW, another ongoing effort is FLIP-27 where we are making changes to the
Flink source connector architecture and interface. This change will likely
land in 1.10. Therefore timing wise, if we are going to have the Pulsar
connector in main repo, I am wondering if we should hold a little bit and
let the Pulsar connector adapt to the new interface to avoid shortly
deprecated work?

Thanks,

Jiangjie (Becket) Qin

On Wed, Sep 4, 2019 at 4:32 PM Chesnay Schepler  wrote:

> I'm quite worried that we may end up repeating history.
>
> There were already 2 attempts at contributing a pulsar connector, both
> of which failed because no committer was getting involved, despite the
> contributor opening a dedicated discussion thread about the contribution
> beforehand and getting several +1's from committers.
>
> We should really make sure that if we welcome/approve such a
> contribution it will actually get the attention it deserves.
>
> As such, I'm inclined to recommend maintaining the connector outside of
> Flink. We could link to it from the documentation to give it more exposure.
> With the upcoming page for sharing artifacts among the community (what's
> the state of that anyway?), this may be a better option.
>
> On 04/09/2019 10:16, Till Rohrmann wrote:
> > Hi everyone,
> >
> > thanks a lot for starting this discussion Yijie. I think the Pulsar
> > connector would be a very valuable addition since Pulsar becomes more and
> > more popular and it would further expand Flink's interoperability. Also
> > from a project perspective it makes sense for me to place the connector
> in
> > the downstream project.
> >
> > My main concern/question is how can the Flink community maintain the
> > connector? We have seen in the past that connectors are some of the most
> > actively developed components because they need to be kept in sync with
> the
> > external system and with Flink. Given that the Pulsar community is
> willing
> > to help with maintaining, improving and evolving the connector, I'm
> > optimistic that we can achieve this. Hence, +1 for contributing it back
> to
> > Flink.
> >
> > Cheers,
> > Till
> >
> >
> >
> > On Wed, Sep 4, 2019 at 2:03 AM Sijie Guo  wrote:
> >
> >> Hi Yun,
> >>
> >> Since I was the main driver behind FLINK-9641 and FLINK-9168, let me
> try to
> >> add more context on this.
> >>
> >> FLINK-9641 and FLINK-9168 was created for bringing Pulsar as source and
> >> sink for Flink. The integration was done with Flink 1.6.0. We sent out
> pull
> >> requests about a year ago and we ended up maintaining those connectors
> in
> >> Pulsar for Pulsar users to use Flink to process event streams in Pulsar.
> >> (See https://github.com/apache/pulsar/tree/master/pulsar-flink). The
> Flink
> >> 1.6 integration is pretty simple and there is no schema considerations.
> >>
> >> In the past year, we have made a lot of changes in Pulsar and brought
> >> Pulsar schema as the first-class citizen in Pulsar. We also integrated
> with
> >> other computing engines for processing Pulsar event streams with Pulsar
> >> schema.
> >>
> >> It led us to rethink how to integrate with Flink in the best way. Then
> we
> >> reimplement the pulsar-flink connectors from the ground up with schema
> and
> >> bring table API and catalog API as the first-class citizen in the
> >> integration. With that being said, in the new pulsar-flink
> implementation,
> >> you can register pulsar as a flink catalog and query / process the event
> >> streams using Flink SQL.
> >>
> >> This is an example about how to use Pulsar as a Flink catalog:
> >>
> >>
> https://github.com/st

[jira] [Created] (FLINK-13959) Consolidate DetachedEnvironment and ContextEnvironment

2019-09-04 Thread Kostas Kloudas (Jira)
Kostas Kloudas created FLINK-13959:
--

 Summary: Consolidate DetachedEnvironment and ContextEnvironment
 Key: FLINK-13959
 URL: https://issues.apache.org/jira/browse/FLINK-13959
 Project: Flink
  Issue Type: Sub-task
  Components: Client / Job Submission
Affects Versions: 1.9.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas


The code for the two environments is similar, so with a bit of work we can get 
rid of the {{DetachedEnvironment}} and maintain only the {{ContextEnvironment}}.





Re: [DISCUSS] Flink Python User-Defined Function for Table API

2019-09-04 Thread Timo Walther

Hi Jincheng,

2. Serializability of functions: "#2 is very convenient for users" means 
only until they have the first backwards-compatibility issue, after that 
they will find it not so convenient anymore and will ask why the 
framework allowed storing such objects in a persistent storage. I don't 
want to be picky about it, but wanted to raise awareness that sometimes 
it is ok to limit use cases to guide users towards developing 
backwards-compatible programs.


Thanks for the explanation of the remaining items. It sounds reasonable 
to me. Regarding the example with `getKind()`, I actually meant 
`org.apache.flink.table.functions.ScalarFunction#getKind` we don't allow 
users to override this property. And I think we should do something 
similar for the getLanguage property.


Thanks,
Timo
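
To make the getKind()/getLanguage analogy above concrete: in Flink,
ScalarFunction#getKind is final so users cannot override it, and the
suggestion is to pin down a language property the same way. The sketch below
shows that pattern; FunctionLanguage and getLanguage() are assumptions from
this discussion, not existing Flink API.

import org.apache.flink.table.functions.FunctionKind;
import org.apache.flink.table.functions.UserDefinedFunction;

// Sketch of the pattern under discussion (class name chosen to avoid clashing
// with Flink's real ScalarFunction).
abstract class ScalarFunctionSketch extends UserDefinedFunction {

    enum FunctionLanguage { JVM, PYTHON } // hypothetical

    // Existing Flink pattern: the kind is fixed by the framework, not by users.
    public final FunctionKind getKind() {
        return FunctionKind.SCALAR;
    }

    // Analogous idea for the language property (hypothetical); the framework,
    // not the user, would determine this per function.
    public final FunctionLanguage getLanguage() {
        return FunctionLanguage.JVM;
    }
}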

On 03.09.19 15:01, jincheng sun wrote:

Hi Timo,

Thanks for the quick reply ! :)
I have added more example for #3 and #5 to the FLIP. That are great
suggestions !

Regarding 2:

There are two kinds of serialization in CloudPickle (which is different from
Java):
  1) For a class or function which can be imported, CloudPickle only
serializes the full path of the class or function (just like a Java class
name).
  2) For a class or function which cannot be imported, CloudPickle will
serialize the full content of the class or function.
For #2, it means that we cannot just store the full path of the class and
function.

The above serialization is recursive.

However, there is indeed a problem of backwards compatibility when the
module path of the parent class changes. But I think this is a rare case
and acceptable, i.e., in the Flink framework we never change the user
interface module path if we want to keep backwards compatibility. For user
code, if they change the interface of a UDF's parent, they should re-register
their functions.

If we do not want support #2, we can store the full path of class and
function, in that case we have no backwards compatibility problem. But I
think the #2 is very convenient for users.

What do you think?

Regarding 4:
As I mentioned earlier, there may be built-in Python functions and I think
language is a "function" concept. Function and Language are orthogonal
concepts.
We may have R, GO and other language functions in the future, not only
user-defined, but also built-in functions.

You are right that users will not set this method and for Python functions,
it will be set in the code-generated Java function by the framework. So, I
think we should declare the getLanguage() in FunctionDefinition for now.
(I'm not quite sure what you mean by saying that getKind() is final in
UserDefinedFunction.)

Best,
Jincheng

Timo Walther wrote on Tue, Sep 3, 2019 at 6:01 PM:


Hi Jincheng,

thanks for your response.

2. Serializability of functions: Using some arbitrary serialization
format for shipping a function to worker sounds fine to me. But once we
store functions in the catalog we need to think about backwards
compatibility and evolution of interfaces etc. I'm not sure if
CloudPickle is the right long-term storage format for this. If we don't
think about this in advance, we are basically violating our code quality
guide [1] of never using Java serialization, just in a Python way: we would
be using the RPC serialization for persistence.

3. TableEnvironment: Can you add some example to the FLIP? Because API
code like the following is not covered there:

self.t_env.register_function("add_one", udf(lambda i: i + 1,
DataTypes.BIGINT(),
  DataTypes.BIGINT()))
self.t_env.register_function("subtract_one", udf(SubtractOne(),
DataTypes.BIGINT(),
DataTypes.BIGINT()))
self.t_env.register_function("add", add)

4. FunctionDefinition: Your response still doesn't answer my question
entirely. Why do we need FunctionDefinition.getLanguage() if this is a
"user-defined function" concept and not a "function" concept. In any
case, all users should not be able to set this method. So it must be
final in UserDefinedFunction similar to getKind().

5. Function characteristics: If UserDefinedFunction is defined in
Python, why is it not used in your example in FLIP-58? Could you
extend the example to show how to specify these attributes in the FLIP?

Regards,
Timo

[1] https://flink.apache.org/contributing/code-style-and-quality-java.html

On 02.09.19 15:35, jincheng sun wrote:

Hi Timo,

Great thanks for your feedback. I would like to share my thoughts with

you

inline. :)

Best,
Jincheng

Timo Walther wrote on Mon, Sep 2, 2019 at 5:04 PM:


Hi all,

the FLIP looks awesome. However, I would like to discuss the changes to
the user-facing parts again. Some feedback:

1. DataViews: With the current non-annotation design for DataViews, we
cannot perform eager state declaration, right? At which point during
execution do we know which state is required by the function? We need to
instantiate the function first, right?


We will analyze the Python AggregateFunction and extract the DataViews

used in the Python Aggregat

Re: Fine grained batch recovery vs. native libraries

2019-09-04 Thread David Morávek
Hi Chesnay, I've created FLINK-13958
(https://issues.apache.org/jira/browse/FLINK-13958) to track the issue.

Thanks,
D.

On Wed, Sep 4, 2019 at 1:56 PM Chesnay Schepler  wrote:

> This sounds like a serious bug, please open a JIRA ticket.
>
> On 04/09/2019 13:41, David Morávek wrote:
> > Hi,
> >
> > we're testing the newly released batch recovery and are running into
> class
> > loading related issues.
> >
> > 1) We have a per-job flink cluster
> > 2) We use BATCH execution mode + region failover strategy
> >
> > Point 1) should imply single user code class loader per task manager
> > (because there is only single pipeline, that reuses class loader cached
> in
> > BlobLibraryCacheManager). We need this property, because we have UDFs
> that
> > access C libraries using JNI (I think this may be fairly common use-case
> > when dealing with legacy code). JDK internals
> > <
> https://github.com/openjdk/jdk/blob/master/src/java.base/share/classes/java/lang/ClassLoader.java#L2466
> >
> > make sure that single library can be only loaded by a single class loader
> > per JVM.
> >
> > When region recovery is triggered, vertices that need recovery are first
> > reset back to the CREATED state and then rescheduled. In case all tasks in a
> > task manager are reset, this results in cached class loader being
> released
> > <
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java#L338
> >.
> > This unfortunately causes job failure, because we try to reload a native
> > library in a newly created class loader.
> >
> > I know that there is always possibility to distribute native libraries
> with
> > flink's libs and load it using system class loader, but this introduces a
> > build & operations overhead and just makes it really unfriendly for the
> cluster
> > user, so I'd rather not work around the issue this way (per-job cluster
> > should be more user friendly).
> >
> > I believe the correct approach would be not to release cached class
> loader
> > if the job is recovering, even though there are no tasks currently
> > registered with TM.
> >
> > What do you think? Thanks for help.
> >
> > D.
> >
>
>


[jira] [Created] (FLINK-13958) Job class loader may not be reused after batch job recovery

2019-09-04 Thread David Moravek (Jira)
David Moravek created FLINK-13958:
-

 Summary: Job class loader may not be reused after batch job 
recovery
 Key: FLINK-13958
 URL: https://issues.apache.org/jira/browse/FLINK-13958
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.9.0
Reporter: David Moravek


https://lists.apache.org/thread.html/e241be9a1a10810a1203786dff3b7386d265fbe8702815a77bad42eb@%3Cdev.flink.apache.org%3E

1) We have a per-job flink cluster
2) We use BATCH execution mode + region failover strategy

Point 1) should imply a single user code class loader per task manager (because 
there is only a single pipeline, which reuses the class loader cached in 
BlobLibraryCacheManager). We need this property because we have UDFs that 
access C libraries using JNI (I think this may be a fairly common use case when 
dealing with legacy code). [JDK 
internals|https://github.com/openjdk/jdk/blob/master/src/java.base/share/classes/java/lang/ClassLoader.java#L2466]
 ensure that a native library can only be loaded by a single class loader per 
JVM.

When region recovery is triggered, vertices that need recovery are first reset 
back to the CREATED state and then rescheduled. In case all tasks in a task 
manager are reset, this results in [cached class loader being 
released|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java#L338].
 This unfortunately causes job failure, because we try to reload a native 
library in a newly created class loader.

I believe the correct approach would be not to release the cached class loader if 
the job is recovering, even though there are no tasks currently registered with 
the TM.





[jira] [Created] (FLINK-13957) Redact passwords from dynamic properties on job submission

2019-09-04 Thread Matyas Orhidi (Jira)
Matyas Orhidi created FLINK-13957:
-

 Summary: Redact passwords from dynamic properties on job submission
 Key: FLINK-13957
 URL: https://issues.apache.org/jira/browse/FLINK-13957
 Project: Flink
  Issue Type: Improvement
  Components: Client / Job Submission
Affects Versions: 1.9.0
Reporter: Matyas Orhidi
 Fix For: 1.9.1


SSL related passwords specified by dynamic properties are showing up in 
{{FlinkYarnSessionCli}} logs in plain text:

{{19/09/04 04:57:43 INFO cli.FlinkYarnSessionCli: Dynamic Property set: 
security.ssl.internal.truststore-password=changeit}}
{{19/09/04 04:57:43 INFO cli.FlinkYarnSessionCli: Dynamic Property set: 
security.ssl.internal.keystore-password=changeit}}
{{19/09/04 04:57:43 INFO cli.FlinkYarnSessionCli: Dynamic Property set: 
security.ssl.internal.key-password=changeit}}





Re: Fine grained batch recovery vs. native libraries

2019-09-04 Thread Chesnay Schepler

This sounds like a serious bug, please open a JIRA ticket.

On 04/09/2019 13:41, David Morávek wrote:

Hi,

we're testing the newly released batch recovery and are running into class
loading related issues.

1) We have a per-job flink cluster
2) We use BATCH execution mode + region failover strategy

Point 1) should imply a single user code class loader per task manager
(because there is only a single pipeline, which reuses the class loader cached in
BlobLibraryCacheManager). We need this property because we have UDFs that
access C libraries using JNI (I think this may be a fairly common use case
when dealing with legacy code). JDK internals ensure that a native library
can only be loaded by a single class loader per JVM.

When region recovery is triggered, vertices that need recovery are first
reset back to the CREATED state and then rescheduled. In case all tasks in a
task manager are reset, this results in the cached class loader being released.
This unfortunately causes job failure, because we try to reload a native
library in a newly created class loader.

I know that there is always the possibility to distribute native libraries with
Flink's libs and load them using the system class loader, but this introduces a
build & operations overhead and just makes it really unfriendly for the cluster
user, so I'd rather not work around the issue this way (a per-job cluster
should be more user friendly).

I believe the correct approach would be not to release the cached class loader
if the job is recovering, even though there are no tasks currently
registered with the TM.

What do you think? Thanks for help.

D.





Re: Potential block size issue with S3 binary files

2019-09-04 Thread Arvid Heise
Hi Ken,

as far as I understood, you are using the format to overcome some
shortcomings in Flink. There is no need to even look at the data or even to
create it if the join would work decently.

If so, then it would make sense to keep the format, as I expect similar
issues to always appear and providing at least some kind of a workaround
will probably help the user. At that point, I'm leaning towards embedding
the sync markers.

However, I'd take larger gaps than 2k and a larger sync pattern. Even for
your use case, that would mean we could barely fit 2 records between markers.
I'd probably also add another format instead of extending the existing one. I
see them as different concepts: the old one assumes a fixed block size,
while the new one doesn't care about it. I opened a ticket for that:
https://issues.apache.org/jira/browse/FLINK-13956

For the time being, I'd recommend just setting a fixed block size. If
it's on S3, the block size doesn't really matter much and you can pretty
much tune it to the needed degree of parallelism.


Additionally, it would be interesting to see if we can actually get that
join going. Could you please describe the join and its performance in
another thread in the user list u...@flink.apache.org ?


Best,

Arvid
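
A small sketch of the periodic sync marker idea (option 6 from the earlier
mail, also the proposal in FLINK-13956): emit a fixed byte pattern whenever at
least a given number of bytes has been written since the last marker, so a
reader can re-synchronize after a seek without knowing the file system block
size. Constants and signatures below are placeholders, not a Flink API.

import java.io.DataOutputStream;
import java.io.IOException;

final class SyncMarkerWriterSketch {
    static final byte[] SYNC_MARKER = new byte[20]; // in practice a random-but-fixed 20-byte pattern
    static final long SYNC_INTERVAL = 1L << 20;     // 1 MiB between markers, as proposed

    private long bytesSinceLastSync = 0;

    void writeRecord(DataOutputStream out, byte[] serializedRecord) throws IOException {
        if (bytesSinceLastSync >= SYNC_INTERVAL) {
            out.write(SYNC_MARKER); // readers scan for this pattern to re-align after a seek
            bytesSinceLastSync = 0;
        }
        out.writeInt(serializedRecord.length);
        out.write(serializedRecord);
        bytesSinceLastSync += 4 + serializedRecord.length;
    }
}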



On Tue, Sep 3, 2019 at 8:17 PM Ken Krugler 
wrote:

> Hi Arvid,
>
> Thanks for following up…
>
> On Sep 2, 2019, at 3:09 AM, Arvid Heise  wrote:
>
> Hi Ken,
>
> that's indeed a very odd issue that you found. I had a hard time to connect
> block size with S3 in the beginning and had to dig into the code. I still
> cannot fully understand why you got two different block size values from
> the S3 FileSytem. Looking into Hadoop code, I found the following snippet
>
> public long getDefaultBlockSize() {
>return this.getConf().getLong("fs.s3.block.size", 67108864L);
> }
>
> I don't see a quick fix for that. Looks like mismatching configurations on
> different machines. We should probably have some sanity checks to detect
> mismatching block header information, but unfortunately, the block header
> is very primitive and doesn't allow for sophisticated checks.
>
>
> Yes - and what made it harder to debug is that when the incorrect block
> size was set to 32MB, sometimes the first split that got processed was
> split[1] (second actual split). In that situation, the block info record
> was where the code expected it to be (since it was reading from 64MB -
> record size), so it all looked OK, but then the first record processed
> would be at an incorrect position.
>
> So let's focus on implementation solutions:
> 1. I gather that you need to have support for data that uses
> IOReadableWritable. So moving to more versatile solutions like Avro or
> Parquet is unfortunately not an option. I'd still recommend that for any
> new project.
>
>
> See below - it’s not a requirement, but certainly easier.
>
> 2. Storing the block size into the repeated headers in a file introduces a kind
> of chicken-and-egg problem. You need the block size to read the header to get
> the block size.
> 3. Storing block size once in first block would require additional seeks
> and depending of the degree of parallelism would put a rather high load on
> the data node with the first block.
> 4. Storing block size in metadata would be ideal but with the wide range of
> possible filesystems most likely not doable.
> 5. Explicitly setting the block size would be the most reliable technique
> but quite user-unfriendly, especially, if multiple deployment environment
> use different block sizes.
> 6. Adding a periodic marker seems indeed as the most robust option and
> adding 20 bytes every 2k bytes doesn't seem too bad for me. The downside is
> that seeking can take a long time for larger records as it will linearly
> scan through the bytes at the block start. However, if you really want to
> support copying files across file systems with different block sizes, this
> would be the only option.
> 7. Deprecating sequence format is a good option in the long run. I simply
> don't see that for productive code the performance gain over using Avro or
> Parquet would be noticeable and getting a solid base concept for schema
> evolution will pay off quickly from my experience.
>
> @Ken, could you please describe for what kind of data do you use the
> sequence format? I like to understand your requirements. How large are your
> records (OoM)? Are they POJOs? Do you craft them manually?
>
>
> They are hand-crafted POJOs, typically about 1.2K/record.
>
> It’s a mapping from words to feature vectors (and some additional data).
>
> I then use them as backing store with a cache (in a downstream job) as
> side-input to a map function that creates word vectors from large
> collections of text. This is why the serialized format was appealing, as
> it’s then relatively straightforward to use the existing deserialization
> logic when reading from my custom Java code.
>
> So yes, I could switch to Parquet with some additional work, I’ve

[jira] [Created] (FLINK-13956) Add sequence file format with repeated sync blocks

2019-09-04 Thread Arvid Heise (Jira)
Arvid Heise created FLINK-13956:
---

 Summary: Add sequence file format with repeated sync blocks
 Key: FLINK-13956
 URL: https://issues.apache.org/jira/browse/FLINK-13956
 Project: Flink
  Issue Type: Improvement
Reporter: Arvid Heise


The current {{SequenceFileFormat}} produces files that are tightly bound to the 
block size of the filesystem. While this was a somewhat plausible assumption in 
the old HDFS days, it can lead to [hard to debug issues in other file 
systems|https://lists.apache.org/thread.html/bdd87cbb5eb7b19ab4be6501940ec5659e8f6ce6c27ccefa2680732c@%3Cdev.flink.apache.org%3E].

We could implement a file format similar to the current version of Hadoop's 
SequenceFileFormat: add a sync block in between records whenever X bytes were 
written. Hadoop uses 2k, but I'd propose to use 1M.





Re: [VOTE] FLIP-62: Set default restart delay for FixedDelay- and FailureRateRestartStrategy to 1s

2019-09-04 Thread Jark Wu
+1

Best,
Jark

> On Sep 4, 2019, at 19:43, David Morávek wrote:
> 
> +1
> 
> On Wed, Sep 4, 2019 at 1:38 PM Till Rohrmann  wrote:
> 
>> +1 (binding)
>> 
>> On Wed, Sep 4, 2019 at 12:43 PM Chesnay Schepler 
>> wrote:
>> 
>>> +1 (binding)
>>> 
>>> On 04/09/2019 11:18, JingsongLee wrote:
 +1 (non-binding)
 
 default 0 is really not user production friendly.
 
 Best,
 Jingsong Lee
 
 
 --
 From:Zhu Zhu 
 Send Time: Wed, Sep 4, 2019, 17:13
 To:dev 
 Subject:Re: [VOTE] FLIP-62: Set default restart delay for FixedDelay-
>>> and FailureRateRestartStrategy to 1s
 
 +1 (non-binding)
 
 Thanks,
 Zhu Zhu
 
 Till Rohrmann wrote on Wed, Sep 4, 2019 at 5:06 PM:
 
> Hi everyone,
> 
> I would like to start the voting process for FLIP-62 [1], which
> is discussed and reached consensus in this thread [2].
> 
> Since the change is rather small I'd like to shorten the voting period
>>> to
> 48 hours. Hence, I'll try to close it September 6th, 11:00 am CET,
>>> unless
> there is an objection or not enough votes.
> 
> [1]
> 
> 
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-62%3A+Set+default+restart+delay+for+FixedDelay-+and+FailureRateRestartStrategy+to+1s
> [2]
> 
> 
>>> 
>> https://lists.apache.org/thread.html/9602b342602a0181fcb618581f3b12e692ed2fad98c59fd6c1caeabd@%3Cdev.flink.apache.org%3E
> 
> Cheers,
> Till
> 
>>> 
>>> 
>> 



Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-04 Thread Jark Wu
Hi all,

Regarding #1, temp function <> built-in function and naming.
I'm fine with temp functions preceding built-in functions and being able to
override built-in functions (we already support overriding built-in
functions in 1.9).
If we don't allow the same name as a built-in function, I'm afraid we will
have compatibility issues in the future.
Say users register a user defined function named "explode" in 1.9, and we
support a built-in "explode" function in 1.10.
Then the user's jobs which call the registered "explode" function in 1.9
will all fail in 1.10 because of naming conflict.

Regarding #2 "External" built-in functions.
I think if we store external built-in functions in catalog, then
"hive1::sqrt" is a good way to go.
However, I would prefer to support a discovery mechanism (e.g. SPI) for
built-in functions as Timo suggested above.
This gives us the flexibility to add Hive or MySQL or Geo or whatever
function set as built-in functions in an easy way.

Best,
Jark
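
For concreteness, a minimal sketch of the resolution order being discussed in
this thread (temporary functions first, then built-in functions including any
discovered "external" sets, catalog functions last). The class and method
names are made up for illustration, not FLIP-57's final API.

import java.util.Map;
import java.util.Optional;

// Illustrative only.
final class FunctionResolutionSketch {
    Map<String, Object> temporaryFunctions; // name -> definition (types simplified)
    Map<String, Object> builtinFunctions;   // core + discovered "external" built-in sets
    Map<String, Object> catalogFunctions;   // functions of the current catalog/database

    Optional<Object> resolveAmbiguousName(String name) {
        if (temporaryFunctions.containsKey(name)) {
            return Optional.of(temporaryFunctions.get(name)); // 1. temporary functions (may shadow built-ins)
        }
        if (builtinFunctions.containsKey(name)) {
            return Optional.of(builtinFunctions.get(name));   // 2. built-in functions
        }
        return Optional.ofNullable(catalogFunctions.get(name)); // 3. catalog functions
    }
}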

On Wed, 4 Sep 2019 at 17:47, Xuefu Z  wrote:

> Hi David,
>
> Thank you for sharing your findings. It seems to me that there is no SQL
> standard regarding temporary functions, and only a few systems support it.
> Here is what I have found:
>
> 1. Hive: no DB qualifier allowed. Can overwrite built-in.
> 2. Spark: basically follows Hive (
>
> https://docs.databricks.com/spark/latest/spark-sql/language-manual/create-function.html
> )
> 3. SAP SQL Anywhere Server: can have owner (db?). Not sure of overwriting
> behavior. (
> http://dcx.sap.com/sqla170/en/html/816bdf316ce210148d3acbebf6d39b18.html)
>
> Because of the lack of a standard, it's perfectly fine for Flink to define
> whatever it sees appropriate. Thus, your proposal (no overwriting and must
> have a DB as holder) is one option. The advantage is simplicity; the downside
> is the deviation from Hive, which is popular and the de facto standard in the
> big data world.
>
> However, I don't think we have to follow Hive. More importantly, we need a
> consensus. I have no objection if your proposal is generally agreed upon.
>
> Thanks,
> Xuefu
>
> On Tue, Sep 3, 2019 at 11:58 PM Dawid Wysakowicz 
> wrote:
>
> > Hi all,
> >
> > Just an opinion on the built-in <> temporary functions resolution and
> > NAMING issue. I think we should not allow overriding the built-in
> > functions, as this may pose serious issues and to be honest is rather
> > not feasible and would require major rework. What happens if a user
> > wants to override CAST? Calls to that function are generated at
> > different layers of the stack that unfortunately does not always go
> > through the Catalog API (at least yet). Moreover from what I've checked
> > no other systems allow overriding the built-in functions. All the
> > systems I've checked so far register temporary functions in a
> > database/schema (either special database for temporary functions, or
> > just current database). What I would suggest is to always register
> > temporary functions with a 3 part identifier. The same way as tables,
> > views etc. This effectively means you cannot override built-in
> > functions. With such approach it is natural that the temporary functions
> > end up a step lower in the resolution order:
> >
> > 1. built-in functions (1 part, maybe 2? - this is still under discussion)
> >
> > 2. temporary functions (always 3 part path)
> >
> > 3. catalog functions (always 3 part path)
> >
> > Let me know what do you think.
> >
> > Best,
> >
> > Dawid
> >
> > On 04/09/2019 06:13, Bowen Li wrote:
> > > Hi,
> > >
> > > I agree with Xuefu that the main controversial points are mainly the
> two
> > > places. My thoughts on them:
> > >
> > > 1) Determinism of referencing Hive built-in functions. We can either
> > remove
> > > Hive built-in functions from ambiguous function resolution and require
> > > users to use special syntax for their qualified names, or add a config
> > flag
> > > to catalog constructor/yaml for turning on and off Hive built-in
> > functions
> > > with the flag set to 'false' by default and proper doc added to help
> > users
> > > make their decisions.
> > >
> > > 2) Flink temp functions v.s. Flink built-in functions in ambiguous
> > function
> > > resolution order. We believe Flink temp functions should precede Flink
> > > built-in functions, and I have presented my reasons. Just in case if we
> > > cannot reach an agreement, I propose forbid users registering temp
> > > functions in the same name as a built-in function, like MySQL's
> approach,
> > > for the moment. It won't have any performance concern, since built-in
> > > functions are all in memory and thus cost of a name check will be
> really
> > > trivial.
> > >
> > >
> > > On Tue, Sep 3, 2019 at 8:01 PM Xuefu Z  wrote:
> > >
> > >> From what I have seen, there are a couple of focal disagreements:
> > >>
> > >> 1. Resolution order: temp function --> flink built-in function -->
> > catalog
> > >> function vs flink built-in function --> temp function -> catalog
> > function.
> > >>

Re: [DISCUSS] Reducing build times

2019-09-04 Thread Chesnay Schepler
e2e tests on Travis add another 4-5 hours, but we never optimized these 
to make use of the cached Flink artifact.


On 04/09/2019 13:26, Till Rohrmann wrote:

How long do we need to run all e2e tests? They are not included in the 3,5
hours I assume.

Cheers,
Till

On Wed, Sep 4, 2019 at 11:59 AM Robert Metzger  wrote:


Yes, we can ensure the same (or better) experience for contributors.

On the powerful machines, builds finish in 1.5 hours (without any caching
enabled).

Azure Pipelines offers 10 concurrent builds with a timeout of 6 hours for a
build for open source projects. Flink needs 3.5 hours on that infra (not
parallelized at all, no caching). These free machines are very similar to
those of Travis, so I expect no build time regressions, if we set it up
similarly.


On Wed, Sep 4, 2019 at 9:19 AM Chesnay Schepler 
wrote:


Will using more powerful machines for the project make it more difficult to
ensure that contributor builds are still running in a reasonable time?

As an example of this happening on Travis, contributors currently cannot
run all e2e tests since they timeout, but on apache we have a larger
timeout.

On 03/09/2019 18:57, Robert Metzger wrote:

Hi all,

I wanted to give a short update on this:
- Arvid, Aljoscha and I have started working on a Gradle PoC, currently
working on making all modules compile and test with Gradle. We've also
identified some problematic areas (shading being the most obvious one)
which we will analyse as part of the PoC.
The goal is to see how much Gradle helps to parallelise our build, and to
avoid duplicate work (incremental builds).

- I am working on setting up a Flink testing infrastructure based on Azure
Pipelines, using more powerful hardware. Alibaba kindly provided me with
two 32 core machines (temporarily), and another company reached out to me
privately, looking into options for cheap, fast machines :)
If nobody in the community disagrees, I am going to set up Azure Pipelines
with our apache/flink GitHub as a build infrastructure that exists next to
Flinkbot and flink-ci. I would like to make sure that Azure Pipelines is
equally or even more reliable than Travis, and I want to see what the
required maintenance work is.
On top of that, Azure Pipelines is a very feature-rich tool with a lot of
nice options for us to improve the build experience (statistics about tests
(flaky tests etc.), nice docker support, plenty of free build resources for
open source projects, ...)

Best,
Robert





On Mon, Aug 19, 2019 at 5:12 PM Robert Metzger wrote:

Hi all,

I have summarized all arguments mentioned so far + some additional
research into a Wiki page here:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=125309279

I'm happy to hear further comments on my summary! I'm pretty sure we can
find more pro's and con's for the different options.

My opinion after looking at the options:

 - Flink relies on an outdated build tool (Maven), while a good
 alternative is well-established (gradle), and will likely provide a much
 better CI and local build experience through incremental build and cached
 intermediates.
 Scripting around Maven, or splitting modules / test execution /
 repositories won't solve this problem. We should rather spend the effort in
 migrating to a modern build tool which will provide us benefits in the long
 run.
 - Flink relies on a fairly slow build service (Travis CI), while
 simply putting more money onto the problem could cut the build time at
 least in half.
 We should consider using a build service that provides bigger machines
 to solve our build time problem.

My opinion is based on many assumptions (gradle is actually as fast as
promised (haven't used it before), we can build Flink with gradle, we find
sponsors for bigger build machines) that we need to test first through PoCs.

Best,
Robert




On Mon, Aug 19, 2019 at 10:26 AM Aljoscha Krettek <aljos...@apache.org> wrote:


I did a quick test: a normal "mvn clean install -DskipTests
-Drat.skip=true -Dmaven.javadoc.skip=true -Punsafe-mapr-repo” on my machine
takes about 14 minutes. After removing all mentions of maven-shade-plugin
the build time goes down to roughly 11.5 minutes. (Obviously the resulting
Flink won’t work, because some expected stuff is not packaged and most of
the end-to-end tests use the shade plugin to package the jars for testing.)

Aljoscha


On 18. Aug 2019, at 19:52, Robert Metzger  wrote:

Hi all,

I wanted to understand the impact of the hardware we are using for running
our tests. Each travis worker has 2 virtual cores, and 7.5 gb memory [1].
They are using Google Cloud Compute Engine *n1-standard-2* instances.
Running a full "mvn clean verify" takes *03:32 h* on such a machine type.
Running the same workload on a 32 virtual cores, 64 gb machine, takes *1:21 h*.

What is interesting are the per-module build time differences.
Modules which ar

Fine grained batch recovery vs. native libraries

2019-09-04 Thread David Morávek
Hi,

we're testing the newly released batch recovery and are running into class
loading related issues.

1) We have a per-job flink cluster
2) We use BATCH execution mode + region failover strategy

Point 1) should imply a single user code class loader per task manager
(because there is only a single pipeline, which reuses the class loader
cached in the BlobLibraryCacheManager). We need this property because we
have UDFs that access C libraries using JNI (I think this may be a fairly
common use case when dealing with legacy code). JDK internals make sure
that a single native library can only be loaded by a single class loader
per JVM.

When region recovery is triggered, vertices that need recovery are first
reset back to the CREATED state and then rescheduled. In case all tasks in a
task manager are reset, this results in the cached class loader being released.
This unfortunately causes a job failure, because we try to reload the native
library in a newly created class loader.
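
To illustrate the failure mode, here is a minimal sketch (the class and
library names are made up for this example). It works on the first
deployment, but once the cached class loader has been released and the
recovered tasks load the class again in a fresh class loader, the
System.loadLibrary call fails with an UnsatisfiedLinkError ("Native Library
... already loaded in another classloader"):

import org.apache.flink.api.common.functions.RichMapFunction;

public class NativeEnrichmentFunction extends RichMapFunction<String, String> {

    static {
        // Binds "legacylib" to the class loader that loaded this class.
        // Loading the same library from a second (new) class loader in the
        // same JVM throws an UnsatisfiedLinkError.
        System.loadLibrary("legacylib");
    }

    private static native String enrich(String value);

    @Override
    public String map(String value) {
        return enrich(value);
    }
}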

I know that there is always the possibility to distribute native libraries
with Flink's libs and load them using the system class loader, but this
introduces a build & operations overhead and just makes it really unfriendly
for the cluster user, so I'd rather not work around the issue this way
(a per-job cluster should be more user friendly).

I believe the correct approach would be not to release the cached class loader
if the job is recovering, even though there are no tasks currently
registered with the TM.

What do you think? Thanks for the help.

D.


Re: [VOTE] FLIP-62: Set default restart delay for FixedDelay- and FailureRateRestartStrategy to 1s

2019-09-04 Thread David Morávek
+1

On Wed, Sep 4, 2019 at 1:38 PM Till Rohrmann  wrote:

> +1 (binding)
>
> On Wed, Sep 4, 2019 at 12:43 PM Chesnay Schepler 
> wrote:
>
> > +1 (binding)
> >
> > On 04/09/2019 11:18, JingsongLee wrote:
> > > +1 (non-binding)
> > >
> > > default 0 is really not user production friendly.
> > >
> > > Best,
> > > Jingsong Lee
> > >
> > >
> > > --
> > > From:Zhu Zhu 
> > > Send Time:2019年9月4日(星期三) 17:13
> > > To:dev 
> > > Subject:Re: [VOTE] FLIP-62: Set default restart delay for FixedDelay-
> > and FailureRateRestartStrategy to 1s
> > >
> > > +1 (non-binding)
> > >
> > > Thanks,
> > > Zhu Zhu
> > >
> > > Till Rohrmann  于2019年9月4日周三 下午5:06写道:
> > >
> > >> Hi everyone,
> > >>
> > >> I would like to start the voting process for FLIP-62 [1], which
> > >> is discussed and reached consensus in this thread [2].
> > >>
> > >> Since the change is rather small I'd like to shorten the voting period
> > to
> > >> 48 hours. Hence, I'll try to close it September 6th, 11:00 am CET,
> > unless
> > >> there is an objection or not enough votes.
> > >>
> > >> [1]
> > >>
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-62%3A+Set+default+restart+delay+for+FixedDelay-+and+FailureRateRestartStrategy+to+1s
> > >> [2]
> > >>
> > >>
> >
> https://lists.apache.org/thread.html/9602b342602a0181fcb618581f3b12e692ed2fad98c59fd6c1caeabd@%3Cdev.flink.apache.org%3E
> > >>
> > >> Cheers,
> > >> Till
> > >>
> >
> >
>


Re: [VOTE] FLIP-61 Simplify Flink's cluster level RestartStrategy configuration

2019-09-04 Thread Till Rohrmann
+1 (binding)

On Wed, Sep 4, 2019 at 12:39 PM Chesnay Schepler  wrote:

> +1 (binding)
>
> On 04/09/2019 11:13, Zhu Zhu wrote:
> > +1 (non-binding)
> >
> > Thanks,
> > Zhu Zhu
> >
> > Till Rohrmann  于2019年9月4日周三 下午5:05写道:
> >
> >> Hi everyone,
> >>
> >> I would like to start the voting process for FLIP-61 [1], which is
> >> discussed and reached consensus in this thread [2].
> >>
> >> Since the change is rather small I'd like to shorten the voting period
> to
> >> 48 hours. Hence, I'll try to close it September 6th, 11:00 am CET,
> unless
> >> there is an objection or not enough votes.
> >>
> >> [1]
> >>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-61+Simplify+Flink%27s+cluster+level+RestartStrategy+configuration
> >> [2]
> >>
> >>
> https://lists.apache.org/thread.html/e206390127bcbd9b24d9c41a838faa75157e468e01552ad241e3e24b@%3Cdev.flink.apache.org%3E
> >>
> >> Cheers,
> >> Till
> >>
>
>


Re: [VOTE] FLIP-62: Set default restart delay for FixedDelay- and FailureRateRestartStrategy to 1s

2019-09-04 Thread Till Rohrmann
+1 (binding)

On Wed, Sep 4, 2019 at 12:43 PM Chesnay Schepler  wrote:

> +1 (binding)
>
> On 04/09/2019 11:18, JingsongLee wrote:
> > +1 (non-binding)
> >
> > default 0 is really not user production friendly.
> >
> > Best,
> > Jingsong Lee
> >
> >
> > --
> > From:Zhu Zhu 
> > Send Time:2019年9月4日(星期三) 17:13
> > To:dev 
> > Subject:Re: [VOTE] FLIP-62: Set default restart delay for FixedDelay-
> and FailureRateRestartStrategy to 1s
> >
> > +1 (non-binding)
> >
> > Thanks,
> > Zhu Zhu
> >
> > Till Rohrmann  于2019年9月4日周三 下午5:06写道:
> >
> >> Hi everyone,
> >>
> >> I would like to start the voting process for FLIP-62 [1], which
> >> is discussed and reached consensus in this thread [2].
> >>
> >> Since the change is rather small I'd like to shorten the voting period
> to
> >> 48 hours. Hence, I'll try to close it September 6th, 11:00 am CET,
> unless
> >> there is an objection or not enough votes.
> >>
> >> [1]
> >>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-62%3A+Set+default+restart+delay+for+FixedDelay-+and+FailureRateRestartStrategy+to+1s
> >> [2]
> >>
> >>
> https://lists.apache.org/thread.html/9602b342602a0181fcb618581f3b12e692ed2fad98c59fd6c1caeabd@%3Cdev.flink.apache.org%3E
> >>
> >> Cheers,
> >> Till
> >>
>
>


Re: [DISCUSS] Use static imports for test utilities or asserts.

2019-09-04 Thread Till Rohrmann
Hi Yang,

are you referring to Flink's checkstyle settings, which forbid star
imports? If you set up the checkstyle plugin [1] and activate "optimize
imports" in IntelliJ, the IDE will make sure the imports are correct.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/flinkDev/ide_setup.html#checkstyle-for-java
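
As a small illustration of the style in question (a made-up test, purely
an example):

import static org.junit.Assert.assertEquals;

import org.junit.Test;

public class ExampleTest {

    @Test
    public void testAddition() {
        // With the static import there is no need to prefix the call with Assert.
        assertEquals(4, 2 + 2);
    }
}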

Cheers,
Till

On Wed, Sep 4, 2019 at 11:12 AM Yang Jeff  wrote:

> Hi all,
> Is it necessary for us to replace all assert or utilities with static
> import ? Thank you very much.
>


Re: [DISCUSS] Reducing build times

2019-09-04 Thread Till Rohrmann
How long do we need to run all e2e tests? They are not included in the 3.5
hours, I assume.

Cheers,
Till

On Wed, Sep 4, 2019 at 11:59 AM Robert Metzger  wrote:

> Yes, we can ensure the same (or better) experience for contributors.
>
> On the powerful machines, builds finish in 1.5 hours (without any caching
> enabled).
>
> Azure Pipelines offers 10 concurrent builds with a timeout of 6 hours for a
> build for open source projects. Flink needs 3.5 hours on that infra (not
> parallelized at all, no caching). These free machines are very similar to
> those of Travis, so I expect no build time regressions, if we set it up
> similarly.
>
>
> On Wed, Sep 4, 2019 at 9:19 AM Chesnay Schepler 
> wrote:
>
> > Will using more powerful for the project make it more difficult to
> > ensure that contributor builds are still running in a reasonable time?
> >
> > As an example of this happening on Travis, contributors currently cannot
> > run all e2e tests since they timeout, but on apache we have a larger
> > timeout.
> >
> > On 03/09/2019 18:57, Robert Metzger wrote:
> > > Hi all,
> > >
> > > I wanted to give a short update on this:
> > > - Arvid, Aljoscha and I have started working on a Gradle PoC, currently
> > > working on making all modules compile and test with Gradle. We've also
> > > identified some problematic areas (shading being the most obvious one)
> > > which we will analyse as part of the PoC.
> > > The goal is to see how much Gradle helps to parallelise our build, and
> to
> > > avoid duplicate work (incremental builds).
> > >
> > > - I am working on setting up a Flink testing infrastructure based on
> > Azure
> > > Pipelines, using more powerful hardware. Alibaba kindly provided me
> with
> > > two 32 core machines (temporarily), and another company reached out to
> > > privately, looking into options for cheap, fast machines :)
> > > If nobody in the community disagrees, I am going to set up Azure
> > Pipelines
> > > with our apache/flink GitHub as a build infrastructure that exists next
> > to
> > > Flinkbot and flink-ci. I would like to make sure that Azure Pipelines
> is
> > > equally or even more reliable than Travis, and I want to see what the
> > > required maintenance work is.
> > > On top of that, Azure Pipelines is a very feature-rich tool with a lot
> of
> > > nice options for us to improve the build experience (statistics about
> > tests
> > > (flaky tests etc.), nice docker support, plenty of free build resources
> > for
> > > open source projects, ...)
> > >
> > > Best,
> > > Robert
> > >
> > >
> > >
> > >
> > >
> > > On Mon, Aug 19, 2019 at 5:12 PM Robert Metzger 
> > wrote:
> > >
> > >> Hi all,
> > >>
> > >> I have summarized all arguments mentioned so far + some additional
> > >> research into a Wiki page here:
> > >>
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=125309279
> > >>
> > >> I'm happy to hear further comments on my summary! I'm pretty sure we
> can
> > >> find more pro's and con's for the different options.
> > >>
> > >> My opinion after looking at the options:
> > >>
> > >> - Flink relies on an outdated build tool (Maven), while a good
> > >> alternative is well-established (gradle), and will likely provide
> a
> > much
> > >> better CI and local build experience through incremental build and
> > cached
> > >> intermediates.
> > >> Scripting around Maven, or splitting modules / test execution /
> > >> repositories won't solve this problem. We should rather spend the
> > effort in
> > >> migrating to a modern build tool which will provide us benefits in
> > the long
> > >> run.
> > >> - Flink relies on a fairly slow build service (Travis CI), while
> > >> simply putting more money onto the problem could cut the build
> time
> > at
> > >> least in half.
> > >> We should consider using a build service that provides bigger
> > machines
> > >> to solve our build time problem.
> > >>
> > >> My opinion is based on many assumptions (gradle is actually as fast as
> > >> promised (haven't used it before), we can build Flink with gradle, we
> > find
> > >> sponsors for bigger build machines) that we need to test first through
> > PoCs.
> > >>
> > >> Best,
> > >> Robert
> > >>
> > >>
> > >>
> > >>
> > >> On Mon, Aug 19, 2019 at 10:26 AM Aljoscha Krettek <
> aljos...@apache.org>
> > >> wrote:
> > >>
> > >>> I did a quick test: a normal "mvn clean install -DskipTests
> > >>> -Drat.skip=true -Dmaven.javadoc.skip=true -Punsafe-mapr-repo” on my
> > machine
> > >>> takes about 14 minutes. After removing all mentions of
> > maven-shade-plugin
> > >>> the build time goes down to roughly 11.5 minutes. (Obviously the
> > resulting
> > >>> Flink won’t work, because some expected stuff is not packaged and
> most
> > of
> > >>> the end-to-end tests use the shade plugin to package the jars for
> > testing.
> > >>>
> > >>> Aljoscha
> > >>>
> >  On 18. Aug 2019, at 19:52, Robert Metzger 
> > wrote:
> > 
> >  Hi al

Re: [DISCUSS] FLIP-62: Set default restart delay for FixedDelay- and FailureRateRestartStrategy to 1s

2019-09-04 Thread Till Rohrmann
Improved documentation can definitely help. I think Arvid suggested
something like this in the linked SURVEY thread and said that Kafka does
something similar.

The idea of different profiles sounds also promising.

I guess something like this deserves a dedicated effort and someone driving
it.

Cheers,
Till

On Wed, Sep 4, 2019 at 12:45 PM Chesnay Schepler  wrote:

> The issue we seem to run into again and again is that we want to try to
> find a value that provides a good experience when trying out Flink, but
> also somewhat usable for production users.
> We should look into solutions for this; maybe having a "recommended"
> value in the docs would help sufficiently, or even configuration
> profiles for Flink "dev"/"production" which influence the default values.
>
> On 03/09/2019 11:41, Till Rohrmann wrote:
> > Hi everyone,
> >
> > I'd like to discuss changing the default restart delay for FixedDelay-
> and
> > FailureRateRestartStrategy to "1 s" [1].
> >
> > According to a user survey about the default value of the restart delay
> > [2], it turned out that the current default value of "0 s" is not
> optimal.
> > In practice Flink users tend to set it to a non-zero value (e.g. "10 s")
> in
> > order to prevent restart storms originating from overloaded external
> > systems.
> >
> > I would like to set the default restart delay of the
> > FixedDelayRestartStrategy ("restart-strategy.fixed-delay.delay") and of
> the
> > FailureRateRestartStrategy ("restart-strategy.failure-rate.delay") to "1
> > s". "1 s" should prevent restart storms originating from causes outside
> of
> > Flink (e.g. overloaded external systems) and still be fast enough to not
> > having a noticeable effect on most Flink deployments.
> >
> > However, this change will affect all users who currently rely on the
> > current default restart delay value ("0 s"). The plan is to add a release
> > note to make these users aware of this change when upgrading Flink.
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-62%3A+Set+default+restart+delay+for+FixedDelay-+and+FailureRateRestartStrategy+to+1s
> > [2]
> >
> https://lists.apache.org/thread.html/107b15de6b8ac849610d99c4754715d2a8a2f32ddfe9f8da02af2ccc@%3Cdev.flink.apache.org%3E
> >
> > Cheers,
> > Till
> >
>
>


[jira] [Created] (FLINK-13955) Integrate ContinuousFileReaderOperator with StreamTask mailbox execution model

2019-09-04 Thread Alex (Jira)
Alex created FLINK-13955:


 Summary: Integrate ContinuousFileReaderOperator with StreamTask 
mailbox execution model
 Key: FLINK-13955
 URL: https://issues.apache.org/jira/browse/FLINK-13955
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: Alex


The {{ContinuousFileReaderOperator}} spawns an additional ({{reader}}) thread 
and coordinates actions with it via the {{checkpointLock}}. The operator may 
block while waiting for a notification from the {{reader}} thread, preventing 
progress in the mailbox loop.

This is similar to the {{AsyncWaitOperator}} situation described in FLINK-12958.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] FLIP-62: Set default restart delay for FixedDelay- and FailureRateRestartStrategy to 1s

2019-09-04 Thread Chesnay Schepler
The issue we seem to run into again and again is that we want to find a 
value that provides a good experience when trying out Flink, but is 
also somewhat usable for production users.
We should look into solutions for this; maybe having a "recommended" 
value in the docs would help sufficiently, or even configuration 
profiles for Flink ("dev"/"production") which influence the default values.


On 03/09/2019 11:41, Till Rohrmann wrote:

Hi everyone,

I'd like to discuss changing the default restart delay for FixedDelay- and
FailureRateRestartStrategy to "1 s" [1].

According to a user survey about the default value of the restart delay
[2], it turned out that the current default value of "0 s" is not optimal.
In practice Flink users tend to set it to a non-zero value (e.g. "10 s") in
order to prevent restart storms originating from overloaded external
systems.

I would like to set the default restart delay of the
FixedDelayRestartStrategy ("restart-strategy.fixed-delay.delay") and of the
FailureRateRestartStrategy ("restart-strategy.failure-rate.delay") to "1 s".
"1 s" should prevent restart storms originating from causes outside of
Flink (e.g. overloaded external systems) and still be fast enough not to
have a noticeable effect on most Flink deployments.
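
(For completeness: users who need a different delay can already override
whatever default we pick, either with "restart-strategy.fixed-delay.delay"
in flink-conf.yaml or per job via the ExecutionEnvironment. The values below
are only an illustration, using the existing RestartStrategies API from
org.apache.flink.api.common.restartstrategy and
org.apache.flink.api.common.time.Time.)

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        3,                               // number of restart attempts
        Time.of(10, TimeUnit.SECONDS))); // delay between attempts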

However, this change will affect all users who currently rely on the
current default restart delay value ("0 s"). The plan is to add a release
note to make these users aware of this change when upgrading Flink.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-62%3A+Set+default+restart+delay+for+FixedDelay-+and+FailureRateRestartStrategy+to+1s
[2]
https://lists.apache.org/thread.html/107b15de6b8ac849610d99c4754715d2a8a2f32ddfe9f8da02af2ccc@%3Cdev.flink.apache.org%3E

Cheers,
Till





Re: [VOTE] FLIP-62: Set default restart delay for FixedDelay- and FailureRateRestartStrategy to 1s

2019-09-04 Thread Chesnay Schepler

+1 (binding)

On 04/09/2019 11:18, JingsongLee wrote:

+1 (non-binding)

default 0 is really not user production friendly.

Best,
Jingsong Lee


--
From:Zhu Zhu 
Send Time:2019年9月4日(星期三) 17:13
To:dev 
Subject:Re: [VOTE] FLIP-62: Set default restart delay for FixedDelay- and 
FailureRateRestartStrategy to 1s

+1 (non-binding)

Thanks,
Zhu Zhu

Till Rohrmann  于2019年9月4日周三 下午5:06写道:


Hi everyone,

I would like to start the voting process for FLIP-62 [1], which
is discussed and reached consensus in this thread [2].

Since the change is rather small I'd like to shorten the voting period to
48 hours. Hence, I'll try to close it September 6th, 11:00 am CET, unless
there is an objection or not enough votes.

[1]

https://cwiki.apache.org/confluence/display/FLINK/FLIP-62%3A+Set+default+restart+delay+for+FixedDelay-+and+FailureRateRestartStrategy+to+1s
[2]

https://lists.apache.org/thread.html/9602b342602a0181fcb618581f3b12e692ed2fad98c59fd6c1caeabd@%3Cdev.flink.apache.org%3E

Cheers,
Till





Re: [VOTE] FLIP-61 Simplify Flink's cluster level RestartStrategy configuration

2019-09-04 Thread Chesnay Schepler

+1 (binding)

On 04/09/2019 11:13, Zhu Zhu wrote:

+1 (non-binding)

Thanks,
Zhu Zhu

Till Rohrmann  于2019年9月4日周三 下午5:05写道:


Hi everyone,

I would like to start the voting process for FLIP-61 [1], which is
discussed and reached consensus in this thread [2].

Since the change is rather small I'd like to shorten the voting period to
48 hours. Hence, I'll try to close it September 6th, 11:00 am CET, unless
there is an objection or not enough votes.

[1]

https://cwiki.apache.org/confluence/display/FLINK/FLIP-61+Simplify+Flink%27s+cluster+level+RestartStrategy+configuration
[2]

https://lists.apache.org/thread.html/e206390127bcbd9b24d9c41a838faa75157e468e01552ad241e3e24b@%3Cdev.flink.apache.org%3E

Cheers,
Till





Re: [DISCUSS] Reducing build times

2019-09-04 Thread Robert Metzger
Yes, we can ensure the same (or better) experience for contributors.

On the powerful machines, builds finish in 1.5 hours (without any caching
enabled).

Azure Pipelines offers 10 concurrent builds with a timeout of 6 hours for a
build for open source projects. Flink needs 3.5 hours on that infra (not
parallelized at all, no caching). These free machines are very similar to
those of Travis, so I expect no build time regressions, if we set it up
similarly.


On Wed, Sep 4, 2019 at 9:19 AM Chesnay Schepler  wrote:

> Will using more powerful for the project make it more difficult to
> ensure that contributor builds are still running in a reasonable time?
>
> As an example of this happening on Travis, contributors currently cannot
> run all e2e tests since they timeout, but on apache we have a larger
> timeout.
>
> On 03/09/2019 18:57, Robert Metzger wrote:
> > Hi all,
> >
> > I wanted to give a short update on this:
> > - Arvid, Aljoscha and I have started working on a Gradle PoC, currently
> > working on making all modules compile and test with Gradle. We've also
> > identified some problematic areas (shading being the most obvious one)
> > which we will analyse as part of the PoC.
> > The goal is to see how much Gradle helps to parallelise our build, and to
> > avoid duplicate work (incremental builds).
> >
> > - I am working on setting up a Flink testing infrastructure based on
> Azure
> > Pipelines, using more powerful hardware. Alibaba kindly provided me with
> > two 32 core machines (temporarily), and another company reached out to
> > privately, looking into options for cheap, fast machines :)
> > If nobody in the community disagrees, I am going to set up Azure
> Pipelines
> > with our apache/flink GitHub as a build infrastructure that exists next
> to
> > Flinkbot and flink-ci. I would like to make sure that Azure Pipelines is
> > equally or even more reliable than Travis, and I want to see what the
> > required maintenance work is.
> > On top of that, Azure Pipelines is a very feature-rich tool with a lot of
> > nice options for us to improve the build experience (statistics about
> tests
> > (flaky tests etc.), nice docker support, plenty of free build resources
> for
> > open source projects, ...)
> >
> > Best,
> > Robert
> >
> >
> >
> >
> >
> > On Mon, Aug 19, 2019 at 5:12 PM Robert Metzger 
> wrote:
> >
> >> Hi all,
> >>
> >> I have summarized all arguments mentioned so far + some additional
> >> research into a Wiki page here:
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=125309279
> >>
> >> I'm happy to hear further comments on my summary! I'm pretty sure we can
> >> find more pro's and con's for the different options.
> >>
> >> My opinion after looking at the options:
> >>
> >> - Flink relies on an outdated build tool (Maven), while a good
> >> alternative is well-established (gradle), and will likely provide a
> much
> >> better CI and local build experience through incremental build and
> cached
> >> intermediates.
> >> Scripting around Maven, or splitting modules / test execution /
> >> repositories won't solve this problem. We should rather spend the
> effort in
> >> migrating to a modern build tool which will provide us benefits in
> the long
> >> run.
> >> - Flink relies on a fairly slow build service (Travis CI), while
> >> simply putting more money onto the problem could cut the build time
> at
> >> least in half.
> >> We should consider using a build service that provides bigger
> machines
> >> to solve our build time problem.
> >>
> >> My opinion is based on many assumptions (gradle is actually as fast as
> >> promised (haven't used it before), we can build Flink with gradle, we
> find
> >> sponsors for bigger build machines) that we need to test first through
> PoCs.
> >>
> >> Best,
> >> Robert
> >>
> >>
> >>
> >>
> >> On Mon, Aug 19, 2019 at 10:26 AM Aljoscha Krettek 
> >> wrote:
> >>
> >>> I did a quick test: a normal "mvn clean install -DskipTests
> >>> -Drat.skip=true -Dmaven.javadoc.skip=true -Punsafe-mapr-repo” on my
> machine
> >>> takes about 14 minutes. After removing all mentions of
> maven-shade-plugin
> >>> the build time goes down to roughly 11.5 minutes. (Obviously the
> resulting
> >>> Flink won’t work, because some expected stuff is not packaged and most
> of
> >>> the end-to-end tests use the shade plugin to package the jars for
> testing.
> >>>
> >>> Aljoscha
> >>>
>  On 18. Aug 2019, at 19:52, Robert Metzger 
> wrote:
> 
>  Hi all,
> 
>  I wanted to understand the impact of the hardware we are using for
> >>> running
>  our tests. Each travis worker has 2 virtual cores, and 7.5 gb memory
> >>> [1].
>  They are using Google Cloud Compute Engine *n1-standard-2* instances.
>  Running a full "mvn clean verify" takes *03:32 h* on such a machine
> >>> type.
>  Running the same workload on a 32 virtual cores, 64 gb machine, takes
> >>> *1:21
>  h*.
> >>

Re: [DISCUSS] Flink client api enhancement for downstream project

2019-09-04 Thread Kostas Kloudas
Here is the issue, and I will keep on updating it as I find more issues.

https://issues.apache.org/jira/browse/FLINK-13954

This will also cover the refactoring of the Executors that we discussed
in this thread, without any additional functionality (such as the job client).
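
To recap what is meant by the Executor here, it is roughly something of the
following shape. This is only a sketch to frame the discussion; the names,
the result type and the exact signature are placeholders, not the final
interface:

import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;

/**
 * Sketch only: the entity responsible for taking the user's compiled job
 * (represented here simply as a JobGraph, although the real design may take
 * the user program / pipeline instead) and submitting it for execution on
 * some target (local, session cluster, per-job cluster, ...).
 */
public interface Executor {

    CompletableFuture<JobSubmissionResult> execute(
            JobGraph jobGraph, Configuration configuration) throws Exception;
}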

Kostas

On Wed, Sep 4, 2019 at 11:46 AM Kostas Kloudas  wrote:
>
> Great idea Tison!
>
> I will create the umbrella issue and post it here so that we are all
> on the same page!
>
> Cheers,
> Kostas
>
> On Wed, Sep 4, 2019 at 11:36 AM Zili Chen  wrote:
> >
> > Hi Kostas & Aljoscha,
> >
> > I notice that there is a JIRA(FLINK-13946) which could be included
> > in this refactor thread. Since we agree on most of directions in
> > big picture, is it reasonable that we create an umbrella issue for
> > refactor client APIs and also linked subtasks? It would be a better
> > way that we join forces of our community.
> >
> > Best,
> > tison.
> >
> >
> > Zili Chen  于2019年8月31日周六 下午12:52写道:
> >>
> >> Great Kostas! Looking forward to your POC!
> >>
> >> Best,
> >> tison.
> >>
> >>
> >> Jeff Zhang  于2019年8月30日周五 下午11:07写道:
> >>>
> >>> Awesome, @Kostas Looking forward your POC.
> >>>
> >>> Kostas Kloudas  于2019年8月30日周五 下午8:33写道:
> >>>
> >>> > Hi all,
> >>> >
> >>> > I am just writing here to let you know that I am working on a POC that
> >>> > tries to refactor the current state of job submission in Flink.
> >>> > I want to stress out that it introduces NO CHANGES to the current
> >>> > behaviour of Flink. It just re-arranges things and introduces the
> >>> > notion of an Executor, which is the entity responsible for taking the
> >>> > user-code and submitting it for execution.
> >>> >
> >>> > Given this, the discussion about the functionality that the JobClient
> >>> > will expose to the user can go on independently and the same
> >>> > holds for all the open questions so far.
> >>> >
> >>> > I hope I will have some more new to share soon.
> >>> >
> >>> > Thanks,
> >>> > Kostas
> >>> >
> >>> > On Mon, Aug 26, 2019 at 4:20 AM Yang Wang  wrote:
> >>> > >
> >>> > > Hi Zili,
> >>> > >
> >>> > > It make sense to me that a dedicated cluster is started for a per-job
> >>> > > cluster and will not accept more jobs.
> >>> > > Just have a question about the command line.
> >>> > >
> >>> > > Currently we could use the following commands to start different
> >>> > clusters.
> >>> > > *per-job cluster*
> >>> > > ./bin/flink run -d -p 5 -ynm perjob-cluster1 -m yarn-cluster
> >>> > > examples/streaming/WindowJoin.jar
> >>> > > *session cluster*
> >>> > > ./bin/flink run -p 5 -ynm session-cluster1 -m yarn-cluster
> >>> > > examples/streaming/WindowJoin.jar
> >>> > >
> >>> > > What will it look like after client enhancement?
> >>> > >
> >>> > >
> >>> > > Best,
> >>> > > Yang
> >>> > >
> >>> > > Zili Chen  于2019年8月23日周五 下午10:46写道:
> >>> > >
> >>> > > > Hi Till,
> >>> > > >
> >>> > > > Thanks for your update. Nice to hear :-)
> >>> > > >
> >>> > > > Best,
> >>> > > > tison.
> >>> > > >
> >>> > > >
> >>> > > > Till Rohrmann  于2019年8月23日周五 下午10:39写道:
> >>> > > >
> >>> > > > > Hi Tison,
> >>> > > > >
> >>> > > > > just a quick comment concerning the class loading issues when 
> >>> > > > > using
> >>> > the
> >>> > > > per
> >>> > > > > job mode. The community wants to change it so that the
> >>> > > > > StandaloneJobClusterEntryPoint actually uses the user code class
> >>> > loader
> >>> > > > > with child first class loading [1]. Hence, I hope that this 
> >>> > > > > problem
> >>> > will
> >>> > > > be
> >>> > > > > resolved soon.
> >>> > > > >
> >>> > > > > [1] https://issues.apache.org/jira/browse/FLINK-13840
> >>> > > > >
> >>> > > > > Cheers,
> >>> > > > > Till
> >>> > > > >
> >>> > > > > On Fri, Aug 23, 2019 at 2:47 PM Kostas Kloudas 
> >>> > > > > 
> >>> > > > wrote:
> >>> > > > >
> >>> > > > > > Hi all,
> >>> > > > > >
> >>> > > > > > On the topic of web submission, I agree with Till that it only
> >>> > seems
> >>> > > > > > to complicate things.
> >>> > > > > > It is bad for security, job isolation (anybody can submit/cancel
> >>> > jobs),
> >>> > > > > > and its
> >>> > > > > > implementation complicates some parts of the code. So, if it 
> >>> > > > > > were
> >>> > to
> >>> > > > > > redesign the
> >>> > > > > > WebUI, maybe this part could be left out. In addition, I would 
> >>> > > > > > say
> >>> > > > > > that the ability to cancel
> >>> > > > > > jobs could also be left out.
> >>> > > > > >
> >>> > > > > > Also I would also be in favour of removing the "detached" mode, 
> >>> > > > > > for
> >>> > > > > > the reasons mentioned
> >>> > > > > > above (i.e. because now we will have a future representing the
> >>> > result
> >>> > > > > > on which the user
> >>> > > > > > can choose to wait or not).
> >>> > > > > >
> >>> > > > > > Now for the separating job submission and cluster creation, I 
> >>> > > > > > am in
> >>> > > > > > favour of keeping both.
> >>> > > > > > Once again, the reasons are mentioned above by Stephan, Till,
> >>> > Aljoscha
> >>>

Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-09-04 Thread Dawid Wysakowicz
Hi Becket,

You are right that what we had in mind for
ExecutionConfig/CheckpointConfig etc. is option b) from your email.
In the context of FLIP-54, those objects are not Configurables. What we
understood as a Configurable in FLIP-54 are simple POJOs that are stored
under a single key, such as the examples either from the ML thread (Host)
or from the design doc (CacheFile). So when configuring the host, the user
can provide it like this:

connector.host: address:localhost, port:1234

rather than

connector.host.address: localhost
connector.host.port: 1234

This is important especially if one wants to configure lists of such
objects:

connector.hosts: address:localhost,port:1234;address:localhost,port:4567

The intention was definitely not to store whole complex objects, such as
ExecutionConfig, CheckpointConfig etc., that contain multiple different
options. Maybe it makes sense to call it ConfigObject, as Aljoscha
suggested? What do you think? Would that make it more understandable?
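
To make that concrete, the kind of object we have in mind is as small as
the following sketch (illustrative only, not the actual FLIP-54 interfaces;
the parsing format simply mirrors the example above):

public class Host {

    private final String address;
    private final int port;

    public Host(String address, int port) {
        this.address = address;
        this.port = port;
    }

    public String getAddress() {
        return address;
    }

    public int getPort() {
        return port;
    }

    // Parses "address:localhost, port:1234" into a Host instance.
    public static Host fromString(String value) {
        String address = null;
        int port = -1;
        for (String part : value.split(",")) {
            String[] kv = part.trim().split(":", 2);
            if ("address".equals(kv[0])) {
                address = kv[1];
            } else if ("port".equals(kv[0])) {
                port = Integer.parseInt(kv[1]);
            }
        }
        return new Host(address, port);
    }
}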

For the initialization/configuration of objects such as ExecutionConfig and
CheckpointConfig, you may have a look at FLIP-59 [1], where we suggest
adding a configure method to those classes and pretty much describe the
process you outline in your last message.

Best,

Dawid

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-59%3A+Enable+execution+configuration+from+Configuration+object

On 04/09/2019 03:37, Becket Qin wrote:
> Hi Timo, Dawid and Aljoscha,
>
> Thanks for clarifying the goals. It is very helpful to understand the
> motivation here. It would be great to add them to the FLIP wiki.
>
> I agree that the current FLIP design achieves the two goals it wants to
> achieve. But I am trying to see is if the current approach is the most
> reasonable approach.
>
> Please let me check if I understand this correctly. From end users'
> perspective, they will do the following when they want to configure their
> Flink Jobs.
> 1. Create a Configuration instance, and call setters of Configuration with
> the ConfigOptions defined in different components.
> 2. The Configuration created in step 1 will be passed around, and each
> component will just exact their own options from it.
> 3. ExecutionConfig, CheckpointConfig (and other Config classes) will become
> a Configurable, which is responsible for extracting the configuration
> values from the Configuration set by users in step 1.
>
> The confusion I had was that in step 1, how users are going to set the
> configs for the ExecutionConfig / CheckpointConfig? There may be two ways:
> a) Users will call setConfigurable(ExectionConfigConfigurableOption,
> "config1:v1,config2:v2,config3:v3"), i.e. the entire ExecutionConfig is
> exposed as a Configurable to the users.
> b) Users will call setInteger(MAX_PARALLELISM, 1),
> setInteger(LATENCY_TRACKING_INTERVAL, 1000), etc.. This means users will
> set individual ConfigOptions for the ExecutionConfig. And they do not see
> ExecutionConfig as a Configurable.
>
> I assume we are following b), then do we need to expose Configurable to the
> users in this FLIP? My concern is that the Configurable may be related to
> other mechanism such as plugin which we have not really thought through in
> this FLIP.
>
> I know Becket at least has some thoughts about immutability and loading
>> objects via the configuration but maybe they could be put into a follow-up
>> FLIP if they are needed.
> I am perfectly fine to leave something out of the scope of this FLIP to
> later FLIPs. But I think it is important to avoid introducing something in
> this FLIP that will be shortly changed by the follow-up FLIPs.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Sep 3, 2019 at 8:47 PM Aljoscha Krettek  wrote:
>
>> Hi,
>>
>> I think it’s important to keep in mind the original goals of this FLIP and
>> not let the scope grow indefinitely. As I recall it, the goals are:
>>
>>  - Extend the ConfigOption system enough to allow the Table API to
>> configure options that are right now only available on
>> CheckpointingOptions, ExecutionConfig, and StreamExecutionEnvironment. We
>> also want to do this without manually having to “forward” all the available
>> configuration options by introducing equivalent setters in the Table API
>>
>>  - Do the above while keeping in mind that eventually we want to allow
>> users to configure everything from either the flink-conf.yaml, vie command
>> line parameters, or via a Configuration.
>>
>> I think the FLIP achieves this, with the added side goals of making
>> validation a part of ConfigOptions, making them type safe, and making the
>> validation constraints documentable (via automatic doc generation.) All
>> this without breaking backwards compatibility, if I’m not mistaken.
>>
>> I think we should first agree what the basic goals are so that we can
>> quickly converge to consensus on this FLIP because it blocks other
>> people/work. Among other things FLIP-59 depends on this. What are other
>> opinions that peop

[jira] [Created] (FLINK-13954) Clean up ExecutionEnvironment / JobSubmission code paths

2019-09-04 Thread Kostas Kloudas (Jira)
Kostas Kloudas created FLINK-13954:
--

 Summary: Clean up ExecutionEnvironment / JobSubmission code paths
 Key: FLINK-13954
 URL: https://issues.apache.org/jira/browse/FLINK-13954
 Project: Flink
  Issue Type: Improvement
  Components: Client / Job Submission
Affects Versions: 1.9.0
Reporter: Kostas Kloudas


This is an umbrella issue to serve as a hub for all issues related to job 
submission / (stream) execution environment refactoring.

This issue does not change any existing functionality; it aims to clean 
up / rearrange the code in the relevant components so that further changes are 
easier to apply.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-09-04 Thread Becket Qin
Hi Timo and Dawid,

I discussed this offline a little bit with Jingsong and want to double
check with you on the following two questions. Can you please help confirm?

1. How will user set the Configurations?
Users will have two ways to set an ExecutionConfig.
Option 1:
  a) Instantiate ExecutionConfig and call the setters in it.
  b) Then create an ExecutionConfigFactory instance to dump the
ExecutionConfig into a Configuration instance.

Option 2:
  a) calling Configuration.set(ExecutionConfig.MAX_PARALLELISM, 1) and so
on, to set configs in the Configuration instance directly.

If we must have option 1, I agree with Aljoscha that having a
Configurable.writeConfiguration() is better. But if we consider the future
use case of Configurable for Plugins, I would prefer only support option 2.
Usually users will not first create a plugin instance and then dump the
configurations. Also, option 2 is more aligned with file-based
configuration from something like flink-conf.yaml.
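
In code, option 2 would look roughly like the following sketch
(MAX_PARALLELISM, LATENCY_TRACKING_INTERVAL and ExecutionConfigFactory are
placeholder names that only mirror the ones used above; they are not
existing API):

// Hypothetical sketch of option 2; the ConfigOption constants and the
// factory are placeholders, not existing API.
Configuration configuration = new Configuration();
configuration.setInteger(MAX_PARALLELISM, 128);
configuration.setLong(LATENCY_TRACKING_INTERVAL, 1000L);

// Each component later extracts only its own options:
ExecutionConfig executionConfig = new ExecutionConfigFactory().fromConfiguration(configuration);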

2. In the FLIP, will Configuration class hold Configurable objects in its
internal Map?
To create an ExecutionConfig from the Configuration instance, one would do
the following:
  a) one will call Configuration.get(ExecutionConfigOption), where
ExecutionConfigOption is a ConfigOption which has an
ExecutionConfigFactory as the atomicClass.
  b) Create the ExecutionConfigFactory instance and call
ExecutionConfigFactory.fromConfiguration() to create an ExecutionConfig
instance.

Will the Configuration.get() method create a new ExecutionConfig and put it
into the internal *confData* map? If not, the Configuration
instance will only contain primitive types, Lists, or
Maps. Then I have no concern on this part, because from the
Configuration instance's perspective, it is immutable.

Thanks,

Jiangjie (Becket) Qin


On Wed, Sep 4, 2019 at 9:37 AM Becket Qin  wrote:

> Hi Timo, Dawid and Aljoscha,
>
> Thanks for clarifying the goals. It is very helpful to understand the
> motivation here. It would be great to add them to the FLIP wiki.
>
> I agree that the current FLIP design achieves the two goals it wants to
> achieve. But I am trying to see is if the current approach is the most
> reasonable approach.
>
> Please let me check if I understand this correctly. From end users'
> perspective, they will do the following when they want to configure their
> Flink Jobs.
> 1. Create a Configuration instance, and call setters of Configuration with
> the ConfigOptions defined in different components.
> 2. The Configuration created in step 1 will be passed around, and each
> component will just exact their own options from it.
> 3. ExecutionConfig, CheckpointConfig (and other Config classes) will
> become a Configurable, which is responsible for extracting the
> configuration values from the Configuration set by users in step 1.
>
> The confusion I had was that in step 1, how users are going to set the
> configs for the ExecutionConfig / CheckpointConfig? There may be two ways:
> a) Users will call setConfigurable(ExectionConfigConfigurableOption,
> "config1:v1,config2:v2,config3:v3"), i.e. the entire ExecutionConfig is
> exposed as a Configurable to the users.
> b) Users will call setInteger(MAX_PARALLELISM, 1),
> setInteger(LATENCY_TRACKING_INTERVAL, 1000), etc.. This means users will
> set individual ConfigOptions for the ExecutionConfig. And they do not see
> ExecutionConfig as a Configurable.
>
> I assume we are following b), then do we need to expose Configurable to
> the users in this FLIP? My concern is that the Configurable may be related
> to other mechanism such as plugin which we have not really thought through
> in this FLIP.
>
> I know Becket at least has some thoughts about immutability and loading
>> objects via the configuration but maybe they could be put into a follow-up
>> FLIP if they are needed.
>
> I am perfectly fine to leave something out of the scope of this FLIP to
> later FLIPs. But I think it is important to avoid introducing something in
> this FLIP that will be shortly changed by the follow-up FLIPs.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Sep 3, 2019 at 8:47 PM Aljoscha Krettek 
> wrote:
>
>> Hi,
>>
>> I think it’s important to keep in mind the original goals of this FLIP
>> and not let the scope grow indefinitely. As I recall it, the goals are:
>>
>>  - Extend the ConfigOption system enough to allow the Table API to
>> configure options that are right now only available on
>> CheckpointingOptions, ExecutionConfig, and StreamExecutionEnvironment. We
>> also want to do this without manually having to “forward” all the available
>> configuration options by introducing equivalent setters in the Table API
>>
>>  - Do the above while keeping in mind that eventually we want to allow
>> users to configure everything from either the flink-conf.yaml, vie command
>> line parameters, or via a Configuration.
>>
>> I think the FLIP achieves this, with the added side goals of making
>> validation a part of ConfigOptions, making them type safe, and 

Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-04 Thread Xuefu Z
Hi David,

Thank you for sharing your findings. It seems to me that there is no SQL
standard regarding temporary functions. There are few systems that support
it. Here are what I have found:

1. Hive: no DB qualifier allowed. Can overwrite built-in.
2. Spark: basically follows Hive (
https://docs.databricks.com/spark/latest/spark-sql/language-manual/create-function.html
)
3. SAP SQL Anywhere Server: can have owner (db?). Not sure of overwriting
behavior. (
http://dcx.sap.com/sqla170/en/html/816bdf316ce210148d3acbebf6d39b18.html)

Because of the lack of a standard, it's perfectly fine for Flink to define
whatever it sees as appropriate. Thus, your proposal (no overwriting and a
DB as the required holder) is one option. The advantage is simplicity; the
downside is the deviation from Hive, which is popular and the de facto
standard in the big data world.

However, I don't think we have to follow Hive. More importantly, we need a
consensus. I have no objection if your proposal is generally agreed upon.

Thanks,
Xuefu

On Tue, Sep 3, 2019 at 11:58 PM Dawid Wysakowicz 
wrote:

> Hi all,
>
> Just an opinion on the built-in <> temporary functions resolution and
> NAMING issue. I think we should not allow overriding the built-in
> functions, as this may pose serious issues and to be honest is rather
> not feasible and would require major rework. What happens if a user
> wants to override CAST? Calls to that function are generated at
> different layers of the stack that unfortunately does not always go
> through the Catalog API (at least yet). Moreover from what I've checked
> no other systems allow overriding the built-in functions. All the
> systems I've checked so far register temporary functions in a
> database/schema (either special database for temporary functions, or
> just current database). What I would suggest is to always register
> temporary functions with a 3 part identifier. The same way as tables,
> views etc. This effectively means you cannot override built-in
> functions. With such approach it is natural that the temporary functions
> end up a step lower in the resolution order:
>
> 1. built-in functions (1 part, maybe 2? - this is still under discussion)
>
> 2. temporary functions (always 3 part path)
>
> 3. catalog functions (always 3 part path)
>
> Let me know what do you think.
>
> Best,
>
> Dawid
>
> On 04/09/2019 06:13, Bowen Li wrote:
> > Hi,
> >
> > I agree with Xuefu that the main controversial points are mainly the two
> > places. My thoughts on them:
> >
> > 1) Determinism of referencing Hive built-in functions. We can either
> remove
> > Hive built-in functions from ambiguous function resolution and require
> > users to use special syntax for their qualified names, or add a config
> flag
> > to catalog constructor/yaml for turning on and off Hive built-in
> functions
> > with the flag set to 'false' by default and proper doc added to help
> users
> > make their decisions.
> >
> > 2) Flink temp functions v.s. Flink built-in functions in ambiguous
> function
> > resolution order. We believe Flink temp functions should precede Flink
> > built-in functions, and I have presented my reasons. Just in case if we
> > cannot reach an agreement, I propose forbid users registering temp
> > functions in the same name as a built-in function, like MySQL's approach,
> > for the moment. It won't have any performance concern, since built-in
> > functions are all in memory and thus cost of a name check will be really
> > trivial.
> >
> >
> > On Tue, Sep 3, 2019 at 8:01 PM Xuefu Z  wrote:
> >
> >> From what I have seen, there are a couple of focal disagreements:
> >>
> >> 1. Resolution order: temp function --> flink built-in function -->
> catalog
> >> function vs flink built-in function --> temp function -> catalog
> function.
> >> 2. "External" built-in functions: how to treat built-in functions in
> >> external system and how users reference them
> >>
> >> For #1, I agree with Bowen that temp function needs to be at the highest
> >> priority because that's how a user might overwrite a built-in function
> >> without referencing a persistent, overwriting catalog function with a
> fully
> >> qualified name. Putting built-in functions at the highest priority
> >> eliminates that usage.
> >>
> >> For #2, I saw a general agreement on referencing "external" built-in
> >> functions such as those in Hive needs to be explicit and deterministic
> even
> >> though different approaches are proposed. To limit the scope and simply
> the
> >> usage, it seems making sense to me to introduce special syntax for
> user  to
> >> explicitly reference an external built-in function such as hive1::sqrt
> or
> >> hive1._built_in.sqrt. This is a DML syntax matching nicely Catalog API
> call
> >> hive1.getFunction(ObjectPath functionName) where the database name is
> >> absent for bulit-in functions available in that catalog hive1. I
> understand
> >> that Bowen's original proposal was trying to avoid this, but this could
> >> turn out to be a clean and

Re: [DISCUSS] Flink client api enhancement for downstream project

2019-09-04 Thread Kostas Kloudas
Great idea Tison!

I will create the umbrella issue and post it here so that we are all
on the same page!

Cheers,
Kostas

On Wed, Sep 4, 2019 at 11:36 AM Zili Chen  wrote:
>
> Hi Kostas & Aljoscha,
>
> I notice that there is a JIRA(FLINK-13946) which could be included
> in this refactor thread. Since we agree on most of directions in
> big picture, is it reasonable that we create an umbrella issue for
> refactor client APIs and also linked subtasks? It would be a better
> way that we join forces of our community.
>
> Best,
> tison.
>
>
> Zili Chen  于2019年8月31日周六 下午12:52写道:
>>
>> Great Kostas! Looking forward to your POC!
>>
>> Best,
>> tison.
>>
>>
>> Jeff Zhang  于2019年8月30日周五 下午11:07写道:
>>>
>>> Awesome, @Kostas Looking forward your POC.
>>>
>>> Kostas Kloudas  于2019年8月30日周五 下午8:33写道:
>>>
>>> > Hi all,
>>> >
>>> > I am just writing here to let you know that I am working on a POC that
>>> > tries to refactor the current state of job submission in Flink.
>>> > I want to stress out that it introduces NO CHANGES to the current
>>> > behaviour of Flink. It just re-arranges things and introduces the
>>> > notion of an Executor, which is the entity responsible for taking the
>>> > user-code and submitting it for execution.
>>> >
>>> > Given this, the discussion about the functionality that the JobClient
>>> > will expose to the user can go on independently and the same
>>> > holds for all the open questions so far.
>>> >
>>> > I hope I will have some more new to share soon.
>>> >
>>> > Thanks,
>>> > Kostas
>>> >
>>> > On Mon, Aug 26, 2019 at 4:20 AM Yang Wang  wrote:
>>> > >
>>> > > Hi Zili,
>>> > >
>>> > > It make sense to me that a dedicated cluster is started for a per-job
>>> > > cluster and will not accept more jobs.
>>> > > Just have a question about the command line.
>>> > >
>>> > > Currently we could use the following commands to start different
>>> > clusters.
>>> > > *per-job cluster*
>>> > > ./bin/flink run -d -p 5 -ynm perjob-cluster1 -m yarn-cluster
>>> > > examples/streaming/WindowJoin.jar
>>> > > *session cluster*
>>> > > ./bin/flink run -p 5 -ynm session-cluster1 -m yarn-cluster
>>> > > examples/streaming/WindowJoin.jar
>>> > >
>>> > > What will it look like after client enhancement?
>>> > >
>>> > >
>>> > > Best,
>>> > > Yang
>>> > >
>>> > > Zili Chen  于2019年8月23日周五 下午10:46写道:
>>> > >
>>> > > > Hi Till,
>>> > > >
>>> > > > Thanks for your update. Nice to hear :-)
>>> > > >
>>> > > > Best,
>>> > > > tison.
>>> > > >
>>> > > >
>>> > > > Till Rohrmann  于2019年8月23日周五 下午10:39写道:
>>> > > >
>>> > > > > Hi Tison,
>>> > > > >
>>> > > > > just a quick comment concerning the class loading issues when using
>>> > the
>>> > > > per
>>> > > > > job mode. The community wants to change it so that the
>>> > > > > StandaloneJobClusterEntryPoint actually uses the user code class
>>> > loader
>>> > > > > with child first class loading [1]. Hence, I hope that this problem
>>> > will
>>> > > > be
>>> > > > > resolved soon.
>>> > > > >
>>> > > > > [1] https://issues.apache.org/jira/browse/FLINK-13840
>>> > > > >
>>> > > > > Cheers,
>>> > > > > Till
>>> > > > >
>>> > > > > On Fri, Aug 23, 2019 at 2:47 PM Kostas Kloudas 
>>> > > > wrote:
>>> > > > >
>>> > > > > > Hi all,
>>> > > > > >
>>> > > > > > On the topic of web submission, I agree with Till that it only
>>> > seems
>>> > > > > > to complicate things.
>>> > > > > > It is bad for security, job isolation (anybody can submit/cancel
>>> > jobs),
>>> > > > > > and its
>>> > > > > > implementation complicates some parts of the code. So, if it were
>>> > to
>>> > > > > > redesign the
>>> > > > > > WebUI, maybe this part could be left out. In addition, I would say
>>> > > > > > that the ability to cancel
>>> > > > > > jobs could also be left out.
>>> > > > > >
>>> > > > > > Also I would also be in favour of removing the "detached" mode, 
>>> > > > > > for
>>> > > > > > the reasons mentioned
>>> > > > > > above (i.e. because now we will have a future representing the
>>> > result
>>> > > > > > on which the user
>>> > > > > > can choose to wait or not).
>>> > > > > >
>>> > > > > > Now for the separating job submission and cluster creation, I am 
>>> > > > > > in
>>> > > > > > favour of keeping both.
>>> > > > > > Once again, the reasons are mentioned above by Stephan, Till,
>>> > Aljoscha
>>> > > > > > and also Zili seems
>>> > > > > > to agree. They mainly have to do with security, isolation and ease
>>> > of
>>> > > > > > resource management
>>> > > > > > for the user as he knows that "when my job is done, everything
>>> > will be
>>> > > > > > cleared up". This is
>>> > > > > > also the experience you get when launching a process on your local
>>> > OS.
>>> > > > > >
>>> > > > > > On excluding the per-job mode from returning a JobClient or not, I
>>> > > > > > believe that eventually
>>> > > > > > it would be nice to allow users to get back a jobClient. The
>>> > reason is
>>> > > > > > that 1) I cannot
>>> > > > > > find any objective reason why the user-experience shoul

Re: [DISCUSS] Flink client api enhancement for downstream project

2019-09-04 Thread Zili Chen
Hi Kostas & Aljoscha,

I notice that there is a JIRA (FLINK-13946) which could be included
in this refactoring thread. Since we agree on most of the directions in
the big picture, is it reasonable to create an umbrella issue for
refactoring the client APIs, with linked subtasks? It would be a better
way to join the forces of our community.

Best,
tison.


Zili Chen  于2019年8月31日周六 下午12:52写道:

> Great Kostas! Looking forward to your POC!
>
> Best,
> tison.
>
>
> Jeff Zhang  于2019年8月30日周五 下午11:07写道:
>
>> Awesome, @Kostas Looking forward your POC.
>>
>> Kostas Kloudas  于2019年8月30日周五 下午8:33写道:
>>
>> > Hi all,
>> >
>> > I am just writing here to let you know that I am working on a POC that
>> > tries to refactor the current state of job submission in Flink.
>> > I want to stress out that it introduces NO CHANGES to the current
>> > behaviour of Flink. It just re-arranges things and introduces the
>> > notion of an Executor, which is the entity responsible for taking the
>> > user-code and submitting it for execution.
>> >
>> > Given this, the discussion about the functionality that the JobClient
>> > will expose to the user can go on independently and the same
>> > holds for all the open questions so far.
>> >
>> > I hope I will have some more new to share soon.
>> >
>> > Thanks,
>> > Kostas
>> >
>> > On Mon, Aug 26, 2019 at 4:20 AM Yang Wang 
>> wrote:
>> > >
>> > > Hi Zili,
>> > >
>> > > It make sense to me that a dedicated cluster is started for a per-job
>> > > cluster and will not accept more jobs.
>> > > Just have a question about the command line.
>> > >
>> > > Currently we could use the following commands to start different
>> > clusters.
>> > > *per-job cluster*
>> > > ./bin/flink run -d -p 5 -ynm perjob-cluster1 -m yarn-cluster
>> > > examples/streaming/WindowJoin.jar
>> > > *session cluster*
>> > > ./bin/flink run -p 5 -ynm session-cluster1 -m yarn-cluster
>> > > examples/streaming/WindowJoin.jar
>> > >
>> > > What will it look like after client enhancement?
>> > >
>> > >
>> > > Best,
>> > > Yang
>> > >
>> > > Zili Chen  于2019年8月23日周五 下午10:46写道:
>> > >
>> > > > Hi Till,
>> > > >
>> > > > Thanks for your update. Nice to hear :-)
>> > > >
>> > > > Best,
>> > > > tison.
>> > > >
>> > > >
>> > > > Till Rohrmann  于2019年8月23日周五 下午10:39写道:
>> > > >
>> > > > > Hi Tison,
>> > > > >
>> > > > > just a quick comment concerning the class loading issues when
>> using
>> > the
>> > > > per
>> > > > > job mode. The community wants to change it so that the
>> > > > > StandaloneJobClusterEntryPoint actually uses the user code class
>> > loader
>> > > > > with child first class loading [1]. Hence, I hope that this
>> problem
>> > will
>> > > > be
>> > > > > resolved soon.
>> > > > >
>> > > > > [1] https://issues.apache.org/jira/browse/FLINK-13840
>> > > > >
>> > > > > Cheers,
>> > > > > Till
>> > > > >
>> > > > > On Fri, Aug 23, 2019 at 2:47 PM Kostas Kloudas <
>> kklou...@gmail.com>
>> > > > wrote:
>> > > > >
>> > > > > > Hi all,
>> > > > > >
>> > > > > > On the topic of web submission, I agree with Till that it only
>> > seems
>> > > > > > to complicate things.
>> > > > > > It is bad for security, job isolation (anybody can submit/cancel
>> > jobs),
>> > > > > > and its
>> > > > > > implementation complicates some parts of the code. So, if it
>> were
>> > to
>> > > > > > redesign the
>> > > > > > WebUI, maybe this part could be left out. In addition, I would
>> say
>> > > > > > that the ability to cancel
>> > > > > > jobs could also be left out.
>> > > > > >
>> > > > > > Also I would also be in favour of removing the "detached" mode,
>> for
>> > > > > > the reasons mentioned
>> > > > > > above (i.e. because now we will have a future representing the
>> > result
>> > > > > > on which the user
>> > > > > > can choose to wait or not).
>> > > > > >
>> > > > > > Now for the separating job submission and cluster creation, I
>> am in
>> > > > > > favour of keeping both.
>> > > > > > Once again, the reasons are mentioned above by Stephan, Till,
>> > Aljoscha
>> > > > > > and also Zili seems
>> > > > > > to agree. They mainly have to do with security, isolation and
>> ease
>> > of
>> > > > > > resource management
>> > > > > > for the user as he knows that "when my job is done, everything
>> > will be
>> > > > > > cleared up". This is
>> > > > > > also the experience you get when launching a process on your
>> local
>> > OS.
>> > > > > >
>> > > > > > On excluding the per-job mode from returning a JobClient or
>> not, I
>> > > > > > believe that eventually
>> > > > > > it would be nice to allow users to get back a jobClient. The
>> > reason is
>> > > > > > that 1) I cannot
>> > > > > > find any objective reason why the user-experience should
>> diverge,
>> > and
>> > > > > > 2) this will be the
>> > > > > > way that the user will be able to interact with his running job.
>> > > > > > Assuming that the necessary
>> > > > > > ports are open for the REST API to work, then I think that the
>> > > > > > JobClient can run against the
>> > > > >

Re: [VOTE] FLIP-62: Set default restart delay for FixedDelay- and FailureRateRestartStrategy to 1s

2019-09-04 Thread JingsongLee
+1 (non-binding)

The default of 0 is really not friendly for production users.

Best,
Jingsong Lee


--
From:Zhu Zhu 
Send Time:2019年9月4日(星期三) 17:13
To:dev 
Subject:Re: [VOTE] FLIP-62: Set default restart delay for FixedDelay- and 
FailureRateRestartStrategy to 1s

+1 (non-binding)

Thanks,
Zhu Zhu

Till Rohrmann  于2019年9月4日周三 下午5:06写道:

> Hi everyone,
>
> I would like to start the voting process for FLIP-62 [1], which
> is discussed and reached consensus in this thread [2].
>
> Since the change is rather small I'd like to shorten the voting period to
> 48 hours. Hence, I'll try to close it September 6th, 11:00 am CET, unless
> there is an objection or not enough votes.
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-62%3A+Set+default+restart+delay+for+FixedDelay-+and+FailureRateRestartStrategy+to+1s
> [2]
>
> https://lists.apache.org/thread.html/9602b342602a0181fcb618581f3b12e692ed2fad98c59fd6c1caeabd@%3Cdev.flink.apache.org%3E
>
> Cheers,
> Till
>


Re: [VOTE] FLIP-62: Set default restart delay for FixedDelay- and FailureRateRestartStrategy to 1s

2019-09-04 Thread Zhu Zhu
+1 (non-binding)

Thanks,
Zhu Zhu

Till Rohrmann  wrote on Wed, Sep 4, 2019 at 5:06 PM:

> Hi everyone,
>
> I would like to start the voting process for FLIP-62 [1], which
> is discussed and reached consensus in this thread [2].
>
> Since the change is rather small I'd like to shorten the voting period to
> 48 hours. Hence, I'll try to close it September 6th, 11:00 am CET, unless
> there is an objection or not enough votes.
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-62%3A+Set+default+restart+delay+for+FixedDelay-+and+FailureRateRestartStrategy+to+1s
> [2]
>
> https://lists.apache.org/thread.html/9602b342602a0181fcb618581f3b12e692ed2fad98c59fd6c1caeabd@%3Cdev.flink.apache.org%3E
>
> Cheers,
> Till
>


Re: [VOTE] FLIP-61 Simplify Flink's cluster level RestartStrategy configuration

2019-09-04 Thread Zhu Zhu
+1 (non-binding)

Thanks,
Zhu Zhu

Till Rohrmann  wrote on Wed, Sep 4, 2019 at 5:05 PM:

> Hi everyone,
>
> I would like to start the voting process for FLIP-61 [1], which is
> discussed and reached consensus in this thread [2].
>
> Since the change is rather small I'd like to shorten the voting period to
> 48 hours. Hence, I'll try to close it September 6th, 11:00 am CET, unless
> there is an objection or not enough votes.
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-61+Simplify+Flink%27s+cluster+level+RestartStrategy+configuration
> [2]
>
> https://lists.apache.org/thread.html/e206390127bcbd9b24d9c41a838faa75157e468e01552ad241e3e24b@%3Cdev.flink.apache.org%3E
>
> Cheers,
> Till
>


[DISCUSS] Use static imports for test utilities or asserts.

2019-09-04 Thread Yang Jeff
Hi all,
Is it necessary for us to replace all asserts or test utilities with static imports?
Thank you very much.
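
A minimal illustration of what such a change would look like in a test (the
class and method names here are made up for illustration, not taken from the
Flink code base):

import static org.junit.Assert.assertEquals;

import org.junit.Assert;
import org.junit.Test;

public class StaticImportExampleTest {

    @Test
    public void withoutStaticImport() {
        // utility referenced through its class
        Assert.assertEquals(4, 2 + 2);
    }

    @Test
    public void withStaticImport() {
        // the same assertion via a static import
        assertEquals(4, 2 + 2);
    }
}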


[VOTE] FLIP-62: Set default restart delay for FixedDelay- and FailureRateRestartStrategy to 1s

2019-09-04 Thread Till Rohrmann
Hi everyone,

I would like to start the voting process for FLIP-62 [1], which
is discussed and reached consensus in this thread [2].

Since the change is rather small I'd like to shorten the voting period to
48 hours. Hence, I'll try to close it September 6th, 11:00 am CET, unless
there is an objection or not enough votes.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-62%3A+Set+default+restart+delay+for+FixedDelay-+and+FailureRateRestartStrategy+to+1s
[2]
https://lists.apache.org/thread.html/9602b342602a0181fcb618581f3b12e692ed2fad98c59fd6c1caeabd@%3Cdev.flink.apache.org%3E

Cheers,
Till


[VOTE] FLIP-61 Simplify Flink's cluster level RestartStrategy configuration

2019-09-04 Thread Till Rohrmann
Hi everyone,

I would like to start the voting process for FLIP-61 [1], which is
discussed and reached consensus in this thread [2].

Since the change is rather small I'd like to shorten the voting period to
48 hours. Hence, I'll try to close it September 6th, 11:00 am CET, unless
there is an objection or not enough votes.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-61+Simplify+Flink%27s+cluster+level+RestartStrategy+configuration
[2]
https://lists.apache.org/thread.html/e206390127bcbd9b24d9c41a838faa75157e468e01552ad241e3e24b@%3Cdev.flink.apache.org%3E

Cheers,
Till


Re: [DISCUSS] Releasing Flink 1.8.2

2019-09-04 Thread Aljoscha Krettek
Hi,

I’m just running the last tests on FLINK-13586 on Travis and then I’m merging.

Best,
Aljoscha 

> On 4. Sep 2019, at 07:37, Jark Wu  wrote:
> 
> Thanks for the work Jincheng! 
> 
> I have moved the remaining major issues to 1.8.3, except FLINK-13586. 
> 
> Hi @Aljoscha Krettek, is it possible to merge
> FLINK-13586 today? 
> 
> Best,
> Jark
> 
> On Wed, 4 Sep 2019 at 10:47, jincheng sun  > wrote:
> Thanks for the update, Jark!
> 
> I have added the new version 1.8.3 in JIRA. Could you please re-mark the
> JIRAs (such as FLINK-13689) which we do not want to merge into the 1.8.2
> release :)
> 
>  You are right, I think FLINK-13586 had better be contained in the 1.8.2
> release!
> 
> Thanks,
> Jincheng
> 
> 
> Jark Wu  wrote on Wed, Sep 4, 2019 at 10:15 AM:
> 
> > Hi all,
> >
> > I am very happy to say that all the blockers and critical issues for
> > release 1.8.2 have been resolved!
> >
> > Great thanks to everyone who contribute to the release.
> >
> > I hope to create the first RC on Sep 05, at 10:00 UTC+8.
> If you find any other blocker issues for 1.8.2, please let me know before
> then so that they can be accounted for in the 1.8.2 release.
> >
> Before cutting RC1, I think there is a chance to merge the
> ClosureCleaner.clean fix (FLINK-13586), because both the review and Travis
> have passed.
> >
> > Cheers,
> > Jark
> >
> > On Wed, 4 Sep 2019 at 00:45, Kostas Kloudas  > > wrote:
> >
> > > Yes, I will do that Jark!
> > >
> > > Kostas
> > >
> > > On Tue, Sep 3, 2019 at 4:19 PM Jark Wu  > > > wrote:
> > > >
> > > > Thanks Kostas for the quick fixing.
> > > >
> > > > However, I find that FLINK-13940 still targets 1.8.2 as a blocker.
> > If I
> > > > understand correctly, FLINK-13940 is aiming for a nicer and better
> > > solution
> > > > in the future.
> > > > So should we update the fixVersion of FLINK-13940?
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > > On Tue, 3 Sep 2019 at 21:33, Kostas Kloudas  > > > >
> > wrote:
> > > >
> > > > > Thanks for waiting!
> > > > >
> > > > > A fix for FLINK-13940 has been merged on 1.8, 1.9 and the master
> > under
> > > > > FLINK-13941.
> > > > >
> > > > > Cheers,
> > > > > Kostas
> > > > >
> > > > > On Tue, Sep 3, 2019 at 11:25 AM jincheng sun <
> > sunjincheng...@gmail.com 
> > > >
> > > > > wrote:
> > > > > >
> > > > > > +1. FLINK-13940 is a blocker, since data loss is a very serious
> > > > > > bug. And great thanks for helping to fix it, Kostas!
> > > > > >
> > > > > > Best, Jincheng
> > > > > >
> > > > > > Kostas Kloudas  wrote on Mon, Sep 2, 2019 at 7:20 PM:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I think this should also be considered a blocker:
> > > > > > > https://issues.apache.org/jira/browse/FLINK-13940
> > > > > > > It is not a regression but it can result in data loss.
> > > > > > >
> > > > > > > I think I can have a quick fix by tomorrow.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Kostas
> > > > > > >
> > > > > > > On Mon, Sep 2, 2019 at 12:01 PM jincheng sun <
> > > sunjincheng...@gmail.com 
> > > > > >
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > Thanks for all of your feedback!
> > > > > > > >
> > > > > > > > Hi Jark, glad to see that you are doing what an RM should be doing.
> > > > > > > >
> > > > > > > > Only one tip here: before RC1, all the blockers should be
> > > > > > > > fixed, but the others are nice to have. So you can decide when
> > > > > > > > to prepare RC1 after the blockers are resolved.
> > > > > > > >
> > > > > > > > Feel free to tell me if you have any questions.
> > > > > > > >
> > > > > > > > Best,Jincheng
> > > > > > > >
> > > > > > > > Aljoscha Krettek  wrote on Mon, Sep 2, 2019 at 5:03 PM:
> > > > > > > >
> > > > > > > > > I cut a PR for FLINK-13586:
> > > > > > > > > https://github.com/apache/flink/pull/9595
> > > > > > > > >
> > > > > > > > > > On 2. Sep 2019, at 05:03, Yu Li  > > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > +1 for a 1.8.2 release, thanks for bringing this up
> > Jincheng!
> > > > > > > > > >
> > > > > > > > > > Best Regards,
> > > > > > > > > > Yu
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Mon, 2 Sep 2019 at 09:19, Thomas Weise  > > > > > > > > > >
> > > > > wrote:
> > > > > > > > > >
> > > > > > > > > >> +1 for the 1.8

Re: [DISCUSS] FLIP-62: Set default restart delay for FixedDelay- and FailureRateRestartStrategy to 1s

2019-09-04 Thread Till Rohrmann
I guess that most things have already been said on the related discussion
thread [1]. Hence, I'm going to start the vote.

[1]
https://lists.apache.org/thread.html/107b15de6b8ac849610d99c4754715d2a8a2f32ddfe9f8da02af2ccc@%3Cdev.flink.apache.org%3E

Cheers,
Till

On Tue, Sep 3, 2019 at 11:41 AM Till Rohrmann  wrote:

> Hi everyone,
>
> I'd like to discuss changing the default restart delay for FixedDelay- and
> FailureRateRestartStrategy to "1 s" [1].
>
> According to a user survey about the default value of the restart delay
> [2], it turned out that the current default value of "0 s" is not optimal.
> In practice Flink users tend to set it to a non-zero value (e.g. "10 s") in
> order to prevent restart storms originating from overloaded external
> systems.
>
> I would like to set the default restart delay of the
> FixedDelayRestartStrategy ("restart-strategy.fixed-delay.delay") and of the
> FailureRateRestartStrategy ("restart-strategy.failure-rate.delay") to "1
> s". "1 s" should prevent restart storms originating from causes outside of
> Flink (e.g. overloaded external systems) and still be fast enough to not
> have a noticeable effect on most Flink deployments.
>
> However, this change will affect all users who currently rely on the
> current default restart delay value ("0 s"). The plan is to add a release
> note to make these users aware of this change when upgrading Flink.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-62%3A+Set+default+restart+delay+for+FixedDelay-+and+FailureRateRestartStrategy+to+1s
> [2]
> https://lists.apache.org/thread.html/107b15de6b8ac849610d99c4754715d2a8a2f32ddfe9f8da02af2ccc@%3Cdev.flink.apache.org%3E
>
> Cheers,
> Till
>
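
For readers who prefer to pin the delay explicitly in code instead of relying
on the cluster default, a minimal DataStream sketch using the existing public
RestartStrategies API (the attempt count and delays are example values only):

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartDelayExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Fixed-delay restarts: at most 3 attempts, 1 s between attempts
        // (the delay that FLIP-62 proposes as the new default).
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(3, Time.of(1, TimeUnit.SECONDS)));

        // Alternatively, a failure-rate strategy with an explicit delay:
        // env.setRestartStrategy(RestartStrategies.failureRateRestart(
        //         3, Time.of(5, TimeUnit.MINUTES), Time.of(1, TimeUnit.SECONDS)));
    }
}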


Re: [DISCUSS] FLIP-61 Simplify Flink's cluster level RestartStrategy configuration

2019-09-04 Thread Till Rohrmann
I guess that most things have already been said on the related discussion
thread [1]. Hence I will start a vote about this FLIP.

[1]
https://lists.apache.org/thread.html/80bef7146f9696f35b1e50ff4acdd1cc3e87ae6f212d205aa7a72182@%3Cdev.flink.apache.org%3E

Cheers,
Till

On Mon, Sep 2, 2019 at 10:56 PM Till Rohrmann  wrote:

> Hi everyone,
>
> I'd like to discuss FLIP-61 [1] which tries to simplify Flink's cluster
> level RestartStrategy configuration.
>
> Currently, Flink's behaviour with respect to configuring the
> `RestartStrategies` is quite complicated and convoluted. The reason for
> this is that we evolved the way it has been configured and wanted to keep
> it backwards compatible. Due to this, we currently have the following
> behaviour:
>
> * If the config option `restart-strategy` is configured, then Flink uses
> this `RestartStrategy` (so far so simple)
> * If the config option `restart-strategy` is not configured, then
> ** If `restart-strategy.fixed-delay.attempts` or
> `restart-strategy.fixed-delay.delay` are defined, then instantiate
> `FixedDelayRestartStrategy(restart-strategy.fixed-delay.attempts,
> restart-strategy.fixed-delay.delay)`
> ** If `restart-strategy.fixed-delay.attempts` and
> `restart-strategy.fixed-delay.delay` are not defined, then
> *** If checkpointing is disabled, then choose `NoRestartStrategy`
> *** If checkpointing is enabled, then choose
> `FixedDelayRestartStrategy(Integer.MAX_VALUE, "0 s")`
>
> I would like to simplify the configuration by removing the "If
> `restart-strategy.fixed-delay.attempts` or
> `restart-strategy.fixed-delay.delay`, then" condition. That way, the logic
> would be the following:
>
> * If the config option `restart-strategy` is configured, then Flink uses
> this `RestartStrategy`
> * If the config option `restart-strategy` is not configured, then
> ** If checkpointing is disabled, then choose `NoRestartStrategy`
> ** If checkpointing is enabled, then choose
> `FixedDelayRestartStrategy(Integer.MAX_VALUE, "0 s")`
>
> That way we retain the user friendliness that jobs restart if the user
> enabled checkpointing and we make it clear that any `
> restart-strategy.fixed-delay.xyz` setting will only be respected if
> `restart-strategy` has been set to `fixed-delay`.
>
> This simplification would, however, change Flink's behaviour and might
> break existing setups. Since we introduced `RestartStrategies` with Flink
> 1.0.0 and deprecated the prior configuration mechanism which enables
> restarting if either the `attempts` or the `delay` has been set, I think
> that the number of broken jobs should be minimal if not non-existent.
>
> There has been a previous discussion thread, which is now abandoned [2].
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-61+Simplify+Flink%27s+cluster+level+RestartStrategy+configuration
> [2]
> https://lists.apache.org/thread.html/80bef7146f9696f35b1e50ff4acdd1cc3e87ae6f212d205aa7a72182@%3Cdev.flink.apache.org%3E
>
> Cheers,
> Till
>
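
To make the proposed resolution order concrete, here is a rough Java sketch.
It is not the actual Flink code: the config keys are the ones discussed above,
only two strategies are shown, and the defaults are placeholders.

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;

public class RestartStrategyResolutionSketch {

    static RestartStrategyConfiguration resolve(Configuration conf, boolean checkpointingEnabled) {
        String strategy = conf.getString("restart-strategy", null);

        if ("none".equals(strategy)) {
            return RestartStrategies.noRestart();
        }
        if ("fixed-delay".equals(strategy)) {
            // restart-strategy.fixed-delay.* is only consulted once the strategy is chosen.
            int attempts = conf.getInteger("restart-strategy.fixed-delay.attempts", 1);
            return RestartStrategies.fixedDelayRestart(attempts, Time.seconds(1));
        }

        // No cluster-level strategy configured: fall back based on checkpointing,
        // exactly as in the simplified logic described above.
        return checkpointingEnabled
                ? RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.seconds(0))
                : RestartStrategies.noRestart();
    }
}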


Re: [VOTE] FLIP-49: Unified Memory Configuration for TaskExecutors

2019-09-04 Thread Xintong Song
@till

> Just to clarify Xintong, you suggest that Task off-heap memory represents
> direct and native memory. Since we don't know how the user will allocate
> the memory we will add this value to -XX:MaxDirectMemorySize so that the
> process won't fail if the user allocates only direct memory and no native
> memory. Is that correct?
>
Yes, this is what I mean.
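
To make the agreed-upon derivation concrete, a tiny sketch (the component
names and sizes are hypothetical, and JVM overhead is deliberately excluded,
per the earlier correction):

public class MaxDirectMemorySketch {

    public static void main(String[] args) {
        // Hypothetical sizes, in bytes.
        long taskOffHeap   = 256L << 20; // direct + native memory of the user code
        long shuffleMemory = 512L << 20; // network buffers used by the shuffle component

        // Task off-heap is added in full, even though part of it may turn out to
        // be native rather than direct memory, so that a purely direct-memory
        // user does not hit the limit.
        long maxDirectMemorySize = taskOffHeap + shuffleMemory;

        System.out.println("-XX:MaxDirectMemorySize=" + maxDirectMemorySize);
    }
}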


Thank you~

Xintong Song



On Wed, Sep 4, 2019 at 4:25 PM Till Rohrmann  wrote:

> Just to clarify Xintong, you suggest that Task off-heap memory represents
> direct and native memory. Since we don't know how the user will allocate
> the memory we will add this value to -XX:MaxDirectMemorySize so that the
> process won't fail if the user allocates only direct memory and no native
> memory. Is that correct?
>
> Cheers,
> Till
>
> On Wed, Sep 4, 2019 at 10:18 AM Xintong Song 
> wrote:
>
> > @Stephan
> > Not sure what you mean by "just having this value". Are you suggesting
> > that having "taskmanager.memory.network" refers to "shuffle" and "other
> > network memory", or only "shuffle"?
> >
> > I guess what you mean is only "shuffle"? Because currently
> > "taskmanager.network.memory" refers to shuffle buffers only, which is
> "one
> > less config value to break".
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Wed, Sep 4, 2019 at 3:42 PM Stephan Ewen  wrote:
> >
> > > If we later split the network memory into "shuffle" and "other network
> > > memory", I think it would make sense to split the option then.
> > >
> > > In that case "taskmanager.memory.network" would probably refer to the
> > total
> > > network memory, which would most likely be what most users actually
> > > configure.
> > > My feeling is that for now just having this value is actually easier,
> and
> > > it is one less config value to break (which is also good).
> > >
> > > On Wed, Sep 4, 2019 at 9:05 AM Xintong Song 
> > wrote:
> > >
> > > > Thanks for the voting and comments.
> > > >
> > > > @Stephan
> > > > - The '-XX:MaxDirectMemorySize' value should not include JVM
> Overhead.
> > > > Thanks for correction.
> > > > - 'taskmanager.memory.framework.heap' is heap memory reserved for the
> > > > task executor framework, which cannot be allocated to task slots. I think
> > > users
> > > > should be able to configure both how many total java heap memory a
> task
> > > > executor should have and how many of the total java heap memory can
> be
> > > > allocated to task slots.
> > > > - Regarding network / shuffle memory, I'm with @Andrey. ATM, this
> > memory
> > > > pool (derived from "taskmanager.network.memory.[min/max/fraction]")
> is
> > > only
> > > > used inside NettyShuffleEnvironment as network buffers. There might
> be
> > > > other network memory usage outside the shuffle component (as
> @Zhijiang
> > > also
> > > > suggested), but that is not accounted by this memory pool. This is
> > > exactly
> > > > why I would suggest to change the name from 'network' to 'shuffle'.
> > > > - I agree that we need very good documentation to explain the memory
> > > pools
> > > > and config options, as well as WebUI to present the memory pool
> sizes.
> > I
> > > > would suggest to address these as follow-ups of all the three
> resource
> > > > management FLIPs, for better integration.
> > > >
> > > > @Andrey
> > > > - Agree with the 'shuffle' naming. See above.
> > > >
> > > > @Till
> > > > - My understanding is that Task Off-heap memory accounts for both
> > direct
> > > > and native memory used by the user code. I'm not sure whether we need
> > > > another configure option to split it. Given that we only decided (in
> > the
> > > > discussion thread) to try it out the way we set
> > '-XX:MaxDirectMemorySize'
> > > > in current design and may switch to other alternatives if it doesn't
> > work
> > > > out well, I would suggest the same for this one. I suggest that we
> > first
> > > > try it without the splitting config option, and we can easily add it
> if
> > > it
> > > > doesn't work well.
> > > > - Agree that it's really important to have good documentation for
> this.
> > > See
> > > > above.
> > > >
> > > > @Zhijiang
> > > > - Thanks for the input. My understanding is that 'shuffle memory' is
> a
> > > > portion of the task executor memory reserved for the shuffle
> component.
> > > The
> > > > way the shuffle component uses this memory (local buffer pool, netty
> > internal
> > > > memory, etc.) can be different depending on the shuffle
> implementation.
> > > The
> > > > task executor (outside of the shuffle implementation) should only
> know
> > > the
> > > > overall memory usage of the shuffle component but no need to
> understand
> > > > more details inside the shuffle implementation.
> > > >
> > > > Thank you~
> > > >
> > > > Xintong Song
> > > >
> > > >
> > > >
> > > > On Tue, Sep 3, 2019 at 10:41 PM zhijiang  > > > .invalid>
> > > > wrote:
> > > >
> > > > > Thanks for proposing this FLIP and also +1 on my side.
> > > > >
> > > > > @Andrey Zagrebin For the point o

Re: [DISCUSS] Contribute Pulsar Flink connector back to Flink

2019-09-04 Thread Chesnay Schepler

I'm quite worried that we may end up repeating history.

There were already 2 attempts at contributing a pulsar connector, both 
of which failed because no committer was getting involved, despite the 
contributor opening a dedicated discussion thread about the contribution 
beforehand and getting several +1's from committers.


We should really make sure that if we welcome/approve such a 
contribution it will actually get the attention it deserves.


As such, I'm inclined to recommend maintaining the connector outside of 
Flink. We could link to it from the documentation to give it more exposure.
With the upcoming page for sharing artifacts among the community (what's 
the state of that anyway?), this may be a better option.


On 04/09/2019 10:16, Till Rohrmann wrote:

Hi everyone,

thanks a lot for starting this discussion Yijie. I think the Pulsar
connector would be a very valuable addition since Pulsar is becoming more and
more popular and it would further expand Flink's interoperability. Also
from a project perspective it makes sense for me to place the connector in
the downstream project.

My main concern/question is how can the Flink community maintain the
connector? We have seen in the past that connectors are some of the most
actively developed components because they need to be kept in sync with the
external system and with Flink. Given that the Pulsar community is willing
to help with maintaining, improving and evolving the connector, I'm
optimistic that we can achieve this. Hence, +1 for contributing it back to
Flink.

Cheers,
Till



On Wed, Sep 4, 2019 at 2:03 AM Sijie Guo  wrote:


Hi Yun,

Since I was the main driver behind FLINK-9641 and FLINK-9168, let me try to
add more context on this.

FLINK-9641 and FLINK-9168 were created for bringing Pulsar as source and
sink for Flink. The integration was done with Flink 1.6.0. We sent out pull
requests about a year ago and we ended up maintaining those connectors in
Pulsar for Pulsar users to use Flink to process event streams in Pulsar.
(See https://github.com/apache/pulsar/tree/master/pulsar-flink). The Flink
1.6 integration is pretty simple and there are no schema considerations.

In the past year, we have made a lot of changes in Pulsar and brought
Pulsar schema as the first-class citizen in Pulsar. We also integrated with
other computing engines for processing Pulsar event streams with Pulsar
schema.

It led us to rethink how to integrate with Flink in the best way. Then we
reimplement the pulsar-flink connectors from the ground up with schema and
bring table API and catalog API as the first-class citizen in the
integration. With that being said, in the new pulsar-flink implementation,
you can register pulsar as a flink catalog and query / process the event
streams using Flink SQL.

This is an example about how to use Pulsar as a Flink catalog:

https://github.com/streamnative/pulsar-flink/blob/3eeddec5625fc7dddc3f8a3ec69f72e1614ca9c9/README.md#use-pulsar-catalog

Yijie has also written a blog post explaining why we re-implemented the Flink
connector with Flink 1.9 and what are the changes we made in the new
connector:

https://medium.com/streamnative/use-apache-pulsar-as-streaming-table-with-8-lines-of-code-39033a93947f

We believe Pulsar is not just a simple data sink or source for Flink. It
actually can be a fully integrated streaming data storage for Flink in many
areas (sink, source, schema/catalog and state). The combination of Flink
and Pulsar can create a great streaming warehouse architecture for
streaming-first, unified data processing. Since we are talking to
contribute Pulsar integration to Flink here, we are also dedicated to
maintain, improve and evolve the integration with Flink to help the users
who use both Flink and Pulsar.

Hope this gives you a bit more background about the Pulsar Flink
integration. Let me know what your thoughts are.

Thanks,
Sijie


On Tue, Sep 3, 2019 at 11:54 AM Yun Tang  wrote:


Hi Yijie

I can see that Pulsar has become more and more popular recently, and I am very
glad to see more people willing to contribute to the Flink ecosystem.

Before any further discussion, would you please give some explanation of
the relationship between this thread and the existing JIRAs for the Pulsar
source [1] and sink [2] connectors? Will the contribution contain part of
those PRs, or a totally different implementation?

[1] https://issues.apache.org/jira/browse/FLINK-9641
[2] https://issues.apache.org/jira/browse/FLINK-9168

Best
Yun Tang

From: Yijie Shen 
Sent: Tuesday, September 3, 2019 13:57
To: dev@flink.apache.org 
Subject: [DISCUSS] Contribute Pulsar Flink connector back to Flink

Dear Flink Community!

I would like to open the discussion of contributing Pulsar Flink
connector [0] back to Flink.

## A brief introduction to Apache Pulsar

Apache Pulsar[1] is a multi-tenant, high-performance distributed
pub-sub messaging system. Pulsar includes multiple features such as
native support for multiple cluster

Re: [VOTE] FLIP-49: Unified Memory Configuration for TaskExecutors

2019-09-04 Thread Till Rohrmann
Just to clarify Xintong, you suggest that Task off-heap memory represents
direct and native memory. Since we don't know how the user will allocate
the memory we will add this value to -XX:MaxDirectMemorySize so that the
process won't fail if the user allocates only direct memory and no native
memory. Is that correct?

Cheers,
Till

On Wed, Sep 4, 2019 at 10:18 AM Xintong Song  wrote:

> @Stephan
> Not sure what you mean by "just having this value". Are you suggesting
> that having "taskmanager.memory.network" refers to "shuffle" and "other
> network memory", or only "shuffle"?
>
> I guess what you mean is only "shuffle"? Because currently
> "taskmanager.network.memory" refers to shuffle buffers only, which is "one
> less config value to break".
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Sep 4, 2019 at 3:42 PM Stephan Ewen  wrote:
>
> > If we later split the network memory into "shuffle" and "other network
> > memory", I think it would make sense to split the option then.
> >
> > In that case "taskmanager.memory.network" would probably refer to the
> total
> > network memory, which would most likely be what most users actually
> > configure.
> > My feeling is that for now just having this value is actually easier, and
> > it is one less config value to break (which is also good).
> >
> > On Wed, Sep 4, 2019 at 9:05 AM Xintong Song 
> wrote:
> >
> > > Thanks for the voting and comments.
> > >
> > > @Stephan
> > > - The '-XX:MaxDirectMemorySize' value should not include JVM Overhead.
> > > Thanks for correction.
> > > - 'taskmanager.memory.framework.heap' is heap memory reserved for the task
> > > executor framework, which cannot be allocated to task slots. I think
> > users
> > > should be able to configure both how many total java heap memory a task
> > > executor should have and how many of the total java heap memory can be
> > > allocated to task slots.
> > > - Regarding network / shuffle memory, I'm with @Andrey. ATM, this
> memory
> > > pool (derived from "taskmanager.network.memory.[min/max/fraction]") is
> > only
> > > used inside NettyShuffleEnvironment as network buffers. There might be
> > > other network memory usage outside the shuffle component (as @Zhijiang
> > also
> > > suggested), but that is not accounted by this memory pool. This is
> > exactly
> > > why I would suggest to change the name from 'network' to 'shuffle'.
> > > - I agree that we need very good documentation to explain the memory
> > pools
> > > and config options, as well as WebUI to present the memory pool sizes.
> I
> > > would suggest to address these as follow-ups of all the three resource
> > > management FLIPs, for better integration.
> > >
> > > @Andrey
> > > - Agree with the 'shuffle' naming. See above.
> > >
> > > @Till
> > > - My understanding is that Task Off-heap memory accounts for both
> direct
> > > and native memory used by the user code. I'm not sure whether we need
> > > another configure option to split it. Given that we only decided (in
> the
> > > discussion thread) to try it out the way we set
> '-XX:MaxDirectMemorySize'
> > > in current design and may switch to other alternatives if it doesn't
> work
> > > out well, I would suggest the same for this one. I suggest that we
> first
> > > try it without the splitting config option, and we can easily add it if
> > it
> > > doesn't work well.
> > > - Agree that it's really important to have good documentation for this.
> > See
> > > above.
> > >
> > > @Zhijiang
> > > - Thanks for the input. My understanding is that 'shuffle memory' is a
> > > portion of the task executor memory reserved for the shuffle component.
> > The
> > > way the shuffle component uses this memory (local buffer pool, netty
> internal
> > > memory, etc.) can be different depending on the shuffle implementation.
> > The
> > > task executor (outside of the shuffle implementation) should only know
> > the
> > > overall memory usage of the shuffle component but no need to understand
> > > more details inside the shuffle implementation.
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Tue, Sep 3, 2019 at 10:41 PM zhijiang  > > .invalid>
> > > wrote:
> > >
> > > > Thanks for proposing this FLIP and also +1 on my side.
> > > >
> > > > @Andrey Zagrebin For the point of "network memory is actually used
> more
> > > > than shuffling", I guess that the component of queryable state is
> also
> > > > using network/netty stack atm, which is outside of shuffling.
> > > > In addition, if we only consider the shuffle memory provided by
> shuffle
> > > > service interface, we should not only consider the memory used by
> local
> > > > buffer pool, but also consider the netty internal memory
> > > > usages as the overhead, especially we have not the zero-copy
> > improvement
> > > > on the downstream read side. This issue might be out of the vote scope,
> just
> > > > think of we have this issue in [1]. :)
> > > >
> > > > [1] https://issues.apache.org/jira/browse/FLINK-12110
> > > >
> >

Re: [VOTE] FLIP-49: Unified Memory Configuration for TaskExecutors

2019-09-04 Thread Andrey Zagrebin
@Zhijiang @Stephan
I agree with @Xintong for the scope of the shuffle memory.
but as @Zhijinag pointed out it is not easy to estimate real netty shuffle
memory consumption due to the overhead.
Everything that is pretty much O(1) comparing to the shuffle buffer size
can be accommodated in the general framework/task memory.
Depending on how big netty shuffle overhead (not buffers), we can either
make netty shuffle include it into shuffle memory or treat it as a part of
the general framework/task memory.
I do not see that we use the current network memory option in KV state
server atm and size-wise it can probably belong to the general
framework/task memory.

@Xintong @Till
In general, user might want to use both direct and native memory in Tasks.
Then naturally Task Off-heap memory is sum of them.
I guess the problem is that if user needs non-zero native memory then it is
expected that direct memory limit is less than the total Task Off-heap
memory.
If we just say that -XX:MaxDirectMemorySize includes all Task Off-heap
memory, the provided limit for the direct memory is technically bigger than
the real need.
I think it will be cleaner to separate task direct and native memory but
then I agree with Till that the default values/documentation/examples
should be very clear for users.

Thanks,
Andrey

On Wed, Sep 4, 2019 at 10:18 AM Xintong Song  wrote:

> @Stephan
> Not sure what you mean by "just having this value". Are you suggesting
> that having "taskmanager.memory.network" refers to "shuffle" and "other
> network memory", or only "shuffle"?
>
> I guess what you mean is only "shuffle"? Because currently
> "taskmanager.network.memory" refers to shuffle buffers only, which is "one
> less config value to break".
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Sep 4, 2019 at 3:42 PM Stephan Ewen  wrote:
>
> > If we later split the network memory into "shuffle" and "other network
> > memory", I think it would make sense to split the option then.
> >
> > In that case "taskmanager.memory.network" would probably refer to the
> total
> > network memory, which would most likely be what most users actually
> > configure.
> > My feeling is that for now just having this value is actually easier, and
> > it is one less config value to break (which is also good).
> >
> > On Wed, Sep 4, 2019 at 9:05 AM Xintong Song 
> wrote:
> >
> > > Thanks for the voting and comments.
> > >
> > > @Stephan
> > > - The '-XX:MaxDirectMemorySize' value should not include JVM Overhead.
> > > Thanks for correction.
> > > - 'taskmanager.memory.framework.heap' is heap memory reserved for the task
> > > executor framework, which cannot be allocated to task slots. I think
> > users
> > > should be able to configure both how many total java heap memory a task
> > > executor should have and how many of the total java heap memory can be
> > > allocated to task slots.
> > > - Regarding network / shuffle memory, I'm with @Andrey. ATM, this
> memory
> > > pool (derived from "taskmanager.network.memory.[min/max/fraction]") is
> > only
> > > used inside NettyShuffleEnvironment as network buffers. There might be
> > > other network memory usage outside the shuffle component (as @Zhijiang
> > also
> > > suggested), but that is not accounted by this memory pool. This is
> > exactly
> > > why I would suggest to change the name from 'network' to 'shuffle'.
> > > - I agree that we need very good documentation to explain the memory
> > pools
> > > and config options, as well as WebUI to present the memory pool sizes.
> I
> > > would suggest to address these as follow-ups of all the three resource
> > > management FLIPs, for better integration.
> > >
> > > @Andrey
> > > - Agree with the 'shuffle' naming. See above.
> > >
> > > @Till
> > > - My understanding is that Task Off-heap memory accounts for both
> direct
> > > and native memory used by the user code. I'm not sure whether we need
> > > another configure option to split it. Given that we only decided (in
> the
> > > discussion thread) to try it out the way we set
> '-XX:MaxDirectMemorySize'
> > > in current design and may switch to other alternatives if it doesn't
> work
> > > out well, I would suggest the same for this one. I suggest that we
> first
> > > try it without the splitting config option, and we can easily add it if
> > it
> > > doesn't work well.
> > > - Agree that it's really important to have good documentation for this.
> > See
> > > above.
> > >
> > > @Zhijiang
> > > - Thanks for the input. My understanding is that 'shuffle memory' is a
> > > portion of the task executor memory reserved for the shuffle component.
> > The
> > > way the shuffle component uses this memory (local buffer pool, netty
> internal
> > > memory, etc.) can be different depending on the shuffle implementation.
> > The
> > > task executor (outside of the shuffle implementation) should only know
> > the
> > > overall memory usage of the shuffle component but no need to understand
> > > more details inside the shuffle implement

Re: [VOTE] FLIP-49: Unified Memory Configuration for TaskExecutors

2019-09-04 Thread Xintong Song
@Stephan
Not sure what you mean by "just having this value". Are you suggesting
that having "taskmanager.memory.network" refers to "shuffle" and "other
network memory", or only "shuffle"?

I guess what you mean is only "shuffle"? Because currently
"taskmanager.network.memory" refers to shuffle buffers only, which is "one
less config value to break".

Thank you~

Xintong Song



On Wed, Sep 4, 2019 at 3:42 PM Stephan Ewen  wrote:

> If we later split the network memory into "shuffle" and "other network
> memory", I think it would make sense to split the option then.
>
> In that case "taskmanager.memory.network" would probably refer to the total
> network memory, which would most likely be what most users actually
> configure.
> My feeling is that for now just having this value is actually easier, and
> it is one less config value to break (which is also good).
>
> On Wed, Sep 4, 2019 at 9:05 AM Xintong Song  wrote:
>
> > Thanks for the voting and comments.
> >
> > @Stephan
> > - The '-XX:MaxDirectMemorySize' value should not include JVM Overhead.
> > Thanks for correction.
> > - 'taskmanager.memory.framework.heap' is heap memory reserved for the task
> > executor framework, which cannot be allocated to task slots. I think
> users
> > should be able to configure both how many total java heap memory a task
> > executor should have and how many of the total java heap memory can be
> > allocated to task slots.
> > - Regarding network / shuffle memory, I'm with @Andrey. ATM, this memory
> > pool (derived from "taskmanager.network.memory.[min/max/fraction]") is
> only
> > used inside NettyShuffleEnvironment as network buffers. There might be
> > other network memory usage outside the shuffle component (as @Zhijiang
> also
> > suggested), but that is not accounted by this memory pool. This is
> exactly
> > why I would suggest to change the name from 'network' to 'shuffle'.
> > - I agree that we need very good documentation to explain the memory
> pools
> > and config options, as well as WebUI to present the memory pool sizes. I
> > would suggest to address these as follow-ups of all the three resource
> > management FLIPs, for better integration.
> >
> > @Andrey
> > - Agree with the 'shuffle' naming. See above.
> >
> > @Till
> > - My understanding is that Task Off-heap memory accounts for both direct
> > and native memory used by the user code. I'm not sure whether we need
> > another configure option to split it. Given that we only decided (in the
> > discussion thread) to try it out the way we set '-XX:MaxDirectMemorySize'
> > in current design and may switch to other alternatives if it doesn't work
> > out well, I would suggest the same for this one. I suggest that we first
> > try it without the splitting config option, and we can easily add it if
> it
> > doesn't work well.
> > - Agree that it's really important to have good documentation for this.
> See
> > above.
> >
> > @Zhijiang
> > - Thanks for the input. My understanding is that 'shuffle memory' is a
> > portion of the task executor memory reserved for the shuffle component.
> The
> > way the shuffle component uses this memory (local buffer pool, netty internal
> > memory, etc.) can be different depending on the shuffle implementation.
> The
> > task executor (outside of the shuffle implementation) should only know
> the
> > overall memory usage of the shuffle component but no need to understand
> > more details inside the shuffle implementation.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Tue, Sep 3, 2019 at 10:41 PM zhijiang  > .invalid>
> > wrote:
> >
> > > Thanks for proposing this FLIP and also +1 on my side.
> > >
> > > @Andrey Zagrebin For the point of "network memory is actually used more
> > > than shuffling", I guess that the component of queryable state is also
> > > using network/netty stack atm, which is outside of shuffling.
> > > In addition, if we only consider the shuffle memory provided by shuffle
> > > service interface, we should not only consider the memory used by local
> > > buffer pool, but also consider the netty internal memory
> > > usages as the overhead, especially we have not the zero-copy
> improvement
> > > on the downstream read side. This issue might be out of the vote scope, just
> > > think of we have this issue in [1]. :)
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-12110
> > >
> > > Best,
> > > Zhijiang
> > > --
> > > From:Till Rohrmann 
> > > Send Time: Tuesday, September 3, 2019, 15:07
> > > To:dev 
> > > Subject:Re: [VOTE] FLIP-49: Unified Memory Configuration for
> > TaskExecutors
> > >
> > > Thanks for creating this FLIP and starting the vote Xintong.
> > >
> > > +1 for the proposal from my side.
> > >
> > > I agree with Stephan that we might wanna revisit some of the
> > configuration
> > > names.
> > >
> > > If I understood it correctly, then Task Off-heap memory represents the
> > > direct memory used by the user code, right? How would us

Re: [DISCUSS] Contribute Pulsar Flink connector back to Flink

2019-09-04 Thread Till Rohrmann
Hi everyone,

thanks a lot for starting this discussion Yijie. I think the Pulsar
connector would be a very valuable addition since Pulsar is becoming more and
more popular and it would further expand Flink's interoperability. Also
from a project perspective it makes sense for me to place the connector in
the downstream project.

My main concern/question is how can the Flink community maintain the
connector? We have seen in the past that connectors are some of the most
actively developed components because they need to be kept in sync with the
external system and with Flink. Given that the Pulsar community is willing
to help with maintaining, improving and evolving the connector, I'm
optimistic that we can achieve this. Hence, +1 for contributing it back to
Flink.

Cheers,
Till



On Wed, Sep 4, 2019 at 2:03 AM Sijie Guo  wrote:

> Hi Yun,
>
> Since I was the main driver behind FLINK-9641 and FLINK-9168, let me try to
> add more context on this.
>
> FLINK-9641 and FLINK-9168 were created for bringing Pulsar as source and
> sink for Flink. The integration was done with Flink 1.6.0. We sent out pull
> requests about a year ago and we ended up maintaining those connectors in
> Pulsar for Pulsar users to use Flink to process event streams in Pulsar.
> (See https://github.com/apache/pulsar/tree/master/pulsar-flink). The Flink
> 1.6 integration is pretty simple and there are no schema considerations.
>
> In the past year, we have made a lot of changes in Pulsar and brought
> Pulsar schema as the first-class citizen in Pulsar. We also integrated with
> other computing engines for processing Pulsar event streams with Pulsar
> schema.
>
> It led us to rethink how to integrate with Flink in the best way. Then we
> reimplement the pulsar-flink connectors from the ground up with schema and
> bring table API and catalog API as the first-class citizen in the
> integration. With that being said, in the new pulsar-flink implementation,
> you can register pulsar as a flink catalog and query / process the event
> streams using Flink SQL.
>
> This is an example about how to use Pulsar as a Flink catalog:
>
> https://github.com/streamnative/pulsar-flink/blob/3eeddec5625fc7dddc3f8a3ec69f72e1614ca9c9/README.md#use-pulsar-catalog
>
> Yijie has also written a blog post explaining why we re-implemented the Flink
> connector with Flink 1.9 and what are the changes we made in the new
> connector:
>
> https://medium.com/streamnative/use-apache-pulsar-as-streaming-table-with-8-lines-of-code-39033a93947f
>
> We believe Pulsar is not just a simple data sink or source for Flink. It
> actually can be a fully integrated streaming data storage for Flink in many
> areas (sink, source, schema/catalog and state). The combination of Flink
> and Pulsar can create a great streaming warehouse architecture for
> streaming-first, unified data processing. Since we are talking to
> contribute Pulsar integration to Flink here, we are also dedicated to
> maintain, improve and evolve the integration with Flink to help the users
> who use both Flink and Pulsar.
>
> Hope this gives you a bit more background about the Pulsar Flink
> integration. Let me know what your thoughts are.
>
> Thanks,
> Sijie
>
>
> On Tue, Sep 3, 2019 at 11:54 AM Yun Tang  wrote:
>
> > Hi Yijie
> >
> > I can see that Pulsar has become more and more popular recently, and I am
> > very glad to see more people willing to contribute to the Flink ecosystem.
> >
> > Before any further discussion, would you please give some explanation of
> > the relationship between this thread and the existing JIRAs for the Pulsar
> > source [1] and sink [2] connectors? Will the contribution contain part of
> > those PRs, or a totally different implementation?
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-9641
> > [2] https://issues.apache.org/jira/browse/FLINK-9168
> >
> > Best
> > Yun Tang
> > 
> > From: Yijie Shen 
> > Sent: Tuesday, September 3, 2019 13:57
> > To: dev@flink.apache.org 
> > Subject: [DISCUSS] Contribute Pulsar Flink connector back to Flink
> >
> > Dear Flink Community!
> >
> > I would like to open the discussion of contributing Pulsar Flink
> > connector [0] back to Flink.
> >
> > ## A brief introduction to Apache Pulsar
> >
> > Apache Pulsar[1] is a multi-tenant, high-performance distributed
> > pub-sub messaging system. Pulsar includes multiple features such as
> > native support for multiple clusters in a Pulsar instance, with
> > seamless geo-replication of messages across clusters, very low publish
> > and end-to-end latency, seamless scalability to over a million topics,
> > and guaranteed message delivery with persistent message storage
> > provided by Apache BookKeeper. Nowadays, Pulsar has been adopted by
> > more and more companies[2].
> >
> > ## The status of Pulsar Flink connector
> >
> > The Pulsar Flink connector we are planning to contribute is built upon
> > Flink 1.9.0 and Pulsar 2.4.0. The main features are:
> > - Pulsar as a streaming source with ex

Re: [DISCUSS] FLIP-53: Fine Grained Resource Management

2019-09-04 Thread Xintong Song
@all

The FLIP document [1] has been updated.

Thank you~

Xintong Song


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management

On Tue, Sep 3, 2019 at 7:20 PM Zhu Zhu  wrote:

> Thanks Xintong for the explanation.
>
> For question #1, I think it's good as long as DataSet job behavior remains
> the same.
>
> For question #2, agreed that the resource difference is small enough (at
> most 1 edge diff) in the currently supported point-wise execution edge connection
> patterns.
>
> Thanks,
> Zhu Zhu
>
Xintong Song  wrote on Tue, Sep 3, 2019 at 6:58 PM:
>
> >  Thanks for the comments, Zhu & Kurt.
> >
> > Andrey and I also had some discussions offline, and I would like to first
> > post a summary of our discussion:
> >
> >1. The motivation of the fraction based approach is to unify resource
> >management for both operators with specified and unknown resource
> >requirements.
> >2. The fraction based approach proposed in this FLIP should only
> affect
> >streaming jobs (both bounded and unbounded). For DataSet jobs, there
> are
> >already some fraction based approach (in TaskConfig and
> ChainedDriver),
> > and
> >we do not make any change to the existing approach.
> >3. The scope of this FLIP does not include discussion of how to set
> >ResourceSpec for operators.
> >   1. For blink jobs, the optimizer can set operator resources for the
> >   users, according to their configurations (default: unknown)
> >   2. For DataStream jobs, there are no method / interface to set
> >   operator resources at the moment (1.10). We can have in the future.
> >   3. For DataSet jobs, there are existing user interfaces to set
> >   operator resources.
> >4. The FLIP should explain more about how ResourceSpecs work
> >   1. PhysicalTransformations (deployed with operators into the
> >   StreamTasks) get ResourceSpec: unknown by default or known (e.g.
> > from the
> >   Blink planner)
> >   2. While generating stream graph, calculate fractions and set to
> >   StreamConfig
> >   3. While scheduling, convert ResourceSpec to ResourceProfile
> >   (ResourceSpec + network memory), and deploy to slots / TMs matching
> > the
> >   resources
> >   4. While starting the Task in the TM, each operator gets its fraction
> >   converted back to the original absolute value requested by the user, or
> >   a fair share of the slot for operators with unknown requirements
> >   5. We should not set `allSourcesInSamePipelinedRegion` to `false`
> for
> >DataSet jobs. Behaviors of DataSet jobs should not be changed.
> >6. The FLIP document should differentiate works planed in this FLIP
> and
> >the future follow-ups more clearly, by put the follow-ups in a
> separate
> >section
> >7. Another limitation of the rejected alternative (setting fractions at
> >scheduling time) is that the scheduler implementation does not know in
> >advance which tasks will be deployed into the same slot.
> >
> > Andrey, Please bring it up if there is anything I missed.
> >
> > Zhu, regarding your comments:
> >
> >1. If we do not set `allSourcesInSamePipelinedRegion` to `false` for
> >DataSet jobs (point 5 in the discussion summary above), then there
> >shouldn't be any regression right?
> >2. I think it makes sense to set the max possible network memory for
> the
> >JobVertex. When you say parallel instances of the same JobVertex may
> > have
> >need different network memory, I guess you mean the rescale scenarios
> > where
> >parallelisms of upstream / downstream vertex cannot be exactly divided
> > by
> >parallelism of downstream / upstream vertex? I would say it's
> > acceptable to
> >have a slight difference between the actually needed and the allocated network
> > memory.
> >3. Yes, by numOpsUseOnHeapManagedMemory I mean
> >numOpsUseOnHeapManagedMemoryInTheSameSharedGroup. I'll update the doc.
> >4. Yes, it should be StreamingJobGraphGenerator. Thanks for the
> >correction.
> >
> >
> > Kurt, regarding your comments:
> >
> >1. I think we don't have network memory in ResourceSpec, which is the
> >user facing API. We only have network memory in ResourceProfile, which
> > is
> >used internally for scheduling. The reason we do not expose network
> > memory
> >to the user is that, currently how many network buffers each task
> needs
> > is
> >decided by the topology of execution graph (how many input / output
> >channels it has).
> >2. In the section "Operator Resource Requirements": "For the first
> >version, we do not support mixing operators with specified / unknown
> >resource requirements in the same job. Either all or none of the
> > operators
> >of the same job should specify their resource requirements.
> >StreamGraphGenerator should check this and throw an error when mixing
> of
> >specified / unknown resource requirements is detected, during the
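
A small, self-contained sketch of the fraction arithmetic summarized in point
4 above (illustrative only; the real logic lives in the stream graph generator
and the TaskManager, and the numbers are made up):

public class ManagedMemoryFractionSketch {

    public static void main(String[] args) {
        // Specified operator requirements within one slot sharing group (MB).
        double[] operatorManagedMemoryMb = {64, 192};
        double slotManagedMemoryMb = 256; // what the slot actually provides

        double total = 0;
        for (double mb : operatorManagedMemoryMb) {
            total += mb;
        }

        for (double mb : operatorManagedMemoryMb) {
            // Fraction computed while generating the stream graph...
            double fraction = mb / total;
            // ...and converted back to an absolute budget when the task starts.
            double runtimeBudgetMb = fraction * slotManagedMemoryMb;
            System.out.printf("requested=%.0f MB, fraction=%.3f, at runtime=%.1f MB%n",
                    mb, fraction, runtimeBudgetMb);
        }
    }
}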

Re: [VOTE] FLIP-49: Unified Memory Configuration for TaskExecutors

2019-09-04 Thread Stephan Ewen
If we later split the network memory into "shuffle" and "other network
memory", I think it would make sense to split the option then.

In that case "taskmanager.memory.network" would probably refer to the total
network memory, which would most likely be what most users actually
configure.
My feeling is that for now just having this value is actually easier, and
it is one less config value to break (which is also good).

On Wed, Sep 4, 2019 at 9:05 AM Xintong Song  wrote:

> Thanks for the voting and comments.
>
> @Stephan
> - The '-XX:MaxDirectMemorySize' value should not include JVM Overhead.
> Thanks for correction.
> - 'taskmanager.memory.framework.heap' is heap memory reserved for the task
> executor framework, which cannot be allocated to task slots. I think users
> should be able to configure both how much total Java heap memory a task
> executor should have and how much of the total Java heap memory can be
> allocated to task slots.
> - Regarding network / shuffle memory, I'm with @Andrey. ATM, this memory
> pool (derived from "taskmanager.network.memory.[min/max/fraction]") is only
> used inside NettyShuffleEnvironment as network buffers. There might be
> other network memory usage outside the shuffle component (as @Zhijiang also
> suggested), but that is not accounted by this memory pool. This is exactly
> why I would suggest to change the name from 'network' to 'shuffle'.
> - I agree that we need very good documentation to explain the memory pools
> and config options, as well as WebUI to present the memory pool sizes. I
> would suggest to address these as follow-ups of all the three resource
> management FLIPs, for better integration.
>
> @Andrey
> - Agree with the 'shuffle' naming. See above.
>
> @Till
> - My understanding is that Task Off-heap memory accounts for both direct
> and native memory used by the user code. I'm not sure whether we need
> another configure option to split it. Given that we only decided (in the
> discussion thread) to try it out the way we set '-XX:MaxDirectMemorySize'
> in current design and may switch to other alternatives if it doesn't work
> out well, I would suggest the same for this one. I suggest that we first
> try it without the splitting config option, and we can easily add it if it
> doesn't work well.
> - Agree that it's really important to have good documentation for this. See
> above.
>
> @Zhijiang
> - Thanks for the input. My understanding is that 'shuffle memory' is a
> portion of the task executor memory reserved for the shuffle component. The
> way the shuffle component uses this memory (local buffer pool, netty internal
> memory, etc.) can be different depending on the shuffle implementation. The
> task executor (outside of the shuffle implementation) should only know the
> overall memory usage of the shuffle component but no need to understand
> more details inside the shuffle implementation.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Sep 3, 2019 at 10:41 PM zhijiang  .invalid>
> wrote:
>
> > Thanks for proposing this FLIP and also +1 on my side.
> >
> > @Andrey Zagrebin For the point of "network memory is actually used more
> > than shuffling", I guess that the component of queryable state is also
> > using network/netty stack atm, which is outside of shuffling.
> > In addition, if we only consider the shuffle memory provided by the shuffle
> > service interface, we should not only consider the memory used by the local
> > buffer pool, but also the netty internal memory usage as overhead,
> > especially since we do not yet have the zero-copy improvement on the
> > downstream read side. This issue might be out of the vote scope, just
> > consider that we have this issue in [1]. :)
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-12110
> >
> > Best,
> > Zhijiang
> > --
> > From:Till Rohrmann 
> > Send Time: Tuesday, September 3, 2019, 15:07
> > To:dev 
> > Subject:Re: [VOTE] FLIP-49: Unified Memory Configuration for
> TaskExecutors
> >
> > Thanks for creating this FLIP and starting the vote Xintong.
> >
> > +1 for the proposal from my side.
> >
> > I agree with Stephan that we might wanna revisit some of the
> configuration
> > names.
> >
> > If I understood it correctly, then Task Off-heap memory represents the
> > direct memory used by the user code, right? How would users configure
> > native memory requirements for the user code? If it is part of Task Off
> > heap memory, then we need to split it to set -XX:MaxDirectMemorySize
> > correctly or to introduce another configuration option.
> >
> > Given all these configuration options, I can see that users will get
> > confused quite easily. Therefore, I would like to emphasise that we need
> a
> > very good documentation about how to properly configure Flink processes
> and
> > which knobs to turn in which cases.
> >
> > Cheers,
> > Till
> >
> > On Tue, Sep 3, 2019 at 2:34 PM Andrey Zagrebin 
> > wrote:
> >
> > > Thanks for starting the vote Xintong
> > >
> > > 
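
For readers following the "taskmanager.network.memory.[min/max/fraction]"
options mentioned above, the usual clamp-style sizing can be sketched as
follows (example values only; this is not the actual Flink implementation):

public class NetworkMemorySizingSketch {

    public static void main(String[] args) {
        long totalMemory = 4L << 30;   // e.g. 4 GiB to derive the pool from
        double fraction  = 0.1;        // taskmanager.network.memory.fraction
        long min         = 64L << 20;  // taskmanager.network.memory.min
        long max         = 1L << 30;   // taskmanager.network.memory.max

        long networkMemory = Math.max(min, Math.min(max, (long) (totalMemory * fraction)));

        System.out.println("network/shuffle memory = " + (networkMemory >> 20) + " MiB");
    }
}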

[jira] [Created] (FLINK-13953) Facilitate enabling new Scheduler in MiniCluster Tests

2019-09-04 Thread Gary Yao (Jira)
Gary Yao created FLINK-13953:


 Summary: Facilitate enabling new Scheduler in MiniCluster Tests
 Key: FLINK-13953
 URL: https://issues.apache.org/jira/browse/FLINK-13953
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination, Tests
Reporter: Gary Yao
Assignee: Gary Yao


Currently, tests using the {{MiniCluster}} use the legacy scheduler by default. 
Once the new scheduler is implemented, we should run tests with the new 
scheduler enabled. However, it is not expected that all tests will pass 
immediately. Therefore, it should be possible to enable the new scheduler for a 
subset of tests. 

*Acceptance Criteria*
* Tests using {{MiniCluster}} are run on a per-commit basis (on Travis) against 
new scheduler and also legacy scheduler



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
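
A hedged sketch of how a single test class could opt in to the new scheduler
once a configuration switch exists; the option key "jobmanager.scheduler" and
its value are assumptions for illustration and may not match the final
implementation.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;

public class NewSchedulerMiniClusterITCase {

    private static Configuration newSchedulerConfig() {
        Configuration conf = new Configuration();
        // Hypothetical switch enabling the new scheduler for this test only.
        conf.setString("jobmanager.scheduler", "ng");
        return conf;
    }

    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setConfiguration(newSchedulerConfig())
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(2)
                            .build());

    // Test methods submitting jobs against MINI_CLUSTER would go here.
}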


Re: [DISCUSS] Reducing build times

2019-09-04 Thread Chesnay Schepler
Will using more powerful machines for the project make it more difficult to
ensure that contributor builds still run in a reasonable time?


As an example of this happening on Travis, contributors currently cannot
run all e2e tests since they time out, but on apache we have a larger
timeout.


On 03/09/2019 18:57, Robert Metzger wrote:

Hi all,

I wanted to give a short update on this:
- Arvid, Aljoscha and I have started working on a Gradle PoC, currently
working on making all modules compile and test with Gradle. We've also
identified some problematic areas (shading being the most obvious one)
which we will analyse as part of the PoC.
The goal is to see how much Gradle helps to parallelise our build, and to
avoid duplicate work (incremental builds).

- I am working on setting up a Flink testing infrastructure based on Azure
Pipelines, using more powerful hardware. Alibaba kindly provided me with
two 32 core machines (temporarily), and another company reached out to me
privately, looking into options for cheap, fast machines :)
If nobody in the community disagrees, I am going to set up Azure Pipelines
with our apache/flink GitHub as a build infrastructure that exists next to
Flinkbot and flink-ci. I would like to make sure that Azure Pipelines is
equally or even more reliable than Travis, and I want to see what the
required maintenance work is.
On top of that, Azure Pipelines is a very feature-rich tool with a lot of
nice options for us to improve the build experience (statistics about tests
(flaky tests etc.), nice docker support, plenty of free build resources for
open source projects, ...)

Best,
Robert





On Mon, Aug 19, 2019 at 5:12 PM Robert Metzger  wrote:


Hi all,

I have summarized all arguments mentioned so far + some additional
research into a Wiki page here:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=125309279

I'm happy to hear further comments on my summary! I'm pretty sure we can
find more pro's and con's for the different options.

My opinion after looking at the options:

- Flink relies on an outdated build tool (Maven), while a good
alternative is well-established (gradle), and will likely provide a much
better CI and local build experience through incremental builds and cached
intermediates.
Scripting around Maven, or splitting modules / test execution /
repositories won't solve this problem. We should rather spend the effort in
migrating to a modern build tool which will provide us benefits in the long
run.
- Flink relies on a fairly slow build service (Travis CI), while
simply putting more money onto the problem could cut the build time at
least in half.
We should consider using a build service that provides bigger machines
to solve our build time problem.

My opinion is based on several assumptions that we need to test first through
PoCs: Gradle is actually as fast as promised (I haven't used it before), we
can build Flink with Gradle, and we can find sponsors for bigger build machines.

Best,
Robert




On Mon, Aug 19, 2019 at 10:26 AM Aljoscha Krettek 
wrote:


I did a quick test: a normal "mvn clean install -DskipTests
-Drat.skip=true -Dmaven.javadoc.skip=true -Punsafe-mapr-repo” on my machine
takes about 14 minutes. After removing all mentions of maven-shade-plugin,
the build time goes down to roughly 11.5 minutes. (Obviously the resulting
Flink won’t work, because some expected artifacts are not packaged, and most of
the end-to-end tests use the shade plugin to package the jars for testing.)

Aljoscha


On 18. Aug 2019, at 19:52, Robert Metzger  wrote:

Hi all,

I wanted to understand the impact of the hardware we are using for running
our tests. Each Travis worker has 2 virtual cores and 7.5 GB memory [1].
They are using Google Cloud Compute Engine *n1-standard-2* instances.
Running a full "mvn clean verify" takes *03:32 h* on such a machine type.
Running the same workload on a 32 virtual cores, 64 GB machine takes *1:21 h*.

What is interesting are the per-module build time differences.
Modules which parallelize their tests well benefit greatly from the
additional cores:
"flink-tests" 36:51 min vs 4:33 min
"flink-runtime" 23:41 min vs 3:47 min
"flink-table-planner" 15:54 min vs 3:13 min

On the other hand, we have modules which are not parallel at all:
"flink-connector-kafka": 16:32 min vs 15:19 min
"flink-connector-kafka-0.11": 9:52 min vs 7:46 min
Also, the checkstyle plugin is not scaling at all.

Chesnay reported some significant speedups by reusing forks.
I don't know how much effort it would be to make the Kafka tests
parallelizable. In total, they currently use 30 minutes on the big machine
(while 31 CPUs are idling :) ).
Let me know what you think about these results. If the community is
generally interested in investigating that direction further, I could
look into software to orchestrate this, as well as sponsors for such an
infrastructure.
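
For context, fork reuse and per-core forking are plain Surefire settings; a sketch of what such a configuration could look like (the values are illustrative, not what was measured here):

<!-- maven-surefire-plugin excerpt; values are illustrative only -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-surefire-plugin</artifactId>
  <configuration>
    <!-- one forked JVM per CPU core, so test classes run concurrently -->
    <forkCount>1C</forkCount>
    <!-- keep forked JVMs alive between test classes instead of re-forking -->
    <reuseForks>true</reuseForks>
  </configuration>
</plugin>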

[1] https://docs.travis-ci.com/user/reference/overview

FLIP-63: Rework table partition support

2019-09-04 Thread JingsongLee

Hi everyone,

We would like to start a discussion thread on "FLIP-63: Rework table
partition support" (design doc: [1]), where we describe how to support
partitions in Flink and how to integrate with Hive partitions.

This FLIP addresses:
   - Introduce the whole story of partition support.
   - Introduce and discuss the DDL for partition support.
   - Introduce static and dynamic partition inserts (sketched below).
   - Introduce partition pruning.
   - Introduce the dynamic partition implementation.

Details can be seen in the design document.
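
To make the static/dynamic insert distinction above concrete, here is a rough sketch in the Hive-style syntax the FLIP leans on; the exact DDL/DML grammar is part of what this discussion should settle:

-- Partitioned table; partition columns live outside the regular schema.
CREATE TABLE logs (
  user_id BIGINT,
  url     STRING
) PARTITIONED BY (dt STRING, hr STRING);

-- Static partition insert: all partition values are fixed in the statement.
INSERT INTO logs PARTITION (dt = '2019-09-04', hr = '12')
SELECT user_id, url FROM staging_logs;

-- Dynamic partition insert: 'hr' is derived from the query result.
INSERT INTO logs PARTITION (dt = '2019-09-04')
SELECT user_id, url, hr FROM staging_logs;
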
Looking forward to your feedback. Thank you.

[1] 
https://docs.google.com/document/d/15R3vZ1R_pAHcvJkRx_CWleXgl08WL3k_ZpnWSdzP7GY/edit?usp=sharing

Best,
Jingsong Lee

Re: [VOTE] FLIP-49: Unified Memory Configuration for TaskExecutors

2019-09-04 Thread Xintong Song
Thanks for the voting and comments.

@Stephan
- The '-XX:MaxDirectMemorySize' value should not include JVM Overhead.
Thanks for the correction.
- 'taskmanager.memory.framework.heap' is the heap memory reserved for the task
executor framework, which cannot be allocated to task slots. I think users
should be able to configure both how much total Java heap memory a task
executor should have and how much of that total Java heap memory can be
allocated to task slots (see the illustrative config fragment below).
- Regarding network / shuffle memory, I'm with @Andrey. ATM, this memory
pool (derived from "taskmanager.network.memory.[min/max/fraction]") is only
used inside NettyShuffleEnvironment as network buffers. There might be
other network memory usage outside the shuffle component (as @Zhijiang also
suggested), but that is not accounted for by this memory pool. This is exactly
why I would suggest changing the name from 'network' to 'shuffle'.
- I agree that we need very good documentation to explain the memory pools
and config options, as well as WebUI support to present the memory pool sizes.
I would suggest addressing these as follow-ups to all three resource
management FLIPs, for better integration.
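
For readers less familiar with the options under discussion, a short illustrative flink-conf.yaml fragment (values are illustrative; the key names are exactly what the renaming debate is about and are not final):

# Today's options sizing the network buffer pool used by the shuffle component:
taskmanager.network.memory.min: 64mb
taskmanager.network.memory.max: 1gb
taskmanager.network.memory.fraction: 0.1
# The proposal above is to rename this pool from 'network' to 'shuffle'.
# 'taskmanager.memory.framework.heap' is the framework-reserved heap that
# cannot be handed out to task slots (final key names are still open).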

@Andrey
- Agree with the 'shuffle' naming. See above.

@Till
- My understanding is that Task Off-heap memory accounts for both direct
and native memory used by the user code. I'm not sure whether we need
another config option to split it. Given that we decided (in the discussion
thread) to first try setting '-XX:MaxDirectMemorySize' the way the current
design describes, and to switch to other alternatives if that doesn't work
out well, I would suggest the same for this one: start without the splitting
config option, and we can easily add it later if it doesn't work well (a
rough sketch follows below).
- Agree that it's really important to have good documentation for this. See
above.
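
A rough sketch of the trade-off just described (pool names, composition, and numbers are illustrative placeholders following this thread, not the FLIP's final formula):

// MaxDirectMemorySizeSketch.java -- illustration only.
public class MaxDirectMemorySizeSketch {
    public static void main(String[] args) {
        long mb = 1024L * 1024L;

        // Task Off-heap covers both direct and native memory of user code.
        long taskOffHeap = 256 * mb;   // hypothetical value
        // Shuffle memory: network buffers, allocated as direct memory.
        long shuffleMemory = 512 * mb; // hypothetical value

        // Without a splitting option, the native share of task off-heap is
        // counted towards the direct limit too -- a conservative
        // over-reservation rather than an error. JVM Overhead is excluded,
        // per the correction earlier in this thread.
        long maxDirectMemorySize = taskOffHeap + shuffleMemory;

        System.out.println("-XX:MaxDirectMemorySize=" + maxDirectMemorySize);
    }
}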

@Zhijiang
- Thanks for the input. My understanding is that 'shuffle memory' is the
portion of the task executor memory reserved for the shuffle component. How
the shuffle component uses this memory (local buffer pools, Netty internal
memory, etc.) can differ depending on the shuffle implementation. The
task executor (outside of the shuffle implementation) should only know the
overall memory usage of the shuffle component and does not need to understand
the details inside the shuffle implementation.

Thank you~

Xintong Song



On Tue, Sep 3, 2019 at 10:41 PM zhijiang 
wrote:

> Thanks for proposing this FLIP and also +1 on my side.
>
> @Andrey Zagrebin For the point of "network memory is actually used for more
> than shuffling", I guess that the queryable state component is also
> using the network/Netty stack atm, which is outside of shuffling.
> In addition, if we only consider the shuffle memory provided by the shuffle
> service interface, we should not only consider the memory used by the local
> buffer pool, but also the Netty internal memory usage as overhead,
> especially since we do not yet have the zero-copy improvement
> on the downstream read side. This issue might be out of the vote scope, just
> noting that we already have this issue in [1]. :)
>
> [1] https://issues.apache.org/jira/browse/FLINK-12110
>
> Best,
> Zhijiang
> --
> From:Till Rohrmann 
> Send Time:2019年9月3日(星期二) 15:07
> To:dev 
> Subject:Re: [VOTE] FLIP-49: Unified Memory Configuration for TaskExecutors
>
> Thanks for creating this FLIP and starting the vote Xintong.
>
> +1 for the proposal from my side.
>
> I agree with Stephan that we might wanna revisit some of the configuration
> names.
>
> If I understood it correctly, then Task Off-heap memory represents the
> direct memory used by the user code, right? How would users configure
> native memory requirements for the user code? If it is part of Task
> Off-heap memory, then we need to split it to set -XX:MaxDirectMemorySize
> correctly, or to introduce another configuration option.
>
> Given all these configuration options, I can see that users will get
> confused quite easily. Therefore, I would like to emphasise that we need
> very good documentation about how to properly configure Flink processes and
> which knobs to turn in which cases.
>
> Cheers,
> Till
>
> On Tue, Sep 3, 2019 at 2:34 PM Andrey Zagrebin 
> wrote:
>
> > Thanks for starting the vote Xintong
> >
> > Also +1 for the proposed FLIP-49.
> >
> > @Stephan regarding namings: network vs shuffle.
> > My understanding so far was that the network memory is what we basically
> > give to Shuffle implementations, and the default Netty implementation
> > uses it in particular mostly for networking.
> > Are the network pools used for something else outside of the shuffling
> > scope?
> >
> > best,
> > Andrey
> >
> > On Tue, Sep 3, 2019 at 1:01 PM Stephan Ewen  wrote:
> >
> > > +1 to the proposal in general
> > >
> > > A few things seem to be a bit out of sync with the latest discussions,
> > > though.
> > >
> > > The section about JVM Parameters states that the
> > > -XX:MaxDirectMemorySize value