Re: [DISCUSS] Enhance convenience of TableEnvironment in TableAPI/SQL

2018-12-10 Thread jincheng sun
Hi Xuefu,

Thanks for your feedback and for mentioning the compatibility issues.
You are right that the change will result in version incompatibility. Our plan
is to release it in version 1.8.x.

To be frank, we did consider a compatible approach: retain the current
TableEnvironment, create a new one such as "GeneralTableEnvironment" as the
unified abstraction, and then deprecate the old TableEnvironment. But we feel
that this leaves the code not clean enough, and the long-term goal is to make
StreamTableEnvironment and BatchTableEnvironment transparent to the user. So I
tend to release this change in 1.8.x and keep the status quo in 1.7.x. What do
you think? Any feedback is welcome!

Thanks,
Jincheng


Zhang, Xuefu  于2018年12月11日周二 下午1:13写道:

> Hi Jincheng,
>
> Thanks for bringing this up. It seems to make good sense to me. However,
> one concern I have is about backward compatibility. Could you clarify
> whether existing user programs will break with the proposed changes?
>
> The answer to the question would largely determine when this can be
> introduced.
>
> Thanks,
> Xuefu
>
>
> --
> Sender:jincheng sun 
> Sent at:2018 Dec 10 (Mon) 18:14
> Recipient:dev 
> Subject:[DISCUSS] Enhance convenience of TableEnvironment in TableAPI/SQL
>
> Hi All,
>
> According to feedback from users, the design of TableEnvironment is
> very inconvenient, and the wrong class is often imported by the IDE,
> especially for Java users. For example:
>
> ExecutionEnvironment env = ...
> BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
>
> The user does not know which BatchTableEnvironment should be imported,
> because there are three implementations of BatchTableEnvironment, shown
> below:
>
> 1. org.apache.flink.table.api.BatchTableEnvironment
> 2. org.apache.flink.table.api.java.BatchTableEnvironment
> 3. org.apache.flink.table.api.scala.BatchTableEnvironment
>
>
> This brings unnecessary inconvenience to Flink users. To solve this
> problem, Wei Zhong, Hequn Cheng, Dian Fu, Shaoxuan Wang and myself
> discussed offline a bit and propose to change the inheritance diagram of
> TableEnvironment as follows:
>
> 1. AbstractTableEnvironment - rename the current TableEnvironment to
> AbstractTableEnvironment; the functionality implemented by
> AbstractTableEnvironment is shared by stream and batch.
> 2. TableEnvironment - create a new abstract TableEnvironment that defines
> all methods of the (java/scala) StreamTableEnvironment and (java/scala)
> BatchTableEnvironment. In the BatchTableEnvironment and
> StreamTableEnvironment implementations, unsupported operations will be
> reported as an error.
>
> Then the usage is as follows:
>
> ExecutionEnvironment env = …
> TableEnvironment tEnv = TableEnvironment.getTableEnvironment(env)
> For detailed proposals please refer to the Google doc:
> https://docs.google.com/document/d/1t-AUGuaChADddyJi6e0WLsTDEnf9ZkupvvBiQ4yTTEI/edit?usp=sharing
>
> Any mail feedback and Google doc comment are welcome.
>
> Thanks,
> Jincheng
>
>


Re: [DISCUSS] Enhance convenience of TableEnvironment in TableAPI/SQL

2018-12-10 Thread Timo Walther

Hi Jincheng,

thanks for the proposal. I totally agree that having 3 
StreamTableEnvironments and 3 BatchTableEnvironments is a problem. We also 
identified this problem when doing Flink trainings and introductions to the 
Table & SQL API.


Actually, @Dawid and I were already discussing how to remove this 
shortcoming while working on FLINK-11067 [1]. The porting allows us to fix 
the class hierarchy, because some member visibilities change as well when 
going from Scala to Java. This would not break backwards compatibility, as 
the base classes should not be used by users anyway.


However, I don't like the design of putting all methods of the Batch and 
Stream environments into the base class and throwing exceptions for 
operations a concrete environment does not support. This does not sound 
like nice object-oriented design and it confuses users.


I added some comments to the document. I think we can improve the 
current situation without breaking backwards compatibility. Methods that 
interact with the Scala and Java APIs, such as toDataSet/toDataStream, 
should not be moved to an abstract class, as they would otherwise pull in 
Scala dependencies transitively or would not integrate with the type 
extraction logic of the target API.


Regards,
Timo

[1] https://issues.apache.org/jira/browse/FLINK-11067


Am 11.12.18 um 06:12 schrieb Zhang, Xuefu:

Hi Jincheng,

Thanks for bringing this up. It seems to make good sense to me. However, one 
concern I have is about backward compatibility. Could you clarify whether 
existing user programs will break with the proposed changes?

The answer to the question would largely determine when this can be introduced.

Thanks,
Xuefu


--
Sender:jincheng sun 
Sent at:2018 Dec 10 (Mon) 18:14
Recipient:dev 
Subject:[DISCUSS] Enhance convenience of TableEnvironment in TableAPI/SQL

Hi All,

According to feedback from users, the design of TableEnvironment is very 
inconvenient, and the wrong class is often imported by the IDE, especially 
for Java users. For example:

ExecutionEnvironment env = ...
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

The user does not know which BatchTableEnvironment should be imported, because 
there are three implementations of BatchTableEnvironment, shown below:

1. org.apache.flink.table.api.BatchTableEnvironment
2. org.apache.flink.table.api.java.BatchTableEnvironment
3. org.apache.flink.table.api.scala.BatchTableEnvironment


This brings unnecessary inconvenience to Flink users. To solve this 
problem, Wei Zhong, Hequn Cheng, Dian Fu, Shaoxuan Wang and myself discussed 
offline a bit and propose to change the inheritance diagram of 
TableEnvironment as follows:

1. AbstractTableEnvironment - rename the current TableEnvironment to 
AbstractTableEnvironment; the functionality implemented by 
AbstractTableEnvironment is shared by stream and batch.
2. TableEnvironment - create a new abstract TableEnvironment that defines 
all methods of the (java/scala) StreamTableEnvironment and (java/scala) 
BatchTableEnvironment. In the BatchTableEnvironment and 
StreamTableEnvironment implementations, unsupported operations will be 
reported as an error.

Then the usage is as follows:

ExecutionEnvironment env = …
TableEnvironment tEnv = TableEnvironment.getTableEnvironment(env)
For detailed proposals please refer to the Google doc: 
https://docs.google.com/document/d/1t-AUGuaChADddyJi6e0WLsTDEnf9ZkupvvBiQ4yTTEI/edit?usp=sharing

Any mail feedback and Google doc comment are welcome.

Thanks,
Jincheng





[jira] [Created] (FLINK-11126) Filter out AMRMToken in the TaskManager credentials

2018-12-10 Thread Paul Lin (JIRA)
Paul Lin created FLINK-11126:


 Summary: Filter out AMRMToken in the TaskManager credentials
 Key: FLINK-11126
 URL: https://issues.apache.org/jira/browse/FLINK-11126
 Project: Flink
  Issue Type: Improvement
  Components: Security, YARN
Affects Versions: 1.7.0, 1.6.2
Reporter: Paul Lin
Assignee: Paul Lin


Currently, the Flink JobManager propagates its storage tokens to the 
TaskManagers to meet the requirements of YARN log aggregation (see 
FLINK-6376). But in this way the AMRMToken is also included in the 
TaskManager credentials, which is potentially insecure. We should filter out 
the AMRMToken before setting the tokens in the TaskManager's container 
launch context.
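
A rough sketch of the proposed filtering, assuming the standard Hadoop/YARN 
APIs ({{Credentials}}, {{AMRMTokenIdentifier}}, {{ContainerLaunchContext}}); 
the method name and wiring below are illustrative, not the actual patch:

{code:java}
// Copy every token except the AMRMToken into fresh credentials, then
// serialize them into the TaskManager's container launch context.
private static void setFilteredTokens(
        ContainerLaunchContext tmContext, Credentials credentials) throws IOException {
    Credentials filtered = new Credentials();
    for (Token<? extends TokenIdentifier> token : credentials.getAllTokens()) {
        // AMRMTokenIdentifier.KIND_NAME identifies the AM <-> RM token
        if (!token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
            filtered.addToken(token.getService(), token);
        }
    }
    DataOutputBuffer dob = new DataOutputBuffer();
    filtered.writeTokenStorageToStream(dob);
    tmContext.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
}
{code}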





Re: [DISCUSS] Enhance convenience of TableEnvironment in TableAPI/SQL

2018-12-10 Thread Zhang, Xuefu
Hi Jincheng,

Thanks for bringing this up. It seems making good sense to me. However, one 
concern I have is about backward compatibility. Could you clarify whether 
existing user program will break with the proposed changes?

The answer to the question would largely determine when this can be introduced.

Thanks,
Xuefu


--
Sender:jincheng sun 
Sent at:2018 Dec 10 (Mon) 18:14
Recipient:dev 
Subject:[DISCUSS] Enhance convenience of TableEnvironment in TableAPI/SQL

Hi All,

According to the feedback from users, the design of TableEnvironment is very 
inconvenient for users, and often mistakenly imported by IDE, especially for 
Java users, such as:

ExecutionEnvironment env = ...BatchTableEnvironment tEnv = 
TableEnvironment.getTableEnvironment(env);

The user does not know which BatchTableEnvironment should be imported, because 
there are three implementations of BatchTableEnvironment, shown as below:

1. org.apache.flink.table.api.BatchTableEnvironment 2. 
org.apache.flink.table.api.java.BatchTableEnvironment 3. 
org.apache.flink.table.api.scala.BatchTableEnvironment
[image.png]


This brings unnecessary inconveniences to the flink user. To solve this 
problem, Wei Zhong, Hequn Cheng, Dian Fu, Shaoxuan Wang and myself discussed 
offline a bit and propose to change the inheritance diagram of TableEnvironment 
is shown as follows:
 1. AbstractTaleEnvironment - rename current TableEnvironment to 
AbstractTableEnvironment, The functionality implemented by Abstract 
TableEnvironment is stream and batch shared.2. TableEnvironment - Create a new 
TableEnvironment(abstract), and defined all methods in 
(java/scala)StreamTableEnvironment and (java/scala)BatchTableEnvironment. In 
the implementation of BatchTableEnviroment and StreamTableEnviroment, the 
unsupported operations will be reported as an error.
[image.png]
Then the usage as follows:

ExecutionEnvironment env = …TableEnvironment tEnv = 
TableEnvironment.getTableEnvironment(env)
For detailed proposals please refer to the Google doc: 
https://docs.google.com/document/d/1t-AUGuaChADddyJi6e0WLsTDEnf9ZkupvvBiQ4yTTEI/edit?usp=sharing

Any mail feedback and Google doc comment are welcome.

Thanks,
Jincheng



Re: [DISCUSS] Support Interactive Programming in Flink Table API

2018-12-10 Thread Becket Qin
Another potential concern with semantic 3 is that, in the future, we may add
automatic caching to Flink, e.g. caching the intermediate results at the
shuffle boundary. If our semantics is that referencing the original table
means skipping the cache, those users may not be able to benefit from the
implicit cache.



On Tue, Dec 11, 2018 at 12:10 PM Becket Qin  wrote:

> Hi Piotrek,
>
> Thanks for the reply. Thinking about it again, I might have misunderstood
> your proposal in earlier emails. Returning a CachedTable might not be a bad
> idea.
>
> I was more concerned about the semantics and their intuitiveness when a
> CachedTable is returned, i.e., if cache() returns a CachedTable, what are
> the semantics of the following code:
> {
>   val cachedTable = a.cache()
>   val b = cachedTable.select(...)
>   val c = a.select(...)
> }
> What is the difference between b and c? At first glance, I see two
> options:
>
> Semantic 1. b uses cachedTable because the user demanded so. c uses the
> original DAG because the user demanded so. In this case, the optimizer has
> no chance to optimize.
> Semantic 2. b uses cachedTable because the user demanded so. c leaves it to
> the optimizer to choose whether the cache or the DAG should be used. In
> this case, users lose the option to NOT use the cache.
>
> As you can see, neither of the options seems perfect. However, I guess you
> and Till are proposing the third option:
>
> Semantic 3. b leaves it to the optimizer to choose whether the cache or the
> DAG should be used. c always uses the DAG.
>
> This does address all the concerns. It is just that, from an intuitiveness
> perspective, I find it a little weird to ask users to explicitly use a
> CachedTable that the optimizer might then choose to ignore. That was why I
> did not think of that semantic. But given there is material benefit, I
> think this semantic is acceptable.
>
>> 1. If we want to let the optimiser make decisions whether to use the cache
>> or not, then why do we need a “void cache()” method at all? Would it
>> “increase” the chance of using the cache? That sounds strange. What would
>> be the mechanism for deciding whether to use the cache or not? If we want
>> to introduce such kind of automated optimisation of “plan nodes
>> deduplication”, I would turn it on globally, not per table, and let the
>> optimiser do all of the work.
>> 2. We do not have statistics at the moment for any use/not-use cache
>> decision.
>> 3. Even if we had, I would be veeerryy sceptical whether such cost-based
>> optimisations would work properly, and I would still insist first on
>> providing an explicit caching mechanism (`CachedTable cache()`)
>>
> We are absolutely on the same page here. An explicit cache() method is
> necessary, not only because the optimizer may not be able to make the right
> decision, but also because of the nature of interactive programming. For
> example, if users write the following code in the Scala shell:
>   val b = a.select(...)
>   val c = b.select(...)
>   val d = c.select(...).writeToSink(...)
>   tEnv.execute()
> There is no way the optimizer will know whether b or c will be used in
> later code, unless users hint explicitly.
>
>> At the same time I’m not sure if you have responded to our objections to
>> `void cache()` being implicit/having side effects, which Jark, Fabian,
>> Till, me, and I think also Shaoxuan are supporting.
>
> Are there any other side effects if we use semantic 3 mentioned above?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Mon, Dec 10, 2018 at 7:54 PM Piotr Nowojski 
> wrote:
>
>> Hi Becket,
>>
>> Sorry for not responding for so long.
>>
>> Regarding case1.
>>
>> There wouldn’t be an “a.unCache()” method; I would expect only
>> `cachedTableA1.dropCache()`. Dropping `cachedTableA1` wouldn’t affect
>> `cachedTableA2`. Just as in any other database, dropping or modifying one
>> independent table/materialised view does not affect others.
>>
>> > What I meant is that assuming there is already a cached table, ideally
>> users need
>> > not to specify whether the next query should read from the cache or use
>> the
>> > original DAG. This should be decided by the optimizer.
>>
>> 1. If we want to let the optimiser make decisions whether to use the cache
>> or not, then why do we need a “void cache()” method at all? Would it
>> “increase” the chance of using the cache? That sounds strange. What would
>> be the mechanism for deciding whether to use the cache or not? If we want
>> to introduce such kind of automated optimisation of “plan nodes
>> deduplication”, I would turn it on globally, not per table, and let the
>> optimiser do all of the work.
>> 2. We do not have statistics at the moment for any use/not-use cache
>> decision.
>> 3. Even if we had, I would be veeerryy sceptical whether such cost-based
>> optimisations would work properly, and I would still insist first on
>> providing an explicit caching mechanism (`CachedTable cache()`)
>> 4. As Till wrote, having an explicit `CachedTable cache()` doesn’t
>> contradict future work on automated cost-based caching.
>>
>>
>> At the same time 

[jira] [Created] (FLINK-11125) Remove useless import

2018-12-10 Thread Hequn Cheng (JIRA)
Hequn Cheng created FLINK-11125:
---

 Summary: Remove useless import 
 Key: FLINK-11125
 URL: https://issues.apache.org/jira/browse/FLINK-11125
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL, Tests
Reporter: Hequn Cheng
Assignee: Hequn Cheng








Re: [DISCUSS] Support Interactive Programming in Flink Table API

2018-12-10 Thread Becket Qin
Hi Piotrek,

Thanks for the reply. Thinking about it again, I might have misunderstood
your proposal in earlier emails. Returning a CachedTable might not be a bad
idea.

I was more concerned about the semantics and their intuitiveness when a
CachedTable is returned, i.e., if cache() returns a CachedTable, what are
the semantics of the following code:
{
  val cachedTable = a.cache()
  val b = cachedTable.select(...)
  val c = a.select(...)
}
What is the difference between b and c? At first glance, I see two
options:

Semantic 1. b uses cachedTable because the user demanded so. c uses the
original DAG because the user demanded so. In this case, the optimizer has
no chance to optimize.
Semantic 2. b uses cachedTable because the user demanded so. c leaves it to
the optimizer to choose whether the cache or the DAG should be used. In this
case, users lose the option to NOT use the cache.

As you can see, neither of the options seems perfect. However, I guess you
and Till are proposing the third option:

Semantic 3. b leaves it to the optimizer to choose whether the cache or the
DAG should be used. c always uses the DAG.

This does address all the concerns. It is just that, from an intuitiveness
perspective, I find it a little weird to ask users to explicitly use a
CachedTable that the optimizer might then choose to ignore. That was why I
did not think of that semantic. But given there is material benefit, I think
this semantic is acceptable.
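
To make semantic 3 concrete, here is a small self-contained Java sketch. All
types below are hypothetical stand-ins for the proposed API (cache() does not
exist yet), so it only illustrates the intended contract:

  // Hypothetical stand-ins for the proposed Table API additions.
  interface Table {
      Table select(String fields);
      CachedTable cache(); // explicit hint: materialize this table
  }

  interface CachedTable extends Table {
      void dropCache(); // drops only this cache, not the original table
  }

  class SemanticThree {
      static void example(Table a) {
          CachedTable cachedA = a.cache(); // request materialization of a
          Table b = cachedA.select("*");   // optimizer may use cache or DAG
          Table c = a.select("*");         // always the original DAG
      }
  }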

> 1. If we want to let the optimiser make decisions whether to use the cache
> or not, then why do we need a “void cache()” method at all? Would it
> “increase” the chance of using the cache? That sounds strange. What would be
> the mechanism for deciding whether to use the cache or not? If we want to
> introduce such kind of automated optimisation of “plan nodes deduplication”,
> I would turn it on globally, not per table, and let the optimiser do all of
> the work.
> 2. We do not have statistics at the moment for any use/not-use cache
> decision.
> 3. Even if we had, I would be veeerryy sceptical whether such cost-based
> optimisations would work properly, and I would still insist first on
> providing an explicit caching mechanism (`CachedTable cache()`)
>
We are absolutely on the same page here. An explicit cache() method is
necessary, not only because the optimizer may not be able to make the right
decision, but also because of the nature of interactive programming. For
example, if users write the following code in the Scala shell:
  val b = a.select(...)
  val c = b.select(...)
  val d = c.select(...).writeToSink(...)
  tEnv.execute()
There is no way the optimizer will know whether b or c will be used in later
code, unless users hint explicitly.

> At the same time I’m not sure if you have responded to our objections to
> `void cache()` being implicit/having side effects, which Jark, Fabian,
> Till, me, and I think also Shaoxuan are supporting.

Are there any other side effects if we use semantic 3 mentioned above?

Thanks,

Jiangjie (Becket) Qin


On Mon, Dec 10, 2018 at 7:54 PM Piotr Nowojski 
wrote:

> Hi Becket,
>
> Sorry for not responding for so long.
>
> Regarding case1.
>
> There wouldn’t be an “a.unCache()” method; I would expect only
> `cachedTableA1.dropCache()`. Dropping `cachedTableA1` wouldn’t affect
> `cachedTableA2`. Just as in any other database, dropping or modifying one
> independent table/materialised view does not affect others.
>
> > What I meant is that assuming there is already a cached table, ideally
> users need
> > not to specify whether the next query should read from the cache or use
> the
> > original DAG. This should be decided by the optimizer.
>
> 1. If we want to let the optimiser make decisions whether to use the cache
> or not, then why do we need a “void cache()” method at all? Would it
> “increase” the chance of using the cache? That sounds strange. What would be
> the mechanism for deciding whether to use the cache or not? If we want to
> introduce such kind of automated optimisation of “plan nodes deduplication”,
> I would turn it on globally, not per table, and let the optimiser do all of
> the work.
> 2. We do not have statistics at the moment for any use/not-use cache
> decision.
> 3. Even if we had, I would be veeerryy sceptical whether such cost-based
> optimisations would work properly, and I would still insist first on
> providing an explicit caching mechanism (`CachedTable cache()`)
> 4. As Till wrote, having an explicit `CachedTable cache()` doesn’t
> contradict future work on automated cost-based caching.
>
>
> At the same time I’m not sure if you have responded to our objections to
> `void cache()` being implicit/having side effects, which Jark, Fabian,
> Till, me, and I think also Shaoxuan are supporting.
>
> Piotrek
>
> > On 5 Dec 2018, at 12:42, Becket Qin  wrote:
> >
> > Hi Till,
> >
> > It is true that after the first job submission, there will be no
> > ambiguity in terms of whether a cached table is used or not. That is the
> > same for a cache() that does not return a CachedTable.
> >
> > Conceptually one could think of 

[jira] [Created] (FLINK-11124) Add private[flink] to TemporalTableFunction.create()

2018-12-10 Thread Hequn Cheng (JIRA)
Hequn Cheng created FLINK-11124:
---

 Summary: Add private[flink] to TemporalTableFunction.create()
 Key: FLINK-11124
 URL: https://issues.apache.org/jira/browse/FLINK-11124
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Hequn Cheng
Assignee: Hequn Cheng


{{TemporalTableFunction}} is a user-facing class. I think it would be better 
to add {{private[flink]}} to the {{TemporalTableFunction.create()}} method in 
order to make it invisible to users.





[jira] [Created] (FLINK-11123) Improve ml quick start doc

2018-12-10 Thread sunjincheng (JIRA)
sunjincheng created FLINK-11123:
---

 Summary: Improve ml quick start doc
 Key: FLINK-11123
 URL: https://issues.apache.org/jira/browse/FLINK-11123
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Machine Learning Library
Affects Versions: 1.7.0
Reporter: sunjincheng
Assignee: sunjincheng
 Fix For: 1.7.1, 1.7.0


Users cannot run the example in the ML quick start documentation because the 
import statements for the required classes are missing.





Re: [DISCUSS] Creating last bug fix release for 1.5 branch

2018-12-10 Thread Thomas Weise
Thanks Till and my belated +1 for a final patch release :)

On Mon, Dec 10, 2018 at 5:47 AM Till Rohrmann  wrote:

> Thanks for the feedback! I conclude that the community is in favour of a
> last 1.5.6 release. I'll try to make the arrangements in the next two
> weeks.
>
> Cheers,
> Till
>
> On Mon, Dec 10, 2018 at 2:40 AM jincheng sun 
> wrote:
>
> > +1. There are incompatible improvements between 1.5.x and 1.6/1.7, so
> > many 1.5.x users may not be willing to upgrade to 1.6 or 1.7 due to
> > migration costs. It makes sense to create a last bug fix release for the
> > 1.5 branch.
> >
> > Bests,
> > Jincheng
> >
> > Jeff Zhang  于2018年12月10日周一 上午9:24写道:
> >
> > > +1, I think very few people would use 1.6 or 1.7 in their production
> > > in the near future, so I expect they will use 1.5 in production for a
> > > long period. It makes sense to provide a stable version for production
> > > usage.
> > >
> > > Ufuk Celebi  于2018年12月9日周日 下午6:07写道:
> > >
> > > > +1. This seems reasonable to me. Since the fixes are already in and
> > > > also part of other releases, the release overhead should be
> > > > manageable.
> > > >
> > > > @Vino: I agree with your assessment.
> > > >
> > > > @Qi: As Till mentioned, the official project guideline is to support
> > > > the last two minor releases, e.g. currently 1.7 and 1.6.
> > > >
> > > > Best,
> > > >
> > > > Ufuk
> > > >
> > > > On Sun, Dec 9, 2018 at 3:48 AM qi luo  wrote:
> > > > >
> > > > > Hi Till,
> > > > >
> > > > > Does Flink have an agreement on how long a major version will be
> > > > > supported? Some companies may need a long time to upgrade Flink
> > > > > major versions in production. If Flink terminates support for a
> > > > > major version too quickly, it may be a concern for companies.
> > > > >
> > > > > Best,
> > > > > Qi
> > > > >
> > > > > > On Dec 8, 2018, at 10:57 AM, vino yang 
> > > wrote:
> > > > > >
> > > > > > Hi Till,
> > > > > >
> > > > > > I think it makes sense to release a bug fix version (especially
> > > > > > for some serious bug fixes) for Flink 1.5, considering that some
> > > > > > companies' production environments are more cautious about
> > > > > > upgrading across major versions.
> > > > > > I think some organizations are still using 1.5.x or even 1.4.x.
> > > > > >
> > > > > > Best,
> > > > > > Vino
> > > > > >
> > > > > > Till Rohrmann  于2018年12月7日周五 下午11:39写道:
> > > > > >
> > > > > >> Dear community,
> > > > > >>
> > > > > >> I wanted to reach out to you and discuss whether we should
> > release a
> > > > last
> > > > > >> bug fix release for the 1.5 branch.
> > > > > >>
> > > > > >> Since we have already released Flink 1.7.0, we only need to
> > > > > >> support the 1.6.x and 1.7.x branches (the last two major
> > > > > >> releases). However, the current release-1.5 branch contains 45
> > > > > >> unreleased fixes. Some of the fixes address serializer
> > > > > >> duplication problems (FLINK-10839, FLINK-10693), fix retractions
> > > > > >> (FLINK-10674), or prevent a deadlock in the SpillableSubpartition
> > > > > >> (FLINK-10491). I think it would be nice for our users if we
> > > > > >> officially terminated the Flink 1.5.x support with a last 1.5.6
> > > > > >> release. What do you think?
> > > > > >>
> > > > > >> Cheers,
> > > > > >> Till
> > > > > >>
> > > > >
> > > >
> > >
> > >
> > > --
> > > Best Regards
> > >
> > > Jeff Zhang
> > >
> >
>


[jira] [Created] (FLINK-11122) SafetyNetCloseableRegistryTest fails with ClassCastException

2018-12-10 Thread Gary Yao (JIRA)
Gary Yao created FLINK-11122:


 Summary: SafetyNetCloseableRegistryTest fails with 
ClassCastException
 Key: FLINK-11122
 URL: https://issues.apache.org/jira/browse/FLINK-11122
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Reporter: Gary Yao
 Fix For: 1.8.0


When compiling and running {{SafetyNetCloseableRegistryTest}} with Java 9, some 
tests fail with a {{ClassCastException}}:

{noformat}
java.lang.AssertionError: java.lang.ClassCastException: 
org.apache.flink.core.fs.local.LocalDataOutputStream cannot be cast to 
org.apache.flink.core.fs.WrappingProxyCloseable
at 
org.apache.flink.core.fs.SafetyNetCloseableRegistry$PhantomDelegatingCloseableRef.<init>(SafetyNetCloseableRegistry.java:156)
at 
org.apache.flink.core.fs.SafetyNetCloseableRegistry.doRegister(SafetyNetCloseableRegistry.java:99)
at 
org.apache.flink.core.fs.SafetyNetCloseableRegistry.doRegister(SafetyNetCloseableRegistry.java:50)
at 
org.apache.flink.util.AbstractCloseableRegistry.registerCloseable(AbstractCloseableRegistry.java:79)
at 
org.apache.flink.core.fs.ClosingFSDataOutputStream.wrapSafe(ClosingFSDataOutputStream.java:101)
at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:127)
at 
org.apache.flink.core.fs.SafetyNetCloseableRegistryTest$3.go(SafetyNetCloseableRegistryTest.java:120)
at 
org.apache.flink.core.testutils.CheckedThread.run(CheckedThread.java:74)
{noformat}

This is due to the problematic signature of {{WrappingProxyUtil#stripProxy(T)}}, 
which expects a generic type {{T}} and also returns {{T}}.
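
A minimal sketch of the problem, assuming a simplified stand-in for the real 
method (only {{WrappingProxy}} is the actual Flink interface; the rest is 
illustrative):

{code:java}
// The generic signature promises to return whatever T the caller infers,
// but the method may return the unwrapped, non-proxy delegate. The compiler
// inserts a checkcast at the call site, which can then fail at runtime.
@SuppressWarnings("unchecked")
static <T> T stripProxy(T object) {
    Object current = object;
    while (current instanceof WrappingProxy) {
        current = ((WrappingProxy<?>) current).getWrappedDelegate();
    }
    return (T) current; // unchecked: the delegate may not actually be a T
}

// If T is inferred as WrappingProxyCloseable but the delegate is a plain
// LocalDataOutputStream, the cast at the call site throws the
// ClassCastException shown above.
{code}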





[ANNOUNCE] Weekly community update #50

2018-12-10 Thread Till Rohrmann
Dear community,

this is the weekly community update thread #50. Please post any news and
updates you want to share with the community to this thread.

# Unified core API for streaming and batch

The community started to discuss how to bring streaming and batch closer
together by implementing a common Operator abstraction on which both stream
and batch operators can run [1]. The discussion is still in its early stage
but you should subscribe to this thread if you want to stay up to date.

# Flink backward compatibility

A while ago, Thomas started a discussion about Flink's backwards
compatibility, which should not only cover its APIs, because Flink is used
by more and more third-party applications [2]. As Stephan and Chesnay
mentioned, backwards compatibility should also be guaranteed for the client
APIs and data structures (e.g. the job specification).

# Enhance convenience of TableEnvironment in Table API/SQL

Jincheng started a discussion on how to improve the TableEnvironment usage
from a user's perspective. At the moment the existing inheritance structure
can be confusing to users. He thus proposes to change this structure to
use more meaningful names for the user [3].

# Creating Flink 1.5.6

The community discussed whether to release a last bug fix release 1.5.6 for
the 1.5.x release branch [4]. So far the unanimous feedback is positive and
in favour of creating a last 1.5.6 release.

# Usage of Flink's Python API

The community started a survey of the usage of Flink's Python APIs [5].
Please join this discussion if you want to share how you are using Flink's
Python APIs and how they could be improved.

[1]
https://lists.apache.org/thread.html/2746759af3c92091bb743cfe028c90777f8011a064bb95e65b1fb951@%3Cdev.flink.apache.org%3E
[2]
https://lists.apache.org/thread.html/064c75c5d10f0806095b14f6d76942598917a14429c1acbddd151fe2@%3Cdev.flink.apache.org%3E
[3]
https://lists.apache.org/thread.html/99059c90a0a1b59a4f18a5a0fdb73e17071b17bbb036649a48bb233b@%3Cdev.flink.apache.org%3E
[4]
https://lists.apache.org/thread.html/b740feb190fd63db3d15bfe0399097d905ea49fad83ce9ccf4c070cd@%3Cdev.flink.apache.org%3E
[5]
https://lists.apache.org/thread.html/348366080d6b87bf390efb98e5bf268620ab04a0451f8459e2f466cd@%3Cdev.flink.apache.org%3E

Cheers,
Till


Re: [DISCUSS] Creating last bug fix release for 1.5 branch

2018-12-10 Thread Till Rohrmann
Thanks for the feedback! I conclude that the community is in favour of a
last 1.5.6 release. I'll try to make the arrangements in the next two weeks.

Cheers,
Till

On Mon, Dec 10, 2018 at 2:40 AM jincheng sun 
wrote:

> +1. There are incompatible improvements between 1.5.x and 1.6/1.7, so many
> 1.5.x users may not be willing to upgrade to 1.6 or 1.7 due to migration
> costs. It makes sense to create a last bug fix release for the 1.5 branch.
>
> Bests,
> Jincheng
>
> Jeff Zhang  于2018年12月10日周一 上午9:24写道:
>
> > +1, I think very few people would use 1.6 or 1.7 in their production in
> > the near future, so I expect they will use 1.5 in production for a long
> > period. It makes sense to provide a stable version for production usage.
> >
> > Ufuk Celebi  于2018年12月9日周日 下午6:07写道:
> >
> > > +1. This seems reasonable to me. Since the fixes are already in and
> > > also part of other releases, the release overhead should be
> > > manageable.
> > >
> > > @Vino: I agree with your assessment.
> > >
> > > @Qi: As Till mentioned, the official project guideline is to support
> > > the last two minor releases, e.g. currently 1.7 and 1.6.
> > >
> > > Best,
> > >
> > > Ufuk
> > >
> > > On Sun, Dec 9, 2018 at 3:48 AM qi luo  wrote:
> > > >
> > > > Hi Till,
> > > >
> > > > Does Flink have an agreement on how long a major version will be
> > > > supported? Some companies may need a long time to upgrade Flink major
> > > > versions in production. If Flink terminates support for a major
> > > > version too quickly, it may be a concern for companies.
> > > >
> > > > Best,
> > > > Qi
> > > >
> > > > > On Dec 8, 2018, at 10:57 AM, vino yang 
> > wrote:
> > > > >
> > > > > Hi Till,
> > > > >
> > > > > I think it makes sense to release a bug fix version (especially
> > > > > for some serious bug fixes) for Flink 1.5, considering that some
> > > > > companies' production environments are more cautious about
> > > > > upgrading across major versions.
> > > > > I think some organizations are still using 1.5.x or even 1.4.x.
> > > > >
> > > > > Best,
> > > > > Vino
> > > > >
> > > > > Till Rohrmann  于2018年12月7日周五 下午11:39写道:
> > > > >
> > > > >> Dear community,
> > > > >>
> > > > >> I wanted to reach out to you and discuss whether we should
> release a
> > > last
> > > > >> bug fix release for the 1.5 branch.
> > > > >>
> > > > >> Since we have already released Flink 1.7.0, we only need to
> > > > >> support the 1.6.x and 1.7.x branches (the last two major
> > > > >> releases). However, the current release-1.5 branch contains 45
> > > > >> unreleased fixes. Some of the fixes address serializer duplication
> > > > >> problems (FLINK-10839, FLINK-10693), fix retractions (FLINK-10674),
> > > > >> or prevent a deadlock in the SpillableSubpartition (FLINK-10491). I
> > > > >> think it would be nice for our users if we officially terminated
> > > > >> the Flink 1.5.x support with a last 1.5.6 release. What do you
> > > > >> think?
> > > > >>
> > > > >> Cheers,
> > > > >> Till
> > > > >>
> > > >
> > >
> >
> >
> > --
> > Best Regards
> >
> > Jeff Zhang
> >
>


[jira] [Created] (FLINK-11121) Check and update licensing notes for Aliyun FS [FLINK-10865]

2018-12-10 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-11121:
--

 Summary: Check and update licensing notes for Aliyun FS 
[FLINK-10865]
 Key: FLINK-11121
 URL: https://issues.apache.org/jira/browse/FLINK-11121
 Project: Flink
  Issue Type: Task
  Components: FileSystem
Affects Versions: 1.8.0
Reporter: Stefan Richter
 Fix For: 1.8.0








Re: [DISCUSS] Support Interactive Programming in Flink Table API

2018-12-10 Thread Piotr Nowojski
Hi Becket,

Sorry for not responding for so long.

Regarding case1.

There wouldn’t be an “a.unCache()” method; I would expect only 
`cachedTableA1.dropCache()`. Dropping `cachedTableA1` wouldn’t affect 
`cachedTableA2`. Just as in any other database, dropping or modifying one 
independent table/materialised view does not affect others.

> What I meant is that, assuming there is already a cached table, ideally
> users need not specify whether the next query should read from the cache or
> use the original DAG. This should be decided by the optimizer.

1. If we want to let the optimiser make decisions whether to use the cache or 
not, then why do we need a “void cache()” method at all? Would it “increase” 
the chance of using the cache? That sounds strange. What would be the 
mechanism for deciding whether to use the cache or not? If we want to 
introduce such kind of automated optimisation of “plan nodes deduplication”, 
I would turn it on globally, not per table, and let the optimiser do all of 
the work.
2. We do not have statistics at the moment for any use/not-use cache decision.
3. Even if we had, I would be veeerryy sceptical whether such cost-based 
optimisations would work properly, and I would still insist first on providing 
an explicit caching mechanism (`CachedTable cache()`)
4. As Till wrote, having an explicit `CachedTable cache()` doesn’t contradict 
future work on automated cost-based caching.


At the same time I’m not sure if you have responded to our objections to `void 
cache()` being implicit/having side effects, which Jark, Fabian, Till, me, and 
I think also Shaoxuan are supporting.

Piotrek

> On 5 Dec 2018, at 12:42, Becket Qin  wrote:
> 
> Hi Till,
> 
> It is true that after the first job submission, there will be no ambiguity
> in terms of whether a cached table is used or not. That is the same for a
> cache() that does not return a CachedTable.
> 
> Conceptually one could think of cache() as introducing a caching operator
>> from which you need to consume if you want to benefit from the caching
>> functionality.
> 
> I am thinking a little differently. I think it is a hint (as you mentioned
> later) instead of a new operator. I'd like to be careful about the semantic
> of the API. A hint is a property set on an existing operator, but is not
> itself an operator as it does not really manipulate the data.
> 
> I agree, ideally the optimizer makes this kind of decision about which
>> intermediate result should be cached. But especially when executing ad-hoc
>> queries the user might better know which results need to be cached because
>> Flink might not see the full DAG. In that sense, I would consider the
>> cache() method as a hint for the optimizer. Of course, in the future we
>> might add functionality which tries to automatically cache results (e.g.
>> caching the latest intermediate results until so and so much space is
>> used). But this should hopefully not contradict with `CachedTable cache()`.
> 
> I agree that the cache() method is needed for exactly the reason you
> mentioned, i.e. Flink cannot predict what users are going to write later, so
> users need to tell Flink explicitly that this table will be used later. What
> I meant is that, assuming there is already a cached table, ideally users
> need not specify whether the next query should read from the cache or use
> the original DAG. This should be decided by the optimizer.
> 
> To explain the difference between returning / not returning a CachedTable,
> I want to compare the following two cases:
> 
> *Case 1:  returning a CachedTable*
> b = a.map(...)
> val cachedTableA1 = a.cache()
> val cachedTableA2 = a.cache()
> b.print() // Just to make sure a is cached.
> 
> c = a.filter(...) // User specify that the original DAG is used? Or the
> optimizer decides whether DAG or cache should be used?
> d = cachedTableA1.filter() // User specify that the cached table is used.
> 
> a.unCache() // Can cachedTableA still be used afterwards?
> cachedTableA1.uncache() // Can cachedTableA2 still be used?
> 
> *Case 2: not returning a CachedTable*
> b = a.map()
> a.cache()
> a.cache() // no-op
> b.print() // Just to make sure a is cached
> 
> c = a.filter(...) // Optimizer decides whether the cache or DAG should be
> used
> d = a.filter(...) // Optimizer decides whether the cache or DAG should be
> used
> 
> a.unCache()
> a.unCache() // no-op
> 
> In case 1, semantics-wise, the optimizer loses the option to choose between
> the DAG and the cache, and the unCache() call becomes tricky.
> In case 2, users do not need to worry about whether the cache or the DAG is
> used, and the unCache() semantics are clear. However, the caveat is that
> users cannot explicitly ignore the cache.
> 
> In order to address the issues mentioned in case 2, and inspired by the
> discussion so far, I am thinking about using a hint to allow users to
> explicitly ignore the cache. Although we do not have hints yet, we probably
> should. So the code becomes:
> 
> *Case 3: returning this table*
> b = a.map()

[jira] [Created] (FLINK-11120) The bug of timestampadd handles time

2018-12-10 Thread xuqianjin (JIRA)
xuqianjin created FLINK-11120:
-

 Summary: The bug of timestampadd handles time
 Key: FLINK-11120
 URL: https://issues.apache.org/jira/browse/FLINK-11120
 Project: Flink
  Issue Type: Sub-task
Reporter: xuqianjin








[DISCUSS] Enhance convenience of TableEnvironment in TableAPI/SQL

2018-12-10 Thread jincheng sun
Hi All,

According to feedback from users, the design of TableEnvironment is
very inconvenient, and the wrong class is often imported by the IDE,
especially for Java users. For example:

ExecutionEnvironment env = ...

BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);


The user does not know which BatchTableEnvironment should be imported,
because there are three implementations of BatchTableEnvironment, shown
below:


1. org.apache.flink.table.api.BatchTableEnvironment
2. org.apache.flink.table.api.java.BatchTableEnvironment
3. org.apache.flink.table.api.scala.BatchTableEnvironment




This brings unnecessary inconvenience to Flink users. To solve this
problem, Wei Zhong, Hequn Cheng, Dian Fu, Shaoxuan Wang and myself discussed
offline a bit and propose to change the inheritance diagram of
TableEnvironment as follows:

1. AbstractTableEnvironment - rename the current TableEnvironment to
AbstractTableEnvironment; the functionality implemented by
AbstractTableEnvironment is shared by stream and batch.

2. TableEnvironment - create a new abstract TableEnvironment that defines
all methods of the (java/scala) StreamTableEnvironment and
(java/scala) BatchTableEnvironment. In the BatchTableEnvironment and
StreamTableEnvironment implementations, unsupported operations will be
reported as an error (see the sketch below).
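
A minimal sketch of the proposed hierarchy (everything below, other than the
two class names from the proposal, is only illustrative):

  // Functionality shared by stream and batch lives in the renamed base class.
  public abstract class AbstractTableEnvironment {
      public abstract Table scan(String tableName); // example of shared API
  }

  // The new abstract TableEnvironment exposes the union of the stream and
  // batch methods; a concrete environment rejects what it does not support.
  public abstract class TableEnvironment extends AbstractTableEnvironment {
      public Table fromDataSet(Object dataSet) {
          throw new UnsupportedOperationException("Only supported in batch.");
      }
      public Table fromDataStream(Object dataStream) {
          throw new UnsupportedOperationException("Only supported in streaming.");
      }
  }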

Then the usage is as follows:

ExecutionEnvironment env = …

TableEnvironment tEnv = TableEnvironment.getTableEnvironment(env)


For detailed proposals please refer to the Google doc:
https://docs.google.com/document/d/1t-AUGuaChADddyJi6e0WLsTDEnf9ZkupvvBiQ4yTTEI/edit?usp=sharing

Any mail feedback and Google doc comment are welcome.


Thanks,

Jincheng


[jira] [Created] (FLINK-11119) Incorrect Scala example for Table Function

2018-12-10 Thread Denys Fakhritdinov (JIRA)
Denys Fakhritdinov created FLINK-11119:
--

 Summary: Incorrect Scala example for Table Function
 Key: FLINK-11119
 URL: https://issues.apache.org/jira/browse/FLINK-11119
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.7.0, 1.6.2
Reporter: Denys Fakhritdinov


Issue in Scala example in documentation: 
[https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/udfs.html#table-functions]

Currently it is:
{code:java}
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN 
TABLE(split(a)) as T(word, length) ON TRUE")
{code}
Should be (like in Java version):
{code:java}
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL 
TABLE(split(a)) as T(word, length) ON TRUE")
{code}
* LATERAL is missing in the Scala version





[jira] [Created] (FLINK-11118) Refactor and unify rowtime timestamp extractor interface

2018-12-10 Thread vinoyang (JIRA)
vinoyang created FLINK-11118:


 Summary: Refactor and unify rowtime timestamp extractor interface
 Key: FLINK-11118
 URL: https://issues.apache.org/jira/browse/FLINK-11118
 Project: Flink
  Issue Type: Sub-task
Reporter: vinoyang
Assignee: vinoyang








[jira] [Created] (FLINK-11116) Clean up temporary files that, upon recovery, belong to no checkpoint.

2018-12-10 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-11116:
--

 Summary: Clean up temporary files that, upon recovery, belong to no 
checkpoint.
 Key: FLINK-11116
 URL: https://issues.apache.org/jira/browse/FLINK-11116
 Project: Flink
  Issue Type: Improvement
  Components: filesystem-connector
Affects Versions: 1.7.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.7.1


In order to guarantee exactly-once semantics, the streaming file sink 
implements a two-phase commit protocol when writing files to the filesystem.

Initially data is written to in-progress files. These files are then put into 
"pending" state when they are completed (based on the rolling policy), and they 
are finally committed when the checkpoint that put them in the "pending" state 
is acknowledged as complete.

The above shows that in the case where we have:
1) checkpoints A, B, C coming,
2) checkpoint A being acknowledged, and
3) a failure,

Then we may have files that do not belong to any checkpoint (because B and C 
were not considered successful). These files are currently not cleaned up.

This issue aims at cleaning up these files.
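
A schematic of the file lifecycle and the failure scenario described above 
(illustrative only):

{noformat}
in-progress --(rolling policy)--> pending --(checkpoint acknowledged)--> committed

checkpoint A : pending files committed  (acknowledged)
checkpoint B : files left pending       (not acknowledged -> orphaned after failure)
checkpoint C : files left pending       (not acknowledged -> orphaned after failure)
{noformat}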


