Re: [DISCUSS] Enhance convenience of TableEnvironment in TableAPI/SQL
Hi Xuefu,

Thanks for your feedback and for raising the compatibility issue. You are right that the change will break version compatibility, and our plan is to release it in 1.8.x. To be frank, we did consider a compatible approach: retain the current TableEnvironment, create a new class such as "GeneralTableEnvironment" for the unified abstraction, and then deprecate TableEnvironment. But we feel that code is not clean enough, and the long-term goal is to make StreamTableEnvironment and BatchTableEnvironment transparent to the user, so I tend to release this change in 1.8.x, keeping the status quo in 1.7.x. What do you think? Any feedback is welcome!

Thanks,
Jincheng

Zhang, Xuefu 于2018年12月11日周二 下午1:13写道:
> Hi Jincheng,
>
> Thanks for bringing this up. It seems to make good sense to me. However,
> one concern I have is about backward compatibility. Could you clarify
> whether existing user programs will break with the proposed changes?
>
> The answer to that question would largely determine when this can be
> introduced.
>
> Thanks,
> Xuefu
>
> --
> Sender: jincheng sun
> Sent at: 2018 Dec 10 (Mon) 18:14
> Recipient: dev
> Subject: [DISCUSS] Enhance convenience of TableEnvironment in TableAPI/SQL
>
> Hi All,
>
> According to feedback from users, the design of TableEnvironment is
> very inconvenient, and the wrong class is often imported by the IDE,
> especially for Java users, for example:
>
> ExecutionEnvironment env = ...;
> BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
>
> The user does not know which BatchTableEnvironment should be imported,
> because there are three implementations of BatchTableEnvironment, shown
> below:
>
> 1. org.apache.flink.table.api.BatchTableEnvironment
> 2. org.apache.flink.table.api.java.BatchTableEnvironment
> 3. org.apache.flink.table.api.scala.BatchTableEnvironment
> [image.png]
>
> This brings unnecessary inconvenience to Flink users.
> To solve this problem, Wei Zhong, Hequn Cheng, Dian Fu, Shaoxuan Wang and
> I discussed offline a bit and propose to change the inheritance diagram of
> TableEnvironment as follows:
>
> 1. AbstractTableEnvironment - rename the current TableEnvironment to
> AbstractTableEnvironment; the functionality implemented by
> AbstractTableEnvironment is shared by stream and batch.
> 2. TableEnvironment - create a new (abstract) TableEnvironment that
> defines all the methods of the (java/scala) StreamTableEnvironment and
> (java/scala) BatchTableEnvironment. In the implementations of
> BatchTableEnvironment and StreamTableEnvironment, unsupported operations
> will be reported as an error.
> [image.png]
>
> Then the usage is as follows:
>
> ExecutionEnvironment env = ...;
> TableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
>
> For the detailed proposal please refer to the Google doc:
> https://docs.google.com/document/d/1t-AUGuaChADddyJi6e0WLsTDEnf9ZkupvvBiQ4yTTEI/edit?usp=sharing
>
> Any mail feedback and Google doc comments are welcome.
>
> Thanks,
> Jincheng
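For readers following along, the proposed shape can be sketched in plain Java. This is a minimal, hypothetical sketch (not actual Flink code; the method names `fromDataSet`/`fromDataStream` are illustrative stand-ins): the unified abstract TableEnvironment declares both batch and stream operations, and each concrete environment throws for the operations it does not support.

```java
// Hypothetical sketch of the proposed hierarchy (names follow the proposal,
// the methods are illustrative). The unified abstract TableEnvironment
// declares both batch and stream operations; each concrete environment
// throws for the operations it does not support.
abstract class AbstractTableEnvironment {
    // functionality shared by stream and batch would live here
}

abstract class TableEnvironment extends AbstractTableEnvironment {
    abstract String fromDataSet();     // batch-only operation in this sketch
    abstract String fromDataStream();  // stream-only operation in this sketch
}

class BatchTableEnvironment extends TableEnvironment {
    @Override
    String fromDataSet() {
        return "batch table";
    }

    @Override
    String fromDataStream() {
        // per the proposal: unsupported operations are reported as an error
        throw new UnsupportedOperationException(
                "fromDataStream is not supported in a batch environment");
    }
}

public class Main {
    public static void main(String[] args) {
        // users program against the unified type only, so there is a single
        // class to import and no java/scala/common ambiguity
        TableEnvironment tEnv = new BatchTableEnvironment();
        System.out.println(tEnv.fromDataSet());
        try {
            tEnv.fromDataStream();
        } catch (UnsupportedOperationException e) {
            System.out.println("error: " + e.getMessage());
        }
    }
}
```

This makes the trade-off debated in the thread concrete: a single entry point removes the import ambiguity, at the price of methods that can fail at runtime instead of being absent at compile time.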
Re: [DISCUSS] Enhance convenience of TableEnvironment in TableAPI/SQL
Hi Jincheng,

thanks for the proposal. I totally agree with the problem of having 3 StreamTableEnvironments and 3 BatchTableEnvironments. We also identified this problem when doing Flink trainings and introductions to the Table & SQL API. Actually, @Dawid and I were already discussing how to remove this shortcoming while working on FLINK-11067 [1]. The porting allows us to fix the class hierarchy, because some visibilities of members change as well when going from Scala to Java. This would not break backwards compatibility, as the base classes should not be used by users anyway.

However, I don't like the design of putting all methods of the batch and stream environments into the base class and throwing exceptions for operations that are not supported. This does not sound like good object-oriented design and confuses users. I added some comments to the document. I think we can improve the current situation without breaking backwards compatibility. Methods that interact with the Scala and Java APIs, such as toDataSet/toDataStream, should not be moved to an abstract class, as they would otherwise pull in Scala dependencies transitively or not integrate with the type extraction logic of the target API.

Regards,
Timo

[1] https://issues.apache.org/jira/browse/FLINK-11067

Am 11.12.18 um 06:12 schrieb Zhang, Xuefu:
Hi Jincheng,
Thanks for bringing this up. It seems to make good sense to me. However, one concern I have is about backward compatibility. Could you clarify whether existing user programs will break with the proposed changes? The answer to that question would largely determine when this can be introduced.
Thanks,
Xuefu

--
Sender: jincheng sun
Sent at: 2018 Dec 10 (Mon) 18:14
Recipient: dev
Subject: [DISCUSS] Enhance convenience of TableEnvironment in TableAPI/SQL

Hi All,

According to feedback from users, the design of TableEnvironment is very inconvenient, and the wrong class is often imported by the IDE, especially for Java users, for example:

ExecutionEnvironment env = ...;
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

The user does not know which BatchTableEnvironment should be imported, because there are three implementations of BatchTableEnvironment, shown below:

1. org.apache.flink.table.api.BatchTableEnvironment
2. org.apache.flink.table.api.java.BatchTableEnvironment
3. org.apache.flink.table.api.scala.BatchTableEnvironment
[image.png]

This brings unnecessary inconvenience to Flink users. To solve this problem, Wei Zhong, Hequn Cheng, Dian Fu, Shaoxuan Wang and I discussed offline a bit and propose to change the inheritance diagram of TableEnvironment as follows:

1. AbstractTableEnvironment - rename the current TableEnvironment to AbstractTableEnvironment; the functionality implemented by AbstractTableEnvironment is shared by stream and batch.
2. TableEnvironment - create a new (abstract) TableEnvironment that defines all the methods of the (java/scala) StreamTableEnvironment and (java/scala) BatchTableEnvironment. In the implementations of BatchTableEnvironment and StreamTableEnvironment, unsupported operations will be reported as an error.
[image.png]

Then the usage is as follows:

ExecutionEnvironment env = ...;
TableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

For the detailed proposal please refer to the Google doc:
https://docs.google.com/document/d/1t-AUGuaChADddyJi6e0WLsTDEnf9ZkupvvBiQ4yTTEI/edit?usp=sharing

Any mail feedback and Google doc comments are welcome.

Thanks,
Jincheng
[jira] [Created] (FLINK-11126) Filter out AMRMToken in the TaskManager credentials
Paul Lin created FLINK-11126: Summary: Filter out AMRMToken in the TaskManager credentials Key: FLINK-11126 URL: https://issues.apache.org/jira/browse/FLINK-11126 Project: Flink Issue Type: Improvement Components: Security, YARN Affects Versions: 1.7.0, 1.6.2 Reporter: Paul Lin Assignee: Paul Lin Currently, Flink JobManager propagates its storage tokens to TaskManager to meet the requirement of YARN log aggregation (see FLINK-6376). But in this way the AMRMToken is also included in the TaskManager credentials, which could be potentially insecure. We should filter out AMRMToken before setting the tokens to TaskManager's container launch context. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
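The fix the ticket describes is a filtering step between reading the JobManager's credentials and building the TaskManager's container launch context. Real code would operate on Hadoop's `org.apache.hadoop.security.Credentials` and token kinds; the sketch below is a self-contained stand-in using a plain map of token kind to token, and the kind name `YARN_AM_RM_TOKEN` is an assumption about the AMRMToken's identifier, only to show the shape of the filtering:

```java
import java.util.HashMap;
import java.util.Map;

public class Main {
    // assumed kind name of the AM<->RM token; illustrative only
    static final String AMRM_TOKEN_KIND = "YARN_AM_RM_TOKEN";

    // Stand-in for filtering a Hadoop Credentials object: copy everything
    // except the AMRMToken, which the TaskManager must never receive.
    static Map<String, String> filterAmRmToken(Map<String, String> tokens) {
        Map<String, String> filtered = new HashMap<>(tokens);
        filtered.remove(AMRM_TOKEN_KIND);
        return filtered;
    }

    public static void main(String[] args) {
        Map<String, String> jobManagerTokens = new HashMap<>();
        jobManagerTokens.put("HDFS_DELEGATION_TOKEN", "hdfs-token-bytes");
        jobManagerTokens.put(AMRM_TOKEN_KIND, "amrm-token-bytes");

        // only the filtered set goes into the TaskManager launch context
        Map<String, String> forTaskManager = filterAmRmToken(jobManagerTokens);
        System.out.println(forTaskManager.containsKey(AMRM_TOKEN_KIND)); // false
        System.out.println(forTaskManager.size()); // 1
    }
}
```

The storage tokens needed for YARN log aggregation (FLINK-6376) still propagate; only the AM-specific token is dropped.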
Re: [DISCUSS] Enhance convenience of TableEnvironment in TableAPI/SQL
Hi Jincheng,

Thanks for bringing this up. It seems to make good sense to me. However, one concern I have is about backward compatibility. Could you clarify whether existing user programs will break with the proposed changes?

The answer to that question would largely determine when this can be introduced.

Thanks,
Xuefu

--
Sender: jincheng sun
Sent at: 2018 Dec 10 (Mon) 18:14
Recipient: dev
Subject: [DISCUSS] Enhance convenience of TableEnvironment in TableAPI/SQL

Hi All,

According to feedback from users, the design of TableEnvironment is very inconvenient, and the wrong class is often imported by the IDE, especially for Java users, for example:

ExecutionEnvironment env = ...;
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

The user does not know which BatchTableEnvironment should be imported, because there are three implementations of BatchTableEnvironment, shown below:

1. org.apache.flink.table.api.BatchTableEnvironment
2. org.apache.flink.table.api.java.BatchTableEnvironment
3. org.apache.flink.table.api.scala.BatchTableEnvironment
[image.png]

This brings unnecessary inconvenience to Flink users. To solve this problem, Wei Zhong, Hequn Cheng, Dian Fu, Shaoxuan Wang and I discussed offline a bit and propose to change the inheritance diagram of TableEnvironment as follows:

1. AbstractTableEnvironment - rename the current TableEnvironment to AbstractTableEnvironment; the functionality implemented by AbstractTableEnvironment is shared by stream and batch.
2. TableEnvironment - create a new (abstract) TableEnvironment that defines all the methods of the (java/scala) StreamTableEnvironment and (java/scala) BatchTableEnvironment. In the implementations of BatchTableEnvironment and StreamTableEnvironment, unsupported operations will be reported as an error.
[image.png]

Then the usage is as follows:

ExecutionEnvironment env = ...;
TableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

For the detailed proposal please refer to the Google doc:
https://docs.google.com/document/d/1t-AUGuaChADddyJi6e0WLsTDEnf9ZkupvvBiQ4yTTEI/edit?usp=sharing

Any mail feedback and Google doc comments are welcome.

Thanks,
Jincheng
Re: [DISCUSS] Support Interactive Programming in Flink Table API
Another potential concern about semantic 3: in the future, we may add automatic caching to Flink, e.g. caching intermediate results at the shuffle boundary. If our semantic is that a reference to the original table means skipping the cache, those users may not be able to benefit from the implicit cache. On Tue, Dec 11, 2018 at 12:10 PM Becket Qin wrote: > Hi Piotrek, > > Thanks for the reply. Thinking about it again, I might have misunderstood > your proposal in earlier emails. Returning a CachedTable might not be a bad > idea. > > I was more concerned about the semantics and their intuitiveness when a > CachedTable is returned, i.e. if cache() returns a CachedTable, what are the > semantics of the following code: > { > val cachedTable = a.cache() > val b = cachedTable.select(...) > val c = a.select(...) > } > What is the difference between b and c? At first glance, I see two > options: > > Semantic 1. b uses cachedTable because the user demanded so. c uses the original DAG because > the user demanded so. In this case, the optimizer has no chance to optimize. > Semantic 2. b uses cachedTable because the user demanded so. c leaves it to the optimizer > to choose whether the cache or the DAG should be used. In this case, users lose > the option NOT to use the cache. > > As you can see, neither option seems perfect. However, I guess you > and Till are proposing a third option: > > Semantic 3. b leaves it to the optimizer to choose whether the cache or the DAG should > be used. c always uses the DAG. > > This does address all the concerns. It is just that, from an intuitiveness > perspective, I find it a little weird to ask users to explicitly use a CachedTable that > the optimizer might then choose to ignore. That was why I did > not think of that semantic. But given there is material benefit, I think > this semantic is acceptable. > > 1. If we want to let the optimiser make decisions on whether to use the cache or not, >> then why do we need a “void cache()” method at all? Would it “increase” the >> chance of using the cache?
That sounds strange. What would be the >> mechanism for deciding whether to use the cache or not? If we want to >> introduce such automated optimisations of “plan node deduplication”, >> I would turn it on globally, not per table, and let the optimiser do all of >> the work. >> 2. We do not have statistics at the moment for any use/not-use cache >> decision. >> 3. Even if we had, I would be veeerryy sceptical whether such cost-based >> optimisations would work properly, and I would still insist first on >> providing an explicit caching mechanism (`CachedTable cache()`) >> > We are absolutely on the same page here. An explicit cache() method is > necessary not only because the optimizer may not be able to make the right > decision, but also because of the nature of interactive programming. For > example, if users write the following code in the Scala shell: > val b = a.select(...) > val c = b.select(...) > val d = c.select(...).writeToSink(...) > tEnv.execute() > There is no way the optimizer will know whether b or c will be used in later > code, unless users hint explicitly. > > At the same time I’m not sure if you have responded to our objections to >> `void cache()` being implicit/having side effects, which me, Jark, Fabian, >> Till and I think also Shaoxuan are supporting. > > Are there any other side effects if we use semantic 3 mentioned above? > > Thanks, > > Jiangjie (Becket) Qin > > > On Mon, Dec 10, 2018 at 7:54 PM Piotr Nowojski > wrote: > >> Hi Becket, >> >> Sorry for not responding for a long time. >> >> Regarding case 1. >> >> There wouldn’t be an “a.unCache()” method; I would expect only >> `cachedTableA1.dropCache()`. Dropping `cachedTableA1` wouldn’t affect >> `cachedTableA2`. Just as in any other database, dropping or modifying one >> independent table/materialised view does not affect others.
>> >> > What I meant is that assuming there is already a cached table, ideally >> users need >> > not specify whether the next query should read from the cache or use >> the >> > original DAG. This should be decided by the optimizer. >> >> 1. If we want to let the optimiser make decisions on whether to use the cache or >> not, then why do we need a “void cache()” method at all? Would it “increase” >> the chance of using the cache? That sounds strange. What would be the >> mechanism for deciding whether to use the cache or not? If we want to >> introduce such automated optimisations of “plan node deduplication”, >> I would turn it on globally, not per table, and let the optimiser do all of >> the work. >> 2. We do not have statistics at the moment for any use/not-use cache >> decision. >> 3. Even if we had, I would be veeerryy sceptical whether such cost-based >> optimisations would work properly, and I would still insist first on >> providing an explicit caching mechanism (`CachedTable cache()`) >> 4. As Till wrote, having an explicit `CachedTable cache()` doesn’t >> contradict future work on automated cost-based caching. >> >> >> At the same time
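To make the three candidate semantics from this thread concrete, here is a toy, non-Flink model (all names hypothetical) that tags each derived table with where its input may come from. It encodes semantic 3, the one Becket attributes to Till and Piotr: a query on the CachedTable handle lets the optimizer choose between cache and DAG, while a query on the original handle always recomputes from the DAG.

```java
// Toy model of the cache() discussion, not Flink code. Under semantic 3,
// deriving from the cached handle leaves the choice to the optimizer,
// while deriving from the original handle pins the query to the DAG.
enum Source { DAG_ONLY, OPTIMIZER_CHOOSES }

class Table {
    final Source source;

    Table(Source source) {
        this.source = source;
    }

    CachedTable cache() {
        return new CachedTable();
    }

    // queries on the original handle always use the DAG
    Table select() {
        return new Table(Source.DAG_ONLY);
    }
}

class CachedTable extends Table {
    CachedTable() {
        super(Source.OPTIMIZER_CHOOSES);
    }

    // queries on the cached handle let the optimizer pick cache or DAG
    @Override
    Table select() {
        return new Table(Source.OPTIMIZER_CHOOSES);
    }
}

public class Main {
    public static void main(String[] args) {
        Table a = new Table(Source.DAG_ONLY);
        CachedTable cachedTable = a.cache();
        Table b = cachedTable.select(); // optimizer may use the cache
        Table c = a.select();           // always recomputed from the DAG
        System.out.println(b.source);
        System.out.println(c.source);
    }
}
```

Semantics 1 and 2 would change only the two `select()` bodies: semantic 1 makes the cached handle's result `CACHE_ONLY`, and semantic 2 makes the original handle's result `OPTIMIZER_CHOOSES` as well, which is exactly where users would lose the option NOT to use the cache.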
[jira] [Created] (FLINK-11125) Remove useless import
Hequn Cheng created FLINK-11125: --- Summary: Remove useless import Key: FLINK-11125 URL: https://issues.apache.org/jira/browse/FLINK-11125 Project: Flink Issue Type: Improvement Components: Table API SQL, Tests Reporter: Hequn Cheng Assignee: Hequn Cheng -- This message was sent by Atlassian JIRA (v7.6.3#76005)
Re: [DISCUSS] Support Interactive Programming in Flink Table API
Hi Piotrek, Thanks for the reply. Thinking about it again, I might have misunderstood your proposal in earlier emails. Returning a CachedTable might not be a bad idea. I was more concerned about the semantics and their intuitiveness when a CachedTable is returned, i.e. if cache() returns a CachedTable, what are the semantics of the following code: { val cachedTable = a.cache() val b = cachedTable.select(...) val c = a.select(...) } What is the difference between b and c? At first glance, I see two options: Semantic 1. b uses cachedTable because the user demanded so. c uses the original DAG because the user demanded so. In this case, the optimizer has no chance to optimize. Semantic 2. b uses cachedTable because the user demanded so. c leaves it to the optimizer to choose whether the cache or the DAG should be used. In this case, users lose the option NOT to use the cache. As you can see, neither option seems perfect. However, I guess you and Till are proposing a third option: Semantic 3. b leaves it to the optimizer to choose whether the cache or the DAG should be used. c always uses the DAG. This does address all the concerns. It is just that, from an intuitiveness perspective, I find it a little weird to ask users to explicitly use a CachedTable that the optimizer might then choose to ignore. That was why I did not think of that semantic. But given there is material benefit, I think this semantic is acceptable. 1. If we want to let the optimiser make decisions on whether to use the cache or not, > then why do we need a “void cache()” method at all? Would it “increase” the > chance of using the cache? That sounds strange. What would be the > mechanism for deciding whether to use the cache or not? If we want to > introduce such automated optimisations of “plan node deduplication”, > I would turn it on globally, not per table, and let the optimiser do all of > the work. > 2. We do not have statistics at the moment for any use/not-use cache > decision. > 3.
Even if we had, I would be veeerryy sceptical whether such cost-based > optimisations would work properly, and I would still insist first on > providing an explicit caching mechanism (`CachedTable cache()`) > We are absolutely on the same page here. An explicit cache() method is necessary not only because the optimizer may not be able to make the right decision, but also because of the nature of interactive programming. For example, if users write the following code in the Scala shell: val b = a.select(...) val c = b.select(...) val d = c.select(...).writeToSink(...) tEnv.execute() There is no way the optimizer will know whether b or c will be used in later code, unless users hint explicitly. At the same time I’m not sure if you have responded to our objections to > `void cache()` being implicit/having side effects, which me, Jark, Fabian, > Till and I think also Shaoxuan are supporting. Are there any other side effects if we use semantic 3 mentioned above? Thanks, Jiangjie (Becket) Qin On Mon, Dec 10, 2018 at 7:54 PM Piotr Nowojski wrote: > Hi Becket, > > Sorry for not responding for a long time. > > Regarding case 1. > > There wouldn’t be an “a.unCache()” method; I would expect only > `cachedTableA1.dropCache()`. Dropping `cachedTableA1` wouldn’t affect > `cachedTableA2`. Just as in any other database, dropping or modifying one > independent table/materialised view does not affect others. > > > What I meant is that assuming there is already a cached table, ideally > users need > > not specify whether the next query should read from the cache or use > the > > original DAG. This should be decided by the optimizer. > > 1. If we want to let the optimiser make decisions on whether to use the cache or not, > then why do we need a “void cache()” method at all? Would it “increase” the > chance of using the cache? That sounds strange. What would be the > mechanism for deciding whether to use the cache or not?
If we want to > introduce such automated optimisations of “plan node deduplication”, > I would turn it on globally, not per table, and let the optimiser do all of > the work. > 2. We do not have statistics at the moment for any use/not-use cache > decision. > 3. Even if we had, I would be veeerryy sceptical whether such cost-based > optimisations would work properly, and I would still insist first on > providing an explicit caching mechanism (`CachedTable cache()`) > 4. As Till wrote, having an explicit `CachedTable cache()` doesn’t contradict > future work on automated cost-based caching. > > > At the same time I’m not sure if you have responded to our objections to > `void cache()` being implicit/having side effects, which me, Jark, Fabian, > Till and I think also Shaoxuan are supporting. > > Piotrek > > > On 5 Dec 2018, at 12:42, Becket Qin wrote: > > > > Hi Till, > > > > It is true that after the first job submission, there will be no > ambiguity > > in terms of whether a cached table is used or not. That is the same for > the > > cache() without returning a CachedTable. > > > > Conceptually one could think of
[jira] [Created] (FLINK-11124) Add private[flink] to TemporalTableFunction.create()
Hequn Cheng created FLINK-11124: --- Summary: Add private[flink] to TemporalTableFunction.create() Key: FLINK-11124 URL: https://issues.apache.org/jira/browse/FLINK-11124 Project: Flink Issue Type: Improvement Components: Table API SQL Reporter: Hequn Cheng Assignee: Hequn Cheng {{TemporalTableFunction}} is a user-oriented class. I think it would be better to add {{private[flink]}} to the {{TemporalTableFunction.create()}} method in order to make it invisible to users. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11123) Improve ml quick start doc
sunjincheng created FLINK-11123: --- Summary: Improve ml quick start doc Key: FLINK-11123 URL: https://issues.apache.org/jira/browse/FLINK-11123 Project: Flink Issue Type: Improvement Components: Documentation, Machine Learning Library Affects Versions: 1.7.0 Reporter: sunjincheng Assignee: sunjincheng Fix For: 1.7.1, 1.7.0 Users cannot run the sample from the ML quick start document because the required class imports are missing. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
Re: [DISCUSS] Creating last bug fix release for 1.5 branch
Thanks Till, and my belated +1 for a final patch release :) On Mon, Dec 10, 2018 at 5:47 AM Till Rohrmann wrote: > Thanks for the feedback! I conclude that the community is in favour of a > last 1.5.6 release. I'll try to make the arrangements in the next two > weeks. > > Cheers, > Till > > On Mon, Dec 10, 2018 at 2:40 AM jincheng sun > wrote: > > > +1. There are incompatible improvements between 1.5.x and 1.6/1.7, so > many > > 1.5.x users may not be willing to upgrade to 1.6 or 1.7 due to migration > > costs, so it makes sense to create a last bug fix release for the 1.5 branch. > > > > Bests, > > Jincheng > > > > Jeff Zhang 于2018年12月10日周一 上午9:24写道: > > > > > +1, I think very few people would use 1.6 or 1.7 in their production in > > > the near future, so I expect they would use 1.5 in production for a long > > > period, so it makes sense to provide a stable version for production usage. > > > > > > Ufuk Celebi 于2018年12月9日周日 下午6:07写道: > > > > > > > +1. This seems reasonable to me. Since the fixes are already in and > > > > also part of other releases, the release overhead should be > > > > manageable. > > > > > > > > @Vino: I agree with your assessment. > > > > > > > > @Qi: As Till mentioned, the official project guideline is to support > > > > the last two minor releases, e.g. currently 1.7 and 1.6. > > > > > > > > Best, > > > > > > > > Ufuk > > > > > > > > On Sun, Dec 9, 2018 at 3:48 AM qi luo wrote: > > > > > > > > > > Hi Till, > > > > > > > > > > Does Flink have an agreement on how long a major version will be > > > > supported? Some companies may need a long time to upgrade Flink major > > > > versions in production. If Flink terminates support for a major > version > > too > > > > quickly, it may be a concern for companies.
> > > > > > > > > > Best, > > > > > Qi > > > > > > > > > > > On Dec 8, 2018, at 10:57 AM, vino yang > > > wrote: > > > > > > > > > > > > Hi Till, > > > > > > > > > > > > I think it makes sense to release a bug fix version (especially > > some > > > > > > serious bug fixes) for flink 1.5. > > > > > > Consider that some companies' production environments are more > > > cautious > > > > > > about upgrading large versions. > > > > > > I think some organizations are still using 1.5.x or even 1.4.x. > > > > > > > > > > > > Best, > > > > > > Vino > > > > > > > > > > > > Till Rohrmann 于2018年12月7日周五 下午11:39写道: > > > > > > > > > > > >> Dear community, > > > > > >> > > > > > >> I wanted to reach out to you and discuss whether we should > > release a > > > > last > > > > > >> bug fix release for the 1.5 branch. > > > > > >> > > > > > >> Since we have already released Flink 1.7.0, we only need to > > support > > > > the > > > > > >> 1.6.x and 1.7.x branches (last two major releases). However, the > > > > current > > > > > >> release-1.5 branch contains 45 unreleased fixes. Some of the > fixes > > > > address > > > > > >> serializer duplication problems (FLINK-10839, FLINK-10693), > fixing > > > > > >> retractions (FLINK-10674) or prevent a deadlock in the > > > > > >> SpillableSubpartition (FLINK-10491). I think it would be nice > for > > > our > > > > users > > > > > >> if we officially terminated the Flink 1.5.x support with a last > > > 1.5.6 > > > > > >> release. What do you think? > > > > > >> > > > > > >> Cheers, > > > > > >> Till > > > > > >> > > > > > > > > > > > > > > > > > > -- > > > Best Regards > > > > > > Jeff Zhang > > > > > >
[jira] [Created] (FLINK-11122) SafetyNetCloseableRegistryTest fails with ClassCastException
Gary Yao created FLINK-11122: Summary: SafetyNetCloseableRegistryTest fails with ClassCastException Key: FLINK-11122 URL: https://issues.apache.org/jira/browse/FLINK-11122 Project: Flink Issue Type: Sub-task Components: Tests Reporter: Gary Yao Fix For: 1.8.0 When compiling and running {{SafetyNetCloseableRegistryTest}} with Java 9, some tests fail with a {{ClassCastException}}: {noformat} java.lang.AssertionError: java.lang.ClassCastException: org.apache.flink.core.fs.local.LocalDataOutputStream cannot be cast to org.apache.flink.core.fs.WrappingProxyCloseable at org.apache.flink.core.fs.SafetyNetCloseableRegistry$PhantomDelegatingCloseableRef.(SafetyNetCloseableRegistry.java:156) at org.apache.flink.core.fs.SafetyNetCloseableRegistry.doRegister(SafetyNetCloseableRegistry.java:99) at org.apache.flink.core.fs.SafetyNetCloseableRegistry.doRegister(SafetyNetCloseableRegistry.java:50) at org.apache.flink.util.AbstractCloseableRegistry.registerCloseable(AbstractCloseableRegistry.java:79) at org.apache.flink.core.fs.ClosingFSDataOutputStream.wrapSafe(ClosingFSDataOutputStream.java:101) at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:127) at org.apache.flink.core.fs.SafetyNetCloseableRegistryTest$3.go(SafetyNetCloseableRegistryTest.java:120) at org.apache.flink.core.testutils.CheckedThread.run(CheckedThread.java:74) {noformat} This is due to the problematic signature of {{WrappingProxyUtil#stripProxy(T)}}, which accepts a generic type {{T}} and also returns {{T}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
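The general pitfall behind such a signature can be shown without any Flink code. In the minimal, hypothetical illustration below (the method name mirrors the ticket, the rest is invented), a generic method casts its argument to the type parameter: that cast is unchecked and erased, so the JVM only performs the real check at the *call site*, where the caller's inferred type may not match the runtime object.

```java
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;

public class Main {
    // Illustrative only, not the actual Flink utility. The cast to T is
    // unchecked: after erasure nothing is verified inside this method.
    @SuppressWarnings("unchecked")
    static <T> T stripProxy(Object maybeProxy) {
        // pretend we unwrapped zero or more proxy layers here
        return (T) maybeProxy;
    }

    public static void main(String[] args) {
        OutputStream plain = new ByteArrayOutputStream();
        try {
            // Compiles fine with T inferred as Runnable, but the compiler
            // inserts a checkcast at this assignment, which fails at runtime
            // because a ByteArrayOutputStream is not a Runnable.
            Runnable r = stripProxy(plain);
            r.run();
        } catch (ClassCastException e) {
            System.out.println("ClassCastException at the call site");
        }
    }
}
```

This is why the test only blows up for certain inputs (here, a plain stream that is not a `WrappingProxyCloseable`): the signature promises whatever type the caller asks for, and nothing enforces it until the result is used.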
[ANNOUNCE] Weekly community update #50
Dear community, this is the weekly community update thread #50. Please post any news and updates you want to share with the community to this thread. # Unified core API for streaming and batch The community started to discuss how to bring streaming and batch closer together by implementing a common Operator abstraction on which both stream and batch operators can run [1]. The discussion is still in its early stage, but you should subscribe to this thread if you want to stay up to date. # Flink backward compatibility A while ago, Thomas started a discussion about Flink's backwards compatibility, which should cover more than just its APIs, because Flink is used by more and more third-party applications [2]. As Stephan and Chesnay mentioned, backwards compatibility should also be guaranteed for the client APIs and data structures (e.g. the job specification). # Enhance convenience of TableEnvironment in Table API/SQL Jincheng started a discussion on how to improve the TableEnvironment usage from a user's perspective. At the moment, the existing inheritance structure can be confusing to users. He therefore proposes to change this structure to use more meaningful names [3]. # Creating Flink 1.5.6 The community discussed whether to release a last bug fix release 1.5.6 for the 1.5.x release branch [4]. So far the unanimous feedback is positive and in favour of creating a last 1.5.6 release. # Usage of Flink's Python API The community started a survey on the usage of Flink's Python APIs [5]. Please join this discussion if you want to share how you are using Flink's Python APIs and how they could be improved.
[1] https://lists.apache.org/thread.html/2746759af3c92091bb743cfe028c90777f8011a064bb95e65b1fb951@%3Cdev.flink.apache.org%3E [2] https://lists.apache.org/thread.html/064c75c5d10f0806095b14f6d76942598917a14429c1acbddd151fe2@%3Cdev.flink.apache.org%3E [3] https://lists.apache.org/thread.html/99059c90a0a1b59a4f18a5a0fdb73e17071b17bbb036649a48bb233b@%3Cdev.flink.apache.org%3E [4] https://lists.apache.org/thread.html/b740feb190fd63db3d15bfe0399097d905ea49fad83ce9ccf4c070cd@%3Cdev.flink.apache.org%3E [5] https://lists.apache.org/thread.html/348366080d6b87bf390efb98e5bf268620ab04a0451f8459e2f466cd@%3Cdev.flink.apache.org%3E Cheers, Till
Re: [DISCUSS] Creating last bug fix release for 1.5 branch
Thanks for the feedback! I conclude that the community is in favour of a last 1.5.6 release. I'll try to make the arrangements in the next two weeks. Cheers, Till On Mon, Dec 10, 2018 at 2:40 AM jincheng sun wrote: > +1. There are incompatible improvements between 1.5.x and 1.6/1.7, so many > 1.5.x users may not be willing to upgrade to 1.6 or 1.7 due to migration > costs, so it makes sense to create a last bug fix release for the 1.5 branch. > > Bests, > Jincheng > > Jeff Zhang 于2018年12月10日周一 上午9:24写道: > > > +1, I think very few people would use 1.6 or 1.7 in their production in > > the near future, so I expect they would use 1.5 in production for a long > > period, so it makes sense to provide a stable version for production usage. > > > > Ufuk Celebi 于2018年12月9日周日 下午6:07写道: > > > > > +1. This seems reasonable to me. Since the fixes are already in and > > > also part of other releases, the release overhead should be > > > manageable. > > > > > > @Vino: I agree with your assessment. > > > > > > @Qi: As Till mentioned, the official project guideline is to support > > > the last two minor releases, e.g. currently 1.7 and 1.6. > > > > > > Best, > > > > > > Ufuk > > > > > > On Sun, Dec 9, 2018 at 3:48 AM qi luo wrote: > > > > > > > > Hi Till, > > > > > > > > Does Flink have an agreement on how long a major version will be > > > supported? Some companies may need a long time to upgrade Flink major > > > versions in production. If Flink terminates support for a major version > > too > > > quickly, it may be a concern for companies. > > > > > > > > Best, > > > > Qi > > > > > > > > > On Dec 8, 2018, at 10:57 AM, vino yang > > wrote: > > > > > > > > > > Hi Till, > > > > > > > > > > I think it makes sense to release a bug fix version (especially > some > > > > > serious bug fixes) for Flink 1.5. > > > > > Considering that some companies' production environments are more > > cautious > > > > > about upgrading major versions.
> > > > > I think some organizations are still using 1.5.x or even 1.4.x. > > > > > > > > > > Best, > > > > > Vino > > > > > > > > > > Till Rohrmann 于2018年12月7日周五 下午11:39写道: > > > > > > > > > >> Dear community, > > > > >> > > > > >> I wanted to reach out to you and discuss whether we should > release a > > > last > > > > >> bug fix release for the 1.5 branch. > > > > >> > > > > >> Since we have already released Flink 1.7.0, we only need to > support > > > the > > > > >> 1.6.x and 1.7.x branches (last two major releases). However, the > > > current > > > > >> release-1.5 branch contains 45 unreleased fixes. Some of the fixes > > > address > > > > >> serializer duplication problems (FLINK-10839, FLINK-10693), fixing > > > > >> retractions (FLINK-10674) or prevent a deadlock in the > > > > >> SpillableSubpartition (FLINK-10491). I think it would be nice for > > our > > > users > > > > >> if we officially terminated the Flink 1.5.x support with a last > > 1.5.6 > > > > >> release. What do you think? > > > > >> > > > > >> Cheers, > > > > >> Till > > > > >> > > > > > > > > > > > > > -- > > Best Regards > > > > Jeff Zhang > > >
[jira] [Created] (FLINK-11121) Check and update licensing notes for Aliyun FS [FLINK-10865]
Stefan Richter created FLINK-11121: -- Summary: Check and update licensing notes for Aliyun FS [FLINK-10865] Key: FLINK-11121 URL: https://issues.apache.org/jira/browse/FLINK-11121 Project: Flink Issue Type: Task Components: FileSystem Affects Versions: 1.8.0 Reporter: Stefan Richter Fix For: 1.8.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
Re: [DISCUSS] Support Interactive Programming in Flink Table API
Hi Becket, Sorry for not responding for a long time. Regarding case 1: there wouldn’t be an “a.unCache()” method; I would expect only `cachedTableA1.dropCache()`. Dropping `cachedTableA1` wouldn’t affect `cachedTableA2`. Just as in any other database, dropping or modifying one independent table/materialised view does not affect others. > What I meant is that assuming there is already a cached table, ideally users > need > not to specify whether the next query should read from the cache or use the > original DAG. This should be decided by the optimizer. 1. If we want to let the optimiser decide whether to use the cache or not, then why do we need a “void cache()” method at all? Would it “increase” the chance of using the cache? That sounds strange. What would be the mechanism for deciding whether to use the cache or not? If we want to introduce this kind of automated optimisation of “plan node deduplication”, I would turn it on globally, not per table, and let the optimiser do all of the work. 2. We do not have statistics at the moment for any use/not-use cache decision. 3. Even if we had, I would be very sceptical whether such cost based optimisations would work properly, and I would still insist first on providing an explicit caching mechanism (`CachedTable cache()`). 4. As Till wrote, having an explicit `CachedTable cache()` doesn’t contradict future work on automated cost based caching. At the same time, I’m not sure if you have responded to our objections to `void cache()` being implicit/having side effects, which I, Jark, Fabian, Till and I think also Shaoxuan are supporting. Piotrek > On 5 Dec 2018, at 12:42, Becket Qin wrote: > > Hi Till, > > It is true that after the first job submission, there will be no ambiguity > in terms of whether a cached table is used or not. That is the same for the > cache() without returning a CachedTable.
> > Conceptually one could think of cache() as introducing a caching operator >> from which you need to consume if you want to benefit from the caching >> functionality. > > I am thinking a little differently. I think it is a hint (as you mentioned > later) instead of a new operator. I'd like to be careful about the semantics > of the API. A hint is a property set on an existing operator, but is not > itself an operator as it does not really manipulate the data. > > I agree, ideally the optimizer makes this kind of decision which >> intermediate result should be cached. But especially when executing ad-hoc >> queries the user might better know which results need to be cached because >> Flink might not see the full DAG. In that sense, I would consider the >> cache() method as a hint for the optimizer. Of course, in the future we >> might add functionality which tries to automatically cache results (e.g. >> caching the latest intermediate results until so and so much space is >> used). But this should hopefully not contradict with `CachedTable cache()`. > > I agree that the cache() method is needed for exactly the reason you mentioned, > i.e. Flink cannot predict what users are going to write later, so users > need to tell Flink explicitly that this table will be used later. What I > meant is that assuming there is already a cached table, ideally users need > not specify whether the next query should read from the cache or use the > original DAG. This should be decided by the optimizer. > > To explain the difference between returning / not returning a CachedTable, > I want to compare the following two cases: > > *Case 1: returning a CachedTable* > b = a.map(...) > val cachedTableA1 = a.cache() > val cachedTableA2 = a.cache() > b.print() // Just to make sure a is cached. > > c = a.filter(...) // User specifies that the original DAG is used? Or the > optimizer decides whether DAG or cache should be used?
> d = cachedTableA1.filter() // User specifies that the cached table is used. > > a.unCache() // Can cachedTableA still be used afterwards? > cachedTableA1.uncache() // Can cachedTableA2 still be used? > > *Case 2: not returning a CachedTable* > b = a.map() > a.cache() > a.cache() // no-op > b.print() // Just to make sure a is cached > > c = a.filter(...) // Optimizer decides whether the cache or DAG should be > used > d = a.filter(...) // Optimizer decides whether the cache or DAG should be > used > > a.unCache() > a.unCache() // no-op > > In case 1, semantics-wise, the optimizer loses the option to choose between DAG > and cache. And the unCache() call becomes tricky. > In case 2, users do not need to worry about whether the cache or DAG is used. > And the unCache() semantics are clear. However, the caveat is that users > cannot explicitly ignore the cache. > > In order to address the issues mentioned in case 2 and inspired by the > discussion so far, I am thinking about using hints to allow users to explicitly > ignore the cache. Although we do not have hints yet, we probably should have > one. So the code becomes: > > *Case 3: returning this table* > b = a.map()
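The Case 1 semantics Piotr argues for can be illustrated with a toy model. This is only a sketch of the proposed behaviour, not Flink's actual API: the class names `ToyTable` and `ToyCachedTable` are hypothetical. Each `cache()` call returns an independent handle; reads through a handle are served from its materialized result, dropping one handle does not invalidate another, and reads through the original table always re-run the DAG.

```java
// Hypothetical sketch of `CachedTable cache()` semantics (not Flink's API).
class ToyTable {
    private int evalCount = 0;  // how many times the underlying DAG ran

    // Explicit caching: returns an independent handle per call.
    ToyCachedTable cache() { return new ToyCachedTable(this); }

    // Reading the original table always re-executes the DAG.
    int read() { return ++evalCount; }

    int evalCount() { return evalCount; }
}

class ToyCachedTable {
    private final ToyTable source;
    private Integer cached;          // materialized result, filled lazily
    private boolean dropped = false;

    ToyCachedTable(ToyTable source) { this.source = source; }

    int read() {
        if (dropped) throw new IllegalStateException("cache was dropped");
        if (cached == null) cached = source.read();  // materialize once
        return cached;
    }

    // Dropping this handle does not affect other handles on the same table.
    void dropCache() { dropped = true; cached = null; }
}
```

Under this model the questions in Case 1 have clear answers: `cachedTableA1.dropCache()` only invalidates that one handle, and there is no table-level `a.unCache()` that could ambiguously affect all handles at once.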
[jira] [Created] (FLINK-11120) The bug of timestampadd handles time
xuqianjin created FLINK-11120: - Summary: The bug of timestampadd handles time Key: FLINK-11120 URL: https://issues.apache.org/jira/browse/FLINK-11120 Project: Flink Issue Type: Sub-task Reporter: xuqianjin
[DISCUSS] Enhance convenience of TableEnvironment in TableAPI/SQL
Hi All, According to the feedback from users, the design of TableEnvironment is very inconvenient for users, and the wrong class is often imported by the IDE, especially for Java users, such as: ExecutionEnvironment env = ... BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); The user does not know which BatchTableEnvironment should be imported, because there are three implementations of BatchTableEnvironment, shown as below: 1. org.apache.flink.table.api.BatchTableEnvironment 2. org.apache.flink.table.api.java.BatchTableEnvironment 3. org.apache.flink.table.api.scala.BatchTableEnvironment This brings unnecessary inconvenience to Flink users. To solve this problem, Wei Zhong, Hequn Cheng, Dian Fu, Shaoxuan Wang and myself discussed offline a bit and propose to change the inheritance hierarchy of TableEnvironment as follows: 1. AbstractTableEnvironment - rename the current TableEnvironment to AbstractTableEnvironment; the functionality implemented by AbstractTableEnvironment is shared by stream and batch. 2. TableEnvironment - create a new TableEnvironment (abstract) that defines all methods of (java/scala) StreamTableEnvironment and (java/scala) BatchTableEnvironment. In the implementations of BatchTableEnvironment and StreamTableEnvironment, the unsupported operations will be reported as an error. Then the usage is as follows: ExecutionEnvironment env = … TableEnvironment tEnv = TableEnvironment.getTableEnvironment(env) For detailed proposals please refer to the Google doc: https://docs.google.com/document/d/1t-AUGuaChADddyJi6e0WLsTDEnf9ZkupvvBiQ4yTTEI/edit?usp=sharing Any mail feedback and Google doc comment are welcome. Thanks, Jincheng
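The proposed two-level hierarchy can be sketched as below. This is only an illustration of the idea, not the real Flink API: the proposal reuses the name TableEnvironment for the new unified abstract class, which is renamed `UnifiedTableEnvironment` here to avoid clashing with the old name, and all method names (`registerTable`, `sqlQuery`, `toRetractStream`) are hypothetical. Shared functionality lives in the abstract base; the unified environment declares the union of stream and batch methods; each concrete environment reports an error for operations it cannot support.

```java
// Hypothetical sketch of the proposed TableEnvironment hierarchy.
abstract class AbstractTableEnvironment {
    // stream/batch-shared functionality, e.g. catalog registration
    private final java.util.Map<String, String> tables = new java.util.HashMap<>();
    void registerTable(String name, String plan) { tables.put(name, plan); }
    String lookupTable(String name) { return tables.get(name); }
}

abstract class UnifiedTableEnvironment extends AbstractTableEnvironment {
    abstract String sqlQuery(String query);       // supported by both modes
    abstract void toRetractStream(String table);  // stream-only operation
}

class BatchEnv extends UnifiedTableEnvironment {
    String sqlQuery(String q) { return "batch plan: " + q; }

    void toRetractStream(String table) {
        // per the proposal, unsupported operations are reported as errors
        throw new UnsupportedOperationException("toRetractStream is stream-only");
    }
}
```

With this shape, user code only ever imports the single unified environment type, which removes the three-way import ambiguity described above.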
[jira] [Created] (FLINK-11119) Incorrect Scala example for Table Function
Denys Fakhritdinov created FLINK-11119: -- Summary: Incorrect Scala example for Table Function Key: FLINK-11119 URL: https://issues.apache.org/jira/browse/FLINK-11119 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.7.0, 1.6.2 Reporter: Denys Fakhritdinov Issue in Scala example in documentation: [https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/udfs.html#table-functions] Currently it is: {code:java} tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN TABLE(split(a)) as T(word, length) ON TRUE") {code} Should be (like in the Java version): {code:java} tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE") {code} * LATERAL is missing in the Scala version
[jira] [Created] (FLINK-11118) Refactor and unify rowtime timestamp extractor interface
vinoyang created FLINK-11118: Summary: Refactor and unify rowtime timestamp extractor interface Key: FLINK-11118 URL: https://issues.apache.org/jira/browse/FLINK-11118 Project: Flink Issue Type: Sub-task Reporter: vinoyang Assignee: vinoyang
[jira] [Created] (FLINK-11116) Clean-up temporary files that upon recovery, they belong to no checkpoint.
Kostas Kloudas created FLINK-11116: -- Summary: Clean-up temporary files that upon recovery, they belong to no checkpoint. Key: FLINK-11116 URL: https://issues.apache.org/jira/browse/FLINK-11116 Project: Flink Issue Type: Improvement Components: filesystem-connector Affects Versions: 1.7.0 Reporter: Kostas Kloudas Assignee: Kostas Kloudas Fix For: 1.7.1 In order to guarantee exactly-once semantics, the streaming file sink implements a two-phase commit protocol when writing files to the filesystem. Initially, data is written to in-progress files. These files are put into "pending" state when they are completed (based on the rolling policy), and they are finally committed when the checkpoint that put them in the "pending" state is acknowledged as complete. The above shows that in the case that we have: 1) checkpoints A, B, C coming, 2) checkpoint A being acknowledged, and 3) a failure, then we may have files that do not belong to any checkpoint (because B and C were not considered successful). These files are currently not cleaned up. This issue aims at cleaning up these files.
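The file lifecycle described in the issue (in-progress, then pending on roll, then committed when the corresponding checkpoint is acknowledged) and the resulting orphaned-file problem can be modelled with a small sketch. `ToyFileSink` is a hypothetical stand-in, not Flink's actual StreamingFileSink code:

```java
import java.util.*;

// Toy model of the two-phase commit protocol described above. Pending files
// whose checkpoint was never acknowledged belong to no checkpoint after
// recovery; those are the files the issue wants cleaned up.
class ToyFileSink {
    final List<String> inProgress = new ArrayList<>();
    final Map<Long, List<String>> pendingByCheckpoint = new HashMap<>();
    final List<String> committed = new ArrayList<>();

    void write(String file) { inProgress.add(file); }

    // rolling policy closes in-progress files; they become pending for the
    // given checkpoint
    void roll(long checkpointId) {
        pendingByCheckpoint.computeIfAbsent(checkpointId, k -> new ArrayList<>())
                .addAll(inProgress);
        inProgress.clear();
    }

    // checkpoint acknowledged as complete: commit its pending files
    void notifyCheckpointComplete(long checkpointId) {
        List<String> files = pendingByCheckpoint.remove(checkpointId);
        if (files != null) committed.addAll(files);
    }

    // after recovering to a checkpoint, pending files of later checkpoints
    // belong to no successful checkpoint
    List<String> orphansAfterRecovery(long restoredCheckpointId) {
        List<String> orphans = new ArrayList<>();
        for (Map.Entry<Long, List<String>> e : pendingByCheckpoint.entrySet()) {
            if (e.getKey() > restoredCheckpointId) orphans.addAll(e.getValue());
        }
        return orphans;
    }
}
```

In the issue's scenario (checkpoints A, B, C taken, only A acknowledged, then a failure), the files rolled for B and C show up as orphans after recovery to A.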