[jira] [Commented] (FLINK-6388) Add support for DISTINCT into Code Generated Aggregations
[ https://issues.apache.org/jira/browse/FLINK-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15988313#comment-15988313 ] ASF GitHub Bot commented on FLINK-6388: --- Github user huawei-flink commented on the issue: https://github.com/apache/flink/pull/3783 @rtudoran @fhueske The first implementation I made kept the state in the ProcessFunction without a code-generated aggregation function. Second, I pushed a branch with the state in the process function using the code-generated process function. Third, I moved the state into the code-generated function. It is not clear to me why the state cannot live within the code-generated function; could you please clarify, so that we can decide whether it is worth working around? This feature is quite important for us. In any case, you could have a look at the branch that keeps the state in the process function and uses the code-generated aggregation functions. Basically, rather than generating one function for all the aggregations, I create one class for each, and then call the corresponding accumulate/retract with the distinct logic when it is marked in the process function. > Add support for DISTINCT into Code Generated Aggregations > - > > Key: FLINK-6388 > URL: https://issues.apache.org/jira/browse/FLINK-6388 > Project: Flink > Issue Type: Sub-task > Components: DataStream API >Affects Versions: 1.3.0 >Reporter: Stefano Bortoli >Assignee: Stefano Bortoli > Fix For: 1.3.0 > > > We should support DISTINCT in code-generated aggregation functions.
[GitHub] flink issue #3783: [FLINK-6388] Add support for DISTINCT into Code Generated...
Github user huawei-flink commented on the issue: https://github.com/apache/flink/pull/3783 @rtudoran @fhueske The first implementation I made kept the state in the ProcessFunction without a code-generated aggregation function. Second, I pushed a branch with the state in the process function using the code-generated process function. Third, I moved the state into the code-generated function. It is not clear to me why the state cannot live within the code-generated function; could you please clarify, so that we can decide whether it is worth working around? This feature is quite important for us. In any case, you could have a look at the branch that keeps the state in the process function and uses the code-generated aggregation functions. Basically, rather than generating one function for all the aggregations, I create one class for each, and then call the corresponding accumulate/retract with the distinct logic when it is marked in the process function.
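A minimal, self-contained sketch of the distinct bookkeeping described in the comment above — this is an illustration of the idea, not the PR's actual generated code, and all names here are made up: each aggregation tracks per-value occurrence counts, forwards only the first occurrence to accumulate, and forwards a retraction only when the last occurrence of a value is removed.

{code}
import scala.collection.mutable

// Illustrative distinct filter: decides whether an incoming (or retracted)
// value should be forwarded to the wrapped aggregation's accumulate/retract.
class DistinctFilter[T] {
  private val counts = mutable.Map.empty[T, Long]

  /** Returns true if the underlying accumulate() should be invoked. */
  def onAccumulate(value: T): Boolean = {
    val c = counts.getOrElse(value, 0L) + 1
    counts(value) = c
    c == 1 // first occurrence of this value
  }

  /** Returns true if the underlying retract() should be invoked. */
  def onRetract(value: T): Boolean = {
    val c = counts.getOrElse(value, 0L) - 1
    if (c <= 0) { counts.remove(value); true } // last occurrence removed
    else { counts(value) = c; false }
  }
}
{code}

With one such filter per aggregation marked DISTINCT, the per-aggregation classes described in the comment can share the same accumulate/retract call sites.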
[jira] [Updated] (FLINK-6406) Cleanup useless import
[ https://issues.apache.org/jira/browse/FLINK-6406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-6406: --- Description: When browsing the code, I found some unused references in the following files, which need cleanup. {{packages.scala}} *ExternalCatalogTable *arithmetic.scala *array.scala *ColumnStats was: When browsing the code, I found some unused references in the following files, which need cleanup. *packages.scala *ExternalCatalogTable *arithmetic.scala *array.scala *ColumnStats > Cleanup useless import > --- > > Key: FLINK-6406 > URL: https://issues.apache.org/jira/browse/FLINK-6406 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: sunjincheng >Assignee: sunjincheng > > When browsing the code, I found some unused references in the following files, which need cleanup. > {{packages.scala}} > *ExternalCatalogTable > *arithmetic.scala > *array.scala > *ColumnStats
[jira] [Updated] (FLINK-6406) Cleanup useless import
[ https://issues.apache.org/jira/browse/FLINK-6406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-6406: --- Description: When browsing the code, I found some unused references in the following files, which need cleanup. {{packages.scala}} {{ExternalCatalogTable}} {{arithmetic.scala}} {{array.scala}} {{ColumnStats}} was: When browsing the code, I found some unused references in the following files, which need cleanup. {{packages.scala}} *ExternalCatalogTable *arithmetic.scala *array.scala *ColumnStats > Cleanup useless import > --- > > Key: FLINK-6406 > URL: https://issues.apache.org/jira/browse/FLINK-6406 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: sunjincheng >Assignee: sunjincheng > > When browsing the code, I found some unused references in the following files, which need cleanup. > {{packages.scala}} > {{ExternalCatalogTable}} > {{arithmetic.scala}} > {{array.scala}} > {{ColumnStats}}
[jira] [Created] (FLINK-6406) Cleanup useless import
sunjincheng created FLINK-6406: -- Summary: Cleanup useless import Key: FLINK-6406 URL: https://issues.apache.org/jira/browse/FLINK-6406 Project: Flink Issue Type: Improvement Components: Table API & SQL Affects Versions: 1.3.0 Reporter: sunjincheng Assignee: sunjincheng When browsing the code, I found some unused references in the following files, which need cleanup. *packages.scala *ExternalCatalogTable *arithmetic.scala *array.scala *ColumnStats
[jira] [Commented] (FLINK-6196) Support dynamic schema in Table Function
[ https://issues.apache.org/jira/browse/FLINK-6196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15988303#comment-15988303 ] ASF GitHub Bot commented on FLINK-6196: --- Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r113861371 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala --- @@ -117,6 +117,187 @@ class DataSetUserDefinedFunctionITCase( } @Test + def testDynamicSchema(): Unit = { --- End diff -- Yes, I agree with you. I've updated the patch accordingly. I will refine these tests later today. > Support dynamic schema in Table Function > > > Key: FLINK-6196 > URL: https://issues.apache.org/jira/browse/FLINK-6196 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Zhuoluo Yang >Assignee: Zhuoluo Yang > Fix For: 1.3.0 > > > In many of our use cases, we have to decide the schema of a UDTF at run time. For example, udtf('c1, c2, c3') will generate three columns for a lateral view. > Most systems, such as Calcite and Hive, support this feature. However, the current implementation in Flink does not implement the feature correctly.
[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r113861371 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala --- @@ -117,6 +117,187 @@ class DataSetUserDefinedFunctionITCase( } @Test + def testDynamicSchema(): Unit = { --- End diff -- Yes, I agree with you. I've updated the patch accordingly. I will refine these tests later today.
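To make the dynamic-schema idea concrete, here is a hedged sketch against the Flink 1.x Table API — an illustration of the concept only, not the implementation in PR #3623; the class name {{DynamicSplit}} and the string-only column typing are assumptions. The UDTF derives its row type from a constructor argument such as "c1, c2, c3", so the output schema is decided at run time rather than fixed by the class definition.

{code}
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

// Hypothetical UDTF whose schema is given at construction time, e.g.
// new DynamicSplit("c1, c2, c3") emits rows with three string columns.
class DynamicSplit(columns: String) extends TableFunction[Row] {
  private val names = columns.split(",").map(_.trim)

  def eval(line: String): Unit = {
    val row = new Row(names.length)
    line.split(",").take(names.length).zipWithIndex.foreach {
      case (value, i) => row.setField(i, value.trim)
    }
    collect(row)
  }

  // The result type is computed from the constructor argument instead of
  // being fixed by the class definition -- the "dynamic schema".
  override def getResultType: TypeInformation[Row] =
    new RowTypeInfo(
      names.map(_ => BasicTypeInfo.STRING_TYPE_INFO: TypeInformation[_]),
      names)
}
{code}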
[jira] [Commented] (FLINK-6334) Refactoring UDTF interface
[ https://issues.apache.org/jira/browse/FLINK-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15988284#comment-15988284 ] ASF GitHub Bot commented on FLINK-6334: --- Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/3791 Hi @Xpray, thanks for the update. I think the PR is in pretty good shape, except for the name of `TableFunctionConversions`. Looking forward to @fhueske's opinion. Best, SunJincheng > Refactoring UDTF interface > -- > > Key: FLINK-6334 > URL: https://issues.apache.org/jira/browse/FLINK-6334 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li > > The current UDTF leverages the table.join(expression) interface, which is not a proper interface in terms of semantics. We would like to refactor this to let the UDTF use the table.join(table) interface. Very briefly, the UDTF's apply method will return a Table type, so join(UDTF('a, 'b, ...) as 'c) shall be viewed as join(Table).
[GitHub] flink issue #3791: [FLINK-6334] [table] Refactoring UDTF interface
Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/3791 Hi @Xpray, thanks for the update. I think the PR is in pretty good shape, except for the name of `TableFunctionConversions`. Looking forward to @fhueske's opinion. Best, SunJincheng
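For readers following the refactoring, here is a hedged usage sketch of the semantics this PR proposes (the {{Split}} function and table names are made up, and the implicit conversion through {{TableFunctionConversions}} reflects the PR under review, so details may differ from the merged version): applying the TableFunction yields a Table, which the ordinary join method then accepts.

{code}
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.TableFunction

// Hypothetical UDTF that splits a sentence into words.
class Split extends TableFunction[String] {
  def eval(s: String): Unit = s.split(" ").foreach(collect)
}

object UdtfJoinSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    val sentences = env.fromElements((1, "hello world")).toTable(tEnv, 'id, 'sentence)

    val split = new Split
    // With this PR, split('sentence) is implicitly converted into a Table
    // (via TableFunctionConversions), so join receives a Table rather than
    // a bare expression -- the join(Table) semantics the issue asks for.
    val result = sentences.join(split('sentence) as 'word)
    result.toDataSet[(Int, String, String)].print()
  }
}
{code}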
[jira] [Commented] (FLINK-6334) Refactoring UDTF interface
[ https://issues.apache.org/jira/browse/FLINK-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15988279#comment-15988279 ] ASF GitHub Bot commented on FLINK-6334: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3791#discussion_r113858925 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala --- @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.scala + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.Table +import org.apache.flink.table.expressions._ +import org.apache.flink.table.functions.TableFunction +import org.apache.flink.table.plan.logical.LogicalTableFunctionCall + +/** + * Holds methods to convert a [[TableFunction]] (provided by scala user) into a [[Table]] + * + * @param tf The tableFunction to convert. + */ +class TableFunctionConversions[T](tf: TableFunction[T]) { + --- End diff -- In fact, I think it's a `Type`, not a `Conversion`. I'm not sure; I think @fhueske can give us the best suggestion. > Refactoring UDTF interface > -- > > Key: FLINK-6334 > URL: https://issues.apache.org/jira/browse/FLINK-6334 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li > > The current UDTF leverages the table.join(expression) interface, which is not a proper interface in terms of semantics. We would like to refactor this to let the UDTF use the table.join(table) interface. Very briefly, the UDTF's apply method will return a Table type, so join(UDTF('a, 'b, ...) as 'c) shall be viewed as join(Table).
[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3791#discussion_r113858925 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala --- @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.scala + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.Table +import org.apache.flink.table.expressions._ +import org.apache.flink.table.functions.TableFunction +import org.apache.flink.table.plan.logical.LogicalTableFunctionCall + +/** + * Holds methods to convert a [[TableFunction]] (provided by scala user) into a [[Table]] + * + * @param tf The tableFunction to convert. + */ +class TableFunctionConversions[T](tf: TableFunction[T]) { + --- End diff -- In fact, I think it's a `Type`, not a `Conversion`. I'm not sure; I think @fhueske can give us the best suggestion.
[jira] [Updated] (FLINK-6397) MultipleProgramsTestBase does not reset ContextEnvironment
[ https://issues.apache.org/jira/browse/FLINK-6397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-6397: Description: The MultipleProgramsTestBase sets a new TestEnvironment as a context environment but never explicitly unsets it, which can result in subsequent tests failing categorically. The CustomDistributionITCase doesn't unset the context either; and there is some streaming test that I haven't quite nailed down yet. was: The MultipleProgramsTestBase sets a new TestEnvironment as a context environment but never explicitly unsets it, which can result in subsequent tests failing categorically. The ClusterDistributionITCase doesn't unset the context either; and there is some streaming test that I haven't quite nailed down yet. > MultipleProgramsTestBase does not reset ContextEnvironment > -- > > Key: FLINK-6397 > URL: https://issues.apache.org/jira/browse/FLINK-6397 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.3.0 >Reporter: Chesnay Schepler >Assignee: Biao Liu > > The MultipleProgramsTestBase sets a new TestEnvironment as a context environment but never explicitly unsets it, which can result in subsequent tests failing categorically. > The CustomDistributionITCase doesn't unset the context either; and there is some streaming test that I haven't quite nailed down yet.
[jira] [Commented] (FLINK-6397) MultipleProgramsTestBase does not reset ContextEnvironment
[ https://issues.apache.org/jira/browse/FLINK-6397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15988264#comment-15988264 ] Chesnay Schepler commented on FLINK-6397: - It's called {{CustomDistributionITCase}}; fixing the description now. > MultipleProgramsTestBase does not reset ContextEnvironment > -- > > Key: FLINK-6397 > URL: https://issues.apache.org/jira/browse/FLINK-6397 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.3.0 >Reporter: Chesnay Schepler >Assignee: Biao Liu > > The MultipleProgramsTestBase sets a new TestEnvironment as a context environment but never explicitly unsets it, which can result in subsequent tests failing categorically. > The ClusterDistributionITCase doesn't unset the context either; and there is some streaming test that I haven't quite nailed down yet.
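A short self-contained sketch of the lifecycle problem this issue describes — the registry below is an illustrative stand-in, not Flink's real ExecutionEnvironment/TestEnvironment API: whatever installs a global context in a before-class hook must clear it in the matching after-class hook, otherwise later test classes silently run against the stale environment.

{code}
// Stand-in for Flink's context-environment registry (illustrative names,
// not the real ExecutionEnvironment/TestEnvironment API).
object ContextRegistry {
  private var current: Option[String] = None
  def set(env: String): Unit = current = Some(env)
  def unset(): Unit = current = None
  def get: Option[String] = current
}

object ContextLifecycleSketch {
  // Would be invoked from the test base's @BeforeClass hook.
  def setupContext(): Unit = ContextRegistry.set("TestEnvironment")

  // The step MultipleProgramsTestBase is missing: without it, the next test
  // class silently inherits the stale "TestEnvironment" context.
  def clearContext(): Unit = ContextRegistry.unset()
}
{code}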
[jira] [Commented] (FLINK-6334) Refactoring UDTF interface
[ https://issues.apache.org/jira/browse/FLINK-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15988248#comment-15988248 ] ASF GitHub Bot commented on FLINK-6334: --- Github user Xpray commented on a diff in the pull request: https://github.com/apache/flink/pull/3791#discussion_r113855406 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala --- @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.scala + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.Table +import org.apache.flink.table.expressions._ +import org.apache.flink.table.functions.TableFunction +import org.apache.flink.table.plan.logical.LogicalTableFunctionCall + +/** + * Holds methods to convert a [[TableFunction]] (provided by scala user) into a [[Table]] + * + * @param tf The tableFunction to convert. + */ +class TableFunctionConversions[T](tf: TableFunction[T]) { + --- End diff -- I found that most of the existing implicit conversion functions have a common suffix like XXXConversions, so I think this naming should be clear. > Refactoring UDTF interface > -- > > Key: FLINK-6334 > URL: https://issues.apache.org/jira/browse/FLINK-6334 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li > > The current UDTF leverages the table.join(expression) interface, which is not a proper interface in terms of semantics. We would like to refactor this to let the UDTF use the table.join(table) interface. Very briefly, the UDTF's apply method will return a Table type, so join(UDTF('a, 'b, ...) as 'c) shall be viewed as join(Table).
[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface
Github user Xpray commented on a diff in the pull request: https://github.com/apache/flink/pull/3791#discussion_r113855406 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala --- @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.scala + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.Table +import org.apache.flink.table.expressions._ +import org.apache.flink.table.functions.TableFunction +import org.apache.flink.table.plan.logical.LogicalTableFunctionCall + +/** + * Holds methods to convert a [[TableFunction]] (provided by scala user) into a [[Table]] + * + * @param tf The tableFunction to convert. + */ +class TableFunctionConversions[T](tf: TableFunction[T]) { + --- End diff -- I found that most of the existing implicit conversion functions have a common suffix like XXXConversions, so I think this naming should be clear.
[jira] [Commented] (FLINK-6337) Remove the buffer provider from PartitionRequestServerHandler
[ https://issues.apache.org/jira/browse/FLINK-6337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15988243#comment-15988243 ] ASF GitHub Bot commented on FLINK-6337: --- Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/3785 Hi @uce, I have submitted the modifications of `createReadView`, and the tests and IT have passed on my private Travis. I checked the failed test `HistoryServerTest.testFullArchiveLifecycle` on this Travis run and it seems not related to my pull request. I ran it several times separately and they all passed. > Remove the buffer provider from PartitionRequestServerHandler > - > > Key: FLINK-6337 > URL: https://issues.apache.org/jira/browse/FLINK-6337 > Project: Flink > Issue Type: Improvement > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > Currently, {{PartitionRequestServerHandler}} creates a {{LocalBufferPool}} when the channel is registered. The {{LocalBufferPool}} is only used to get the segment size for creating the read view in {{SpillableSubpartition}}, and the buffers in the pool are never actually used, so it wastes buffer resources from the global pool. > We would like to remove the {{LocalBufferPool}} from the {{PartitionRequestServerHandler}}; the {{LocalBufferPool}} in {{ResultPartition}} can also provide the segment size for creating the subpartition view.
[GitHub] flink issue #3785: [FLINK-6337][network] Remove the buffer provider from Par...
Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/3785 Hi @uce, I have submitted the modifications of `createReadView`, and the tests and IT have passed on my private Travis. I checked the failed test `HistoryServerTest.testFullArchiveLifecycle` on this Travis run and it seems not related to my pull request. I ran it several times separately and they all passed.
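A self-contained model of the refactoring being discussed — the types below are simplified stand-ins, not Flink's real network classes, and the method shapes are assumptions based on the issue description: the server handler no longer owns a {{LocalBufferPool}} just to learn the segment size; the partition's own pool supplies it when the read view is created.

{code}
// Simplified stand-ins for illustration only.
trait BufferProvider {
  def memorySegmentSize: Int
}

final case class ReadView(segmentSize: Int)

class ResultPartition(pool: BufferProvider) {
  // Before: createReadView(handlerPool: BufferProvider) -- each
  // PartitionRequestServerHandler registered a LocalBufferPool whose only
  // job was to report the segment size here, wasting global buffers.
  // After: the partition's own pool already knows the segment size, so no
  // external provider (and no handler-side pool) is needed.
  def createReadView(): ReadView = ReadView(pool.memorySegmentSize)
}
{code}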
[jira] [Commented] (FLINK-5752) Support push down projections for HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-5752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15988233#comment-15988233 ] ASF GitHub Bot commented on FLINK-5752: --- Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3760 @fhueske Thanks for the update. Got it. If there are any issues/JIRAs where I could be of help for the 1.3 release fork, I would be happy to help. Please point me to those where you think I can help; I can have a look and commit to whatever I can spend time on. > Support push down projections for HBaseTableSource > -- > > Key: FLINK-5752 > URL: https://issues.apache.org/jira/browse/FLINK-5752 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: ramkrishna.s.vasudevan >Assignee: ramkrishna.s.vasudevan > > This follows the discussion about creating NestedProjectableTableSource. Currently we support nested schemas for non-relational DBs like HBase, but this does not allow push-down projection. This JIRA is to implement that. > Once FLINK-5698 is implemented, we should make use of that feature to push down the projections for a nested table. So, in the case of HBase, if we have {f1:{a, b}, f2:{c, d}} as the nested structure and a scan query that needs to select f2.c, then we should be able to project only that column 'c' under 'f2'. FLINK-5698 plans to add a new API for such projections, and HBaseTableSource should make use of that API to do the projection. > [~fhueske], [~tonycox], [~jark]
[GitHub] flink issue #3760: FLINK-5752 Support push down projections for HBaseTableSo...
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3760 @fhueske Thanks for the update. Got it. If there are any issues/JIRAs where I could be of help for the 1.3 release fork, I would be happy to help. Please point me to those where you think I can help; I can have a look and commit to whatever I can spend time on.
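To illustrate what projection push-down means for the nested schema in this issue, here is a self-contained sketch with simplified stand-ins (not Flink's or HBase's actual classes): for the schema {f1:{a, b}, f2:{c, d}}, a query selecting f2.c should add only family f2, qualifier c to the scan, so no other columns are read from HBase.

{code}
object NestedProjectionSketch {
  // Simplified stand-in for an HBase scan restricted to specific columns.
  final case class Scan(columns: Seq[(String, String)]) // (family, qualifier)

  // Maps requested nested paths like "f2.c" to (family, qualifier) pairs so
  // only those columns are read -- the push-down the issue describes.
  def pushDownProjection(requested: Seq[String]): Scan =
    Scan(requested.map { path =>
      val Array(family, qualifier) = path.split("\\.", 2)
      (family, qualifier)
    })

  def main(args: Array[String]): Unit =
    println(pushDownProjection(Seq("f2.c"))) // Scan(List((f2,c)))
}
{code}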
[jira] [Commented] (FLINK-6393) Add Evenly Graph Generator to Flink Gelly
[ https://issues.apache.org/jira/browse/FLINK-6393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15988165#comment-15988165 ] ASF GitHub Bot commented on FLINK-6393: --- Github user fanzhidongyzby commented on a diff in the pull request: https://github.com/apache/flink/pull/3788#discussion_r113848435 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EvenlyGraph.java --- @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.apache.flink.util.Collector; +import org.apache.flink.util.LongValueSequenceIterator; +import org.apache.flink.util.Preconditions; + +/** + * Evenly graph means every {@link Vertex} in the {@link Graph} has the same degree. + * when vertex degree is 0, {@link EmptyGraph} will be generated. + * when vertex degree is vertex count - 1, {@link CompleteGraph} will be generated. --- End diff -- "vertex count - 1" means the degree of every vertex when the evenly graph is a complete graph. Is this clearer? > Add Evenly Graph Generator to Flink Gelly > - > > Key: FLINK-6393 > URL: https://issues.apache.org/jira/browse/FLINK-6393 > Project: Flink > Issue Type: New Feature > Components: Gelly >Reporter: FlorianFan >Assignee: FlorianFan >Priority: Minor > Original Estimate: 24h > Remaining Estimate: 24h > > An evenly graph means every vertex in the graph has the same degree, so the graph can be treated as even because all the edges in the graph are distributed evenly. When the vertex degree is 0, an empty graph will be generated. When the vertex degree is vertex count - 1, a complete graph will be generated.
[GitHub] flink pull request #3788: [FLINK-6393] [gelly] Add Evenly Graph Generator to...
Github user fanzhidongyzby commented on a diff in the pull request: https://github.com/apache/flink/pull/3788#discussion_r113848435 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EvenlyGraph.java --- @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.apache.flink.util.Collector; +import org.apache.flink.util.LongValueSequenceIterator; +import org.apache.flink.util.Preconditions; + +/** + * Evenly graph means every {@link Vertex} in the {@link Graph} has the same degree. + * when vertex degree is 0, {@link EmptyGraph} will be generated. + * when vertex degree is vertex count - 1, {@link CompleteGraph} will be generated. --- End diff -- "vertex count - 1" means the degree of every vertex when the evenly graph is a complete graph. Is this clearer?
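As a companion to the generator under review, here is a minimal sketch of one way to produce an even-degree edge list — a circulant-style construction given purely for illustration; it is not the PR's actual Gelly implementation, and the handshaking-lemma check is an assumption about what counts as valid input. Vertex i links to (i + k) mod n for k = 1..degree/2, plus its antipode when the degree is odd, reproducing the special cases named in the Javadoc: degree 0 yields the empty graph and degree n - 1 the complete graph.

{code}
object EvenlyEdgesSketch {
  // Emits a directed edge list in which every undirected link appears in both
  // directions, so each vertex ends up with out-degree == `degree`.
  def evenlyEdges(n: Long, degree: Int): Seq[(Long, Long)] = {
    require(degree >= 0 && degree < n, "degree must be in [0, vertex count)")
    require(degree % 2 == 0 || n % 2 == 0,
      "an odd degree requires an even vertex count (handshaking lemma)")
    val ring = for {
      i <- 0L until n
      k <- 1 to degree / 2
      e <- Seq((i, (i + k) % n), ((i + k) % n, i))
    } yield e
    // Odd degree: each vertex also links to its antipode i + n/2; iterating
    // over every i yields both directions of that link exactly once.
    val antipode =
      if (degree % 2 == 1) (0L until n).map(i => (i, (i + n / 2) % n))
      else Seq.empty[(Long, Long)]
    ring ++ antipode
  }

  def main(args: Array[String]): Unit = {
    println(evenlyEdges(4, 0)) // empty graph
    println(evenlyEdges(4, 3)) // complete graph K4 (degree = vertex count - 1)
  }
}
{code}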
[jira] [Closed] (FLINK-5106) improving IncrementalAggregateReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-5106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng closed FLINK-5106. -- Resolution: Won't Fix We have now applied the new UDAF framework, so this JIRA does not need a fix. > improving IncrementalAggregateReduceFunction > > > Key: FLINK-5106 > URL: https://issues.apache.org/jira/browse/FLINK-5106 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > > Please refer to FLINK-4937.
[jira] [Commented] (FLINK-6097) Guaranteed the order of the extracted field references
[ https://issues.apache.org/jira/browse/FLINK-6097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15988120#comment-15988120 ] sunjincheng commented on FLINK-6097: Yes, it works well without the ordering guarantee. Closing this JIRA. > Guaranteed the order of the extracted field references > -- > > Key: FLINK-6097 > URL: https://issues.apache.org/jira/browse/FLINK-6097 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > > When we tried to implement the `OVER window` Table API, in the first version of the prototype we did not consider that the table fields could be out of order when implementing the `translateToPlan` method; we set the `outputRow` fields from `inputRow` according to the initial order of the table field indexes. > As long as the select statement projected fewer than 5 columns, this worked well. But unfortunately, when the number of projections was bigger than 4 (>= 5), we got random results. When we debugged the code, we found that the `ProjectionTranslator#identifyFieldReferences` method uses a `Set` to temporarily save the fields; when the number of elements in the Set is less than 5, the Set uses the Set1, Set2, Set3, Set4 data structures, and when the number of elements is greater than or equal to 5, the Set uses HashSet#HashTrieSet, which causes the data to be out of order. > e.g.: > Add the following elements in turn: > {code} > a, b, c, d, e > Set(a) > class scala.collection.immutable.Set$Set1 > Set(a, b) > class scala.collection.immutable.Set$Set2 > Set(a, b, c) > class scala.collection.immutable.Set$Set3 > Set(a, b, c, d) > class scala.collection.immutable.Set$Set4 > // we want (a, b, c, d, e) > Set(e, a, b, c, d) > class scala.collection.immutable.HashSet$HashTrieSet > {code} > So we thought of 2 approaches to solve this problem: > 1. Let the `ProjectionTranslator#identifyFieldReferences` method guarantee that the order of the extracted field references is the same as the input order. > 2. Add an input-to-output field mapping. > In the end we solved the problem using approach #2. This change is not necessary for the problem I have faced, but I feel it is better to keep the output of this method in the same order as the input; it may be very helpful for other cases, though I am currently not aware of any. I am OK with not making this change, but then we should add a comment instead to highlight the potential ordering of the current output. Otherwise, some people may not pay attention to this and assume it is in order. > Hi guys, what do you think? Any feedback is welcome.
[jira] [Closed] (FLINK-6097) Guaranteed the order of the extracted field references
[ https://issues.apache.org/jira/browse/FLINK-6097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng closed FLINK-6097. -- Resolution: Won't Fix > Guaranteed the order of the extracted field references > -- > > Key: FLINK-6097 > URL: https://issues.apache.org/jira/browse/FLINK-6097 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > > When we tried to implement the `OVER window` Table API, in the first version of the prototype we did not consider that the table fields could be out of order when implementing the `translateToPlan` method; we set the `outputRow` fields from `inputRow` according to the initial order of the table field indexes. > As long as the select statement projected fewer than 5 columns, this worked well. But unfortunately, when the number of projections was bigger than 4 (>= 5), we got random results. When we debugged the code, we found that the `ProjectionTranslator#identifyFieldReferences` method uses a `Set` to temporarily save the fields; when the number of elements in the Set is less than 5, the Set uses the Set1, Set2, Set3, Set4 data structures, and when the number of elements is greater than or equal to 5, the Set uses HashSet#HashTrieSet, which causes the data to be out of order. > e.g.: > Add the following elements in turn: > {code} > a, b, c, d, e > Set(a) > class scala.collection.immutable.Set$Set1 > Set(a, b) > class scala.collection.immutable.Set$Set2 > Set(a, b, c) > class scala.collection.immutable.Set$Set3 > Set(a, b, c, d) > class scala.collection.immutable.Set$Set4 > // we want (a, b, c, d, e) > Set(e, a, b, c, d) > class scala.collection.immutable.HashSet$HashTrieSet > {code} > So we thought of 2 approaches to solve this problem: > 1. Let the `ProjectionTranslator#identifyFieldReferences` method guarantee that the order of the extracted field references is the same as the input order. > 2. Add an input-to-output field mapping. > In the end we solved the problem using approach #2. This change is not necessary for the problem I have faced, but I feel it is better to keep the output of this method in the same order as the input; it may be very helpful for other cases, though I am currently not aware of any. I am OK with not making this change, but then we should add a comment instead to highlight the potential ordering of the current output. Otherwise, some people may not pay attention to this and assume it is in order. > Hi guys, what do you think? Any feedback is welcome.
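The ordering behavior described in this issue is easy to reproduce directly. The following snippet targets the Scala 2.11-era immutable collections named in the report, and shows the switch from the insertion-ordered Set4 to the hash-ordered HashTrieSet at five elements:

{code}
object SetOrderDemo extends App {
  val s4 = Set("a", "b", "c", "d")
  println(s4.getClass.getName) // scala.collection.immutable.Set$Set4
  println(s4.toList)           // List(a, b, c, d) -- insertion order preserved

  val s5 = s4 + "e"
  println(s5.getClass.getName) // scala.collection.immutable.HashSet$HashTrieSet
  println(s5.toList)           // e.g. List(e, a, b, c, d) -- hash order, not insertion
}
{code}

This is why any code that relies on iteration order of a small immutable Set breaks silently once a fifth element is added, and why the fix keeps an explicit field mapping instead.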
[jira] [Commented] (FLINK-6393) Add Evenly Graph Generator to Flink Gelly
[ https://issues.apache.org/jira/browse/FLINK-6393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15988109#comment-15988109 ] ASF GitHub Bot commented on FLINK-6393: --- Github user gallenvara commented on the issue: https://github.com/apache/flink/pull/3788 @fanzhidongyzby, thanks for the PR. Just a minor comment; mostly it looks good to me. > Add Evenly Graph Generator to Flink Gelly > - > > Key: FLINK-6393 > URL: https://issues.apache.org/jira/browse/FLINK-6393 > Project: Flink > Issue Type: New Feature > Components: Gelly >Reporter: FlorianFan >Assignee: FlorianFan >Priority: Minor > Original Estimate: 24h > Remaining Estimate: 24h > > An evenly graph means every vertex in the graph has the same degree, so the graph can be treated as even because all the edges in the graph are distributed evenly. When the vertex degree is 0, an empty graph will be generated. When the vertex degree is vertex count - 1, a complete graph will be generated.
[GitHub] flink issue #3788: [FLINK-6393] [gelly] Add Evenly Graph Generator to Flink ...
Github user gallenvara commented on the issue: https://github.com/apache/flink/pull/3788 @fanzhidongyzby, thanks for the PR. Just a minor comment; mostly it looks good to me.
[jira] [Commented] (FLINK-6393) Add Evenly Graph Generator to Flink Gelly
[ https://issues.apache.org/jira/browse/FLINK-6393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15988108#comment-15988108 ] ASF GitHub Bot commented on FLINK-6393: --- Github user gallenvara commented on a diff in the pull request: https://github.com/apache/flink/pull/3788#discussion_r113842976 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EvenlyGraph.java --- @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.apache.flink.util.Collector; +import org.apache.flink.util.LongValueSequenceIterator; +import org.apache.flink.util.Preconditions; + +/** + * Evenly graph means every {@link Vertex} in the {@link Graph} has the same degree. + * when vertex degree is 0, {@link EmptyGraph} will be generated. + * when vertex degree is vertex count - 1, {@link CompleteGraph} will be generated. --- End diff -- `vertex count - 1` is confusing. Can you make the description clearer? > Add Evenly Graph Generator to Flink Gelly > - > > Key: FLINK-6393 > URL: https://issues.apache.org/jira/browse/FLINK-6393 > Project: Flink > Issue Type: New Feature > Components: Gelly >Reporter: FlorianFan >Assignee: FlorianFan >Priority: Minor > Original Estimate: 24h > Remaining Estimate: 24h > > An evenly graph means every vertex in the graph has the same degree, so the graph can be treated as even because all the edges in the graph are distributed evenly. When the vertex degree is 0, an empty graph will be generated. When the vertex degree is vertex count - 1, a complete graph will be generated.
[GitHub] flink pull request #3788: [FLINK-6393] [gelly] Add Evenly Graph Generator to...
Github user gallenvara commented on a diff in the pull request: https://github.com/apache/flink/pull/3788#discussion_r113842976 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EvenlyGraph.java --- @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.apache.flink.util.Collector; +import org.apache.flink.util.LongValueSequenceIterator; +import org.apache.flink.util.Preconditions; + +/** + * Evenly graph means every {@link Vertex} in the {@link Graph} has the same degree. + * when vertex degree is 0, {@link EmptyGraph} will be generated. + * when vertex degree is vertex count - 1, {@link CompleteGraph} will be generated. --- End diff -- `vertex count - 1` is confusing. Can you make the description clearer?
[jira] [Reopened] (FLINK-6393) Add Evenly Graph Generator to Flink Gelly
[ https://issues.apache.org/jira/browse/FLINK-6393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] FlorianFan reopened FLINK-6393: --- Reset issue state. > Add Evenly Graph Generator to Flink Gelly > - > > Key: FLINK-6393 > URL: https://issues.apache.org/jira/browse/FLINK-6393 > Project: Flink > Issue Type: New Feature > Components: Gelly >Reporter: FlorianFan >Assignee: FlorianFan >Priority: Minor > Original Estimate: 24h > Remaining Estimate: 24h > > An evenly graph means every vertex in the graph has the same degree, so the graph can be treated as even because all the edges in the graph are distributed evenly. When the vertex degree is 0, an empty graph will be generated. When the vertex degree is vertex count - 1, a complete graph will be generated.
[jira] [Commented] (FLINK-6393) Add Evenly Graph Generator to Flink Gelly
[ https://issues.apache.org/jira/browse/FLINK-6393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15988090#comment-15988090 ] FlorianFan commented on FLINK-6393: --- Hi [~greghogan], EvenlyGraph is a general graph generator that can be used by any graph algorithm that needs a test graph whose vertices and edges are evenly distributed. After reading the details of CirculantGraph, I found that EvenlyGraph is a subset of it, just like EmptyGraph and CompleteGraph are subsets of EvenlyGraph. I didn't know about CirculantGraph before; the concept of EvenlyGraph came from the test datasets used for performance testing of graph algorithms in our production environment. No specific algorithm uses EvenlyGraph, but it is an important type of graph dataset that can help us measure the performance of any algorithm we care about. Therefore, I think the EvenlyGraph generator can enrich the Gelly graph library and provide a wider scope for graph testing. In addition, EvenlyGraph may be a special case of CirculantGraph, just like EmptyGraph and CompleteGraph, one that is simplified and has a brief and fast generation algorithm. > Add Evenly Graph Generator to Flink Gelly > - > > Key: FLINK-6393 > URL: https://issues.apache.org/jira/browse/FLINK-6393 > Project: Flink > Issue Type: New Feature > Components: Gelly >Reporter: FlorianFan >Assignee: FlorianFan >Priority: Minor > Original Estimate: 24h > Remaining Estimate: 24h > > An evenly graph means every vertex in the graph has the same degree, so the graph can be treated as even because all the edges in the graph are distributed evenly. When the vertex degree is 0, an empty graph will be generated. When the vertex degree is vertex count - 1, a complete graph will be generated.
[jira] [Commented] (FLINK-6397) MultipleProgramsTestBase does not reset ContextEnvironment
[ https://issues.apache.org/jira/browse/FLINK-6397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15988079#comment-15988079 ] Biao Liu commented on FLINK-6397: - I would like to fix this problem, but I didn't find the {{ClusterDistributionITCase}} in the master branch, [~Zentol]. > MultipleProgramsTestBase does not reset ContextEnvironment > -- > > Key: FLINK-6397 > URL: https://issues.apache.org/jira/browse/FLINK-6397 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.3.0 >Reporter: Chesnay Schepler >Assignee: Biao Liu > > The MultipleProgramsTestBase sets a new TestEnvironment as a context environment but never explicitly unsets it, which can result in subsequent tests failing categorically. > The ClusterDistributionITCase doesn't unset the context either; and there is some streaming test that I haven't quite nailed down yet.
[jira] [Assigned] (FLINK-6397) MultipleProgramsTestBase does not reset ContextEnvironment
[ https://issues.apache.org/jira/browse/FLINK-6397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Biao Liu reassigned FLINK-6397: --- Assignee: Biao Liu > MultipleProgramsTestBase does not reset ContextEnvironment > -- > > Key: FLINK-6397 > URL: https://issues.apache.org/jira/browse/FLINK-6397 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.3.0 >Reporter: Chesnay Schepler >Assignee: Biao Liu > > The MultipleProgramsTestBase sets a new TestEnvironment as a context environment but never explicitly unsets it, which can result in subsequent tests failing categorically. > The ClusterDistributionITCase doesn't unset the context either; and there is some streaming test that I haven't quite nailed down yet.
[jira] [Commented] (FLINK-6334) Refactoring UDTF interface
[ https://issues.apache.org/jira/browse/FLINK-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15988040#comment-15988040 ] ASF GitHub Bot commented on FLINK-6334: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3791#discussion_r113839137 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala --- @@ -20,8 +20,8 @@ package org.apache.flink.table.api.java import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.TypeExtractor import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} -import org.apache.flink.table.expressions.ExpressionParser import org.apache.flink.table.api._ +import org.apache.flink.table.expressions.ExpressionParser --- End diff -- Please remove this line. > Refactoring UDTF interface > -- > > Key: FLINK-6334 > URL: https://issues.apache.org/jira/browse/FLINK-6334 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li > > The current UDTF leverages the table.join(expression) interface, which is not a proper interface in terms of semantics. We would like to refactor this to let the UDTF use the table.join(table) interface. Very briefly, the UDTF's apply method will return a Table type, so join(UDTF('a, 'b, ...) as 'c) shall be viewed as join(Table).
[jira] [Commented] (FLINK-6334) Refactoring UDTF interface
[ https://issues.apache.org/jira/browse/FLINK-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15988041#comment-15988041 ] ASF GitHub Bot commented on FLINK-6334: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3791#discussion_r113839148 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala --- @@ -20,8 +20,8 @@ package org.apache.flink.table.api.java import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.TypeExtractor import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} -import org.apache.flink.table.expressions.ExpressionParser --- End diff -- Please restore this line. > Refactoring UDTF interface > -- > > Key: FLINK-6334 > URL: https://issues.apache.org/jira/browse/FLINK-6334 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li > > The current UDTF leverages the table.join(expression) interface, which is not a proper interface in terms of semantics. We would like to refactor this to let the UDTF use the table.join(table) interface. Very briefly, the UDTF's apply method will return a Table type, so join(UDTF('a, 'b, ...) as 'c) shall be viewed as join(Table).
[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3791#discussion_r113839148 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala --- @@ -20,8 +20,8 @@ package org.apache.flink.table.api.java import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.TypeExtractor import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} -import org.apache.flink.table.expressions.ExpressionParser --- End diff -- Please restore this line.
[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3791#discussion_r113839137 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala --- @@ -20,8 +20,8 @@ package org.apache.flink.table.api.java import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.TypeExtractor import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} -import org.apache.flink.table.expressions.ExpressionParser import org.apache.flink.table.api._ +import org.apache.flink.table.expressions.ExpressionParser --- End diff -- Please remove this line.
[GitHub] flink issue #3794: [FLINK-6398] RowSerializer's duplicate should always retu...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3794 +1 to merge
[jira] [Commented] (FLINK-6334) Refactoring UDTF interface
[ https://issues.apache.org/jira/browse/FLINK-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15988012#comment-15988012 ] ASF GitHub Bot commented on FLINK-6334: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3791#discussion_r113834406 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala --- @@ -19,11 +19,12 @@ package org.apache.flink.table.api.java import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.TypeExtractor -import org.apache.flink.table.api._ -import org.apache.flink.table.functions.TableFunction -import org.apache.flink.table.expressions.ExpressionParser import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import org.apache.flink.table.api._ +import org.apache.flink.table.api.scala.TableFunctionConversions --- End diff -- Please remove the unused import. > Refactoring UDTF interface > -- > > Key: FLINK-6334 > URL: https://issues.apache.org/jira/browse/FLINK-6334 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li > > The current UDTF leverages the table.join(expression) interface, which is not a proper interface in terms of semantics. We would like to refactor this to let the UDTF use the table.join(table) interface. Very briefly, the UDTF's apply method will return a Table type, so join(UDTF('a, 'b, ...) as 'c) shall be viewed as join(Table).
[jira] [Commented] (FLINK-6334) Refactoring UDTF interface
[ https://issues.apache.org/jira/browse/FLINK-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15988013#comment-15988013 ] ASF GitHub Bot commented on FLINK-6334: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3791#discussion_r113833504 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -107,6 +107,11 @@ abstract class TableEnvironment(val config: TableConfig) { // registered external catalog names -> catalog private val externalCatalogs = new HashMap[String, ExternalCatalog] + private lazy val tableFunctionParser = new TableFunctionParser(this) + + // the method for converting a udtf String to Table for Java API + final def tableApply(udtf: String): Table = tableFunctionParser(udtf) --- End diff -- `TableFunctionParser` only has one method, named `apply`. IMO it's a utility method, so here are 3 suggestions: * If a class contains only utility methods, I suggest changing `class` to `object`; `tableEnv` can then be a parameter of the method. * If `TableFunctionParser#apply` is only used for `TableFunction`, I suggest moving the `apply` method into `UserDefinedFunctionUtils`, because all of the functional methods of `UDF/UDTF/UDAF` live in that file. * If the method is only used by `TableEnvironment`, could it be implemented internally in `TableEnvironment`? What do you think? @Xpray > Refactoring UDTF interface > -- > > Key: FLINK-6334 > URL: https://issues.apache.org/jira/browse/FLINK-6334 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li > > The current UDTF leverages the table.join(expression) interface, which is not > a proper interface in terms of semantics. We would like to refactor this to > let UDTF use table.join(table) interface. Very briefly, UDTF's apply method > will return a Table Type, so Join(UDTF('a, 'b, ...) as 'c) shall be viewed as > join(Table) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3791#discussion_r113835656 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala --- @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.scala + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.Table +import org.apache.flink.table.expressions._ +import org.apache.flink.table.functions.TableFunction +import org.apache.flink.table.plan.logical.LogicalTableFunctionCall + +/** + * Holds methods to convert a [[TableFunction]] (provided by scala user) into a [[Table]] + * + * @param tf The tableFunction to convert. + */ +class TableFunctionConversions[T](tf: TableFunction[T]) { + --- End diff -- I think before the `TableFunction` is applied, it's just a definition; once it's applied, it's a table. So I'd prefer the name `AppliedTableFunction`. The steps for using a `UDTF` are then: define -> apply -> join. What do you think? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6334) Refactoring UDTF interface
[ https://issues.apache.org/jira/browse/FLINK-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15988011#comment-15988011 ] ASF GitHub Bot commented on FLINK-6334: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3791#discussion_r113835656 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala --- @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.scala + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.Table +import org.apache.flink.table.expressions._ +import org.apache.flink.table.functions.TableFunction +import org.apache.flink.table.plan.logical.LogicalTableFunctionCall + +/** + * Holds methods to convert a [[TableFunction]] (provided by scala user) into a [[Table]] + * + * @param tf The tableFunction to convert. + */ +class TableFunctionConversions[T](tf: TableFunction[T]) { + --- End diff -- I think before the `TableFunction` is applied, it's just a definition; once it's applied, it's a table. So I'd prefer the name `AppliedTableFunction`. The steps for using a `UDTF` are then: define -> apply -> join. What do you think? > Refactoring UDTF interface > -- > > Key: FLINK-6334 > URL: https://issues.apache.org/jira/browse/FLINK-6334 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li > > The current UDTF leverages the table.join(expression) interface, which is not > a proper interface in terms of semantics. We would like to refactor this to > let UDTF use table.join(table) interface. Very briefly, UDTF's apply method > will return a Table Type, so Join(UDTF('a, 'b, ...) as 'c) shall be viewed as > join(Table) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
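For context, the define -> apply -> join flow under discussion reads roughly like this on the Scala side. This is a sketch under the PR's proposed conversions; the `Split` function, the input table, and the implicit conversion wiring are illustrative assumptions, not code from the PR:

```scala
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.TableFunction

// define: a hypothetical UDTF that splits a line into words
class Split extends TableFunction[String] {
  def eval(line: String): Unit = line.split(" ").foreach(collect)
}

object UdtfFlow {
  // `input` is assumed to be a Table with a 'line column
  def wordsOf(input: Table): Table = {
    val split = new Split                 // define
    val applied = split('line) as 'word   // apply: the call yields a Table via the conversions class
    input.join(applied).select('word)     // join: the applied UDTF is joined like a table
  }
}
```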
[jira] [Commented] (FLINK-6334) Refactoring UDTF interface
[ https://issues.apache.org/jira/browse/FLINK-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15988010#comment-15988010 ] ASF GitHub Bot commented on FLINK-6334: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3791#discussion_r113833924 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableFunctionParser.scala --- @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api + +import org.apache.flink.table.expressions._ +import org.apache.flink.table.plan.logical.LogicalTableFunctionCall + + +/** + * A parser to convert a udtf String (for Java user) to [[Table]] + * + * @param tableEnv a [[TableEnvironment]] which is used for looking up a function + */ +class TableFunctionParser(tableEnv: TableEnvironment) { + --- End diff -- This class only contains a utility method, so I suggest changing it to an `object`, or moving the method into `UserDefinedFunctionUtils`; `tableEnv` can then be a parameter of the method. > Refactoring UDTF interface > -- > > Key: FLINK-6334 > URL: https://issues.apache.org/jira/browse/FLINK-6334 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li > > The current UDTF leverages the table.join(expression) interface, which is not > a proper interface in terms of semantics. We would like to refactor this to > let UDTF use table.join(table) interface. Very briefly, UDTF's apply method > will return a Table Type, so Join(UDTF('a, 'b, ...) as 'c) shall be viewed as > join(Table) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3791#discussion_r113834406 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala --- @@ -19,11 +19,12 @@ package org.apache.flink.table.api.java import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.TypeExtractor -import org.apache.flink.table.api._ -import org.apache.flink.table.functions.TableFunction -import org.apache.flink.table.expressions.ExpressionParser import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import org.apache.flink.table.api._ +import org.apache.flink.table.api.scala.TableFunctionConversions --- End diff -- Please remove the useless import. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6334) Refactoring UDTF interface
[ https://issues.apache.org/jira/browse/FLINK-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15988009#comment-15988009 ] ASF GitHub Bot commented on FLINK-6334: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3791#discussion_r113836005 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala --- @@ -417,12 +452,33 @@ class Table( private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = { // check that right table belongs to the same TableEnvironment --- End diff -- // check that the TableEnvironment of the right table is not null and that the right table belongs to the same TableEnvironment > Refactoring UDTF interface > -- > > Key: FLINK-6334 > URL: https://issues.apache.org/jira/browse/FLINK-6334 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li > > The current UDTF leverages the table.join(expression) interface, which is not > a proper interface in terms of semantics. We would like to refactor this to > let UDTF use table.join(table) interface. Very briefly, UDTF's apply method > will return a Table Type, so Join(UDTF('a, 'b, ...) as 'c) shall be viewed as > join(Table) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3791#discussion_r113836005 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala --- @@ -417,12 +452,33 @@ class Table( private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = { // check that right table belongs to the same TableEnvironment --- End diff -- // check that the TableEnvironment of the right table is not null and that the right table belongs to the same TableEnvironment --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3791#discussion_r113833504 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -107,6 +107,11 @@ abstract class TableEnvironment(val config: TableConfig) { // registered external catalog names -> catalog private val externalCatalogs = new HashMap[String, ExternalCatalog] + private lazy val tableFunctionParser = new TableFunctionParser(this) + + // the method for converting a udtf String to Table for Java API + final def tableApply(udtf: String): Table = tableFunctionParser(udtf) --- End diff -- `TableFunctionParser` only has one method, named `apply`. IMO it's a utility method, so here are 3 suggestions: * If a class contains only utility methods, I suggest changing `class` to `object`; `tableEnv` can then be a parameter of the method. * If `TableFunctionParser#apply` is only used for `TableFunction`, I suggest moving the `apply` method into `UserDefinedFunctionUtils`, because all of the functional methods of `UDF/UDTF/UDAF` live in that file. * If the method is only used by `TableEnvironment`, could it be implemented internally in `TableEnvironment`? What do you think? @Xpray --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6398) RowSerializer's duplicate should always return a new instance
[ https://issues.apache.org/jira/browse/FLINK-6398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15988008#comment-15988008 ] ASF GitHub Bot commented on FLINK-6398: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3794 +1 to merge > RowSerializer's duplicate should always return a new instance > - > > Key: FLINK-6398 > URL: https://issues.apache.org/jira/browse/FLINK-6398 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0, 1.2.1 >Reporter: Kurt Young >Assignee: Kurt Young > > RowSerializer is stateful because of {{nullMask}}, so we should always return a > new instance when duplicating. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3791#discussion_r113833924 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableFunctionParser.scala --- @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api + +import org.apache.flink.table.expressions._ +import org.apache.flink.table.plan.logical.LogicalTableFunctionCall + + +/** + * A parser to convert a udtf String (for Java user) to [[Table]] + * + * @param tableEnv a [[TableEnvironment]] which is used for looking up a function + */ +class TableFunctionParser(tableEnv: TableEnvironment) { + --- End diff -- This class only contains a utility method, so I suggest changing it to an `object`, or moving the method into `UserDefinedFunctionUtils`; `tableEnv` can then be a parameter of the method. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
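The class-to-object point is a standard Scala refactoring; a minimal self-contained sketch of its shape (names are hypothetical, with `env` standing in for `tableEnv`):

```scala
// Before: a one-method class that captures its dependency at construction time
class UdtfStringParser(env: String) {
  def apply(udtf: String): String = s"[$env] parsed: $udtf"
}

// After (the suggestion): a stateless utility object; the dependency becomes a parameter
object UdtfStringParserUtil {
  def parse(env: String, udtf: String): String = s"[$env] parsed: $udtf"
}
```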
[jira] [Updated] (FLINK-6196) Support dynamic schema in Table Function
[ https://issues.apache.org/jira/browse/FLINK-6196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhuoluo Yang updated FLINK-6196: Fix Version/s: 1.3.0 > Support dynamic schema in Table Function > > > Key: FLINK-6196 > URL: https://issues.apache.org/jira/browse/FLINK-6196 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Zhuoluo Yang >Assignee: Zhuoluo Yang > Fix For: 1.3.0 > > > In many of our use cases, we have to decide the schema of a UDTF at run > time. For example, udtf('c1, c2, c3') will generate three columns for a > lateral view. > Most systems, such as Calcite and Hive, support this feature. However, the > current implementation in Flink doesn't implement the feature correctly. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6394) GroupCombine reuses instances even though object reuse is disabled
[ https://issues.apache.org/jira/browse/FLINK-6394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15987991#comment-15987991 ] Kurt Young commented on FLINK-6394: --- Hi [~vanekjar], thanks for reporting this, I will take a look at it soon. > GroupCombine reuses instances even though object reuse is disabled > -- > > Key: FLINK-6394 > URL: https://issues.apache.org/jira/browse/FLINK-6394 > Project: Flink > Issue Type: Bug >Affects Versions: 1.2.0 >Reporter: Jaromir Vanek >Priority: Critical > > I am using a group combiner in the DataSet API with object reuse disabled. > In code it may be expressed as follows: > {code:java} > tuples.groupBy(1) > .combineGroup((it, collector) -> { > // store the first item for future use > Pojo stored = it.next(); > while (it.hasNext()) { > // ... > } > }) > {code} > It seems that even though the object reuse feature is disabled, my instance is actually > replaced when {{.next()}} is called on the iterator. It leads to very > confusing and wrong results. > I checked the Flink codebase and it seems {{CombiningUnilateralSortMerger}} > is actually reusing object instances even though object reuse is explicitly > disabled. > In the spilling phase the user's combiner is called with an instance of > {{CombineValueIterator}} that actually reuses instances without any warning. > See > https://github.com/apache/flink/blob/d7b59d761601baba6765bb4fc407bcd9fd6a9387/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java#L550 > When I disable the combiner and use {{groupReduce}} with the same reduce > function, the results are fine. > Please let me know if you can confirm this as a bug. From my point of view > it's highly critical, as I am getting unpredictable results. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (FLINK-6394) GroupCombine reuses instances even though object reuse is disabled
[ https://issues.apache.org/jira/browse/FLINK-6394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young reassigned FLINK-6394: - Assignee: Kurt Young > GroupCombine reuses instances even though object reuse is disabled > -- > > Key: FLINK-6394 > URL: https://issues.apache.org/jira/browse/FLINK-6394 > Project: Flink > Issue Type: Bug >Affects Versions: 1.2.0 >Reporter: Jaromir Vanek >Assignee: Kurt Young >Priority: Critical > > I am using a group combiner in the DataSet API with object reuse disabled. > In code it may be expressed as follows: > {code:java} > tuples.groupBy(1) > .combineGroup((it, collector) -> { > // store the first item for future use > Pojo stored = it.next(); > while (it.hasNext()) { > // ... > } > }) > {code} > It seems that even though the object reuse feature is disabled, my instance is actually > replaced when {{.next()}} is called on the iterator. It leads to very > confusing and wrong results. > I checked the Flink codebase and it seems {{CombiningUnilateralSortMerger}} > is actually reusing object instances even though object reuse is explicitly > disabled. > In the spilling phase the user's combiner is called with an instance of > {{CombineValueIterator}} that actually reuses instances without any warning. > See > https://github.com/apache/flink/blob/d7b59d761601baba6765bb4fc407bcd9fd6a9387/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java#L550 > When I disable the combiner and use {{groupReduce}} with the same reduce > function, the results are fine. > Please let me know if you can confirm this as a bug. From my point of view > it's highly critical, as I am getting unpredictable results. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
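Until the underlying bug is fixed, the usual workaround with instance-reusing iterators is to take a defensive copy before holding a reference across `next()` calls. A sketch in Scala; the `Pojo` case class and the summing logic are illustrative stand-ins for the reporter's code:

```scala
import org.apache.flink.util.Collector

// Stand-in for the reporter's type; a case class gives us a cheap copy().
case class Pojo(key: Int, value: Long)

def combine(it: Iterator[Pojo], out: Collector[Pojo]): Unit = {
  // Defensive copy: later next() calls cannot mutate our cached first element.
  val stored = it.next().copy()
  var sum = stored.value
  while (it.hasNext) {
    sum += it.next().value
  }
  out.collect(stored.copy(value = sum))
}
```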
[jira] [Commented] (FLINK-6393) Add Evenly Graph Generator to Flink Gelly
[ https://issues.apache.org/jira/browse/FLINK-6393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15987983#comment-15987983 ] Greg Hogan commented on FLINK-6393: --- Hi [~fanzhidongyzby], we'll leave this Jira open until the pull request is merged into the repository. I looked for a description of this graph earlier today and the closest I could find was [CirculantGraph|http://mathworld.wolfram.com/CirculantGraph.html], which is a generalization. What algorithms would use the EvenlyGraph? > Add Evenly Graph Generator to Flink Gelly > - > > Key: FLINK-6393 > URL: https://issues.apache.org/jira/browse/FLINK-6393 > Project: Flink > Issue Type: New Feature > Components: Gelly >Reporter: FlorianFan >Assignee: FlorianFan >Priority: Minor > Original Estimate: 24h > Remaining Estimate: 24h > > An evenly graph is a graph in which every vertex has the same degree, so the > graph can be treated as even because all edges are > distributed evenly. When the vertex degree is 0, an empty graph will be > generated; when the vertex degree is vertex count - 1, a complete graph will be > generated. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
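For reference, the circulant construction Greg mentions realizes such a degree-regular graph directly: connect each vertex i to i + o (mod n) for a fixed set of offsets. A standalone sketch in plain Scala (not Gelly code):

```scala
// Edges of a circulant graph on n vertices with the given offsets.
// Treating each pair as an undirected edge, offsets 1..k/2 give every
// vertex degree k (for even k), i.e. the "evenly" property discussed here.
def circulantEdges(n: Int, offsets: Seq[Int]): Seq[(Int, Int)] =
  for {
    i <- 0 until n
    o <- offsets
  } yield (i, (i + o) % n)

// Example: a 2-regular ring on 5 vertices
// circulantEdges(5, Seq(1)) == Seq((0,1), (1,2), (2,3), (3,4), (4,0))
```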
[jira] [Commented] (FLINK-6393) Add Evenly Graph Generator to Flink Gelly
[ https://issues.apache.org/jira/browse/FLINK-6393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15987975#comment-15987975 ] FlorianFan commented on FLINK-6393: --- [~greghogan] Could this PR be merged into the master branch? > Add Evenly Graph Generator to Flink Gelly > - > > Key: FLINK-6393 > URL: https://issues.apache.org/jira/browse/FLINK-6393 > Project: Flink > Issue Type: New Feature > Components: Gelly >Reporter: FlorianFan >Assignee: FlorianFan >Priority: Minor > Original Estimate: 24h > Remaining Estimate: 24h > > An evenly graph is a graph in which every vertex has the same degree, so the > graph can be treated as even because all edges are > distributed evenly. When the vertex degree is 0, an empty graph will be > generated; when the vertex degree is vertex count - 1, a complete graph will be > generated. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (FLINK-6393) Add Evenly Graph Generator to Flink Gelly
[ https://issues.apache.org/jira/browse/FLINK-6393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] FlorianFan resolved FLINK-6393. --- Resolution: Fixed pull request sent > Add Evenly Graph Generator to Flink Gelly > - > > Key: FLINK-6393 > URL: https://issues.apache.org/jira/browse/FLINK-6393 > Project: Flink > Issue Type: New Feature > Components: Gelly >Reporter: FlorianFan >Assignee: FlorianFan >Priority: Minor > Original Estimate: 24h > Remaining Estimate: 24h > > An evenly graph is a graph in which every vertex has the same degree, so the > graph can be treated as even because all edges are > distributed evenly. When the vertex degree is 0, an empty graph will be > generated; when the vertex degree is vertex count - 1, a complete graph will be > generated. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6398) RowSerializer's duplicate should always return a new instance
[ https://issues.apache.org/jira/browse/FLINK-6398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15987956#comment-15987956 ] ASF GitHub Bot commented on FLINK-6398: --- GitHub user KurtYoung opened a pull request: https://github.com/apache/flink/pull/3794 [FLINK-6398] RowSerializer's duplicate should always return a new instance You can merge this pull request into a Git repository by running: $ git pull https://github.com/KurtYoung/flink flink-6398 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3794.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3794 commit 29f328493dce3adef828f3a93be171a57f1316b0 Author: Kurt Young Date: 2017-04-27T15:37:21Z [FLINK-6398] RowSerializer's duplicate should always return a new instance > RowSerializer's duplicate should always return a new instance > - > > Key: FLINK-6398 > URL: https://issues.apache.org/jira/browse/FLINK-6398 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0, 1.2.1 >Reporter: Kurt Young >Assignee: Kurt Young > > RowSerializer is stateful because of {{nullMask}}, so we should always return a > new instance when duplicating. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
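The fix rests on a general rule for stateful helpers: if an instance carries mutable scratch state, `duplicate()` must return a fresh instance rather than `this`, or two threads will race on the shared buffer. A self-contained toy illustrating the hazard (not the actual `RowSerializer` code):

```scala
// Toy class, not Flink code: per-instance scratch state forces a deep duplicate.
final class MaskedWriter(arity: Int) {
  private val nullMask = new Array[Boolean](arity) // mutable scratch buffer

  def write(row: Array[AnyRef]): Unit = {
    var i = 0
    while (i < arity) {
      nullMask(i) = row(i) == null // consulted while encoding the row
      i += 1
    }
  }

  // WRONG for stateful classes: callers would share (and race on) nullMask
  def duplicateShared(): MaskedWriter = this

  // RIGHT: every caller gets its own nullMask
  def duplicate(): MaskedWriter = new MaskedWriter(arity)
}
```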
[GitHub] flink pull request #3794: [FLINK-6398] RowSerializer's duplicate should alwa...
GitHub user KurtYoung opened a pull request: https://github.com/apache/flink/pull/3794 [FLINK-6398] RowSerializer's duplicate should always return a new instance You can merge this pull request into a Git repository by running: $ git pull https://github.com/KurtYoung/flink flink-6398 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3794.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3794 commit 29f328493dce3adef828f3a93be171a57f1316b0 Author: Kurt Young Date: 2017-04-27T15:37:21Z [FLINK-6398] RowSerializer's duplicate should always return a new instance --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3793: flink-6033 Support UNNEST query in the stream SQL ...
GitHub user suez1224 opened a pull request: https://github.com/apache/flink/pull/3793 flink-6033 Support UNNEST query in the stream SQL API …y is supported, and WITH ORDINALITY is not yet supported. Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [ ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/suez1224/flink unnest Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3793.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3793 commit 2c1520cf65632fd47350ac9f0b8b632ad4778b89 Author: Shuyi Chen Date: 2017-04-22T06:48:28Z Add support for UNNEST support in streaming SQL. Currently, only array is supported, and WITH ORDINALITY is not yet supported. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3787: flink-6033 Support UNNEST query in the stream SQL ...
Github user suez1224 closed the pull request at: https://github.com/apache/flink/pull/3787 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4953) Allow access to "time" in ProcessWindowFunction.Context
[ https://issues.apache.org/jira/browse/FLINK-4953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15987778#comment-15987778 ] ASF GitHub Bot commented on FLINK-4953: --- Github user manuzhang closed the pull request at: https://github.com/apache/flink/pull/3661 > Allow access to "time" in ProcessWindowFunction.Context > --- > > Key: FLINK-4953 > URL: https://issues.apache.org/jira/browse/FLINK-4953 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Manu Zhang >Assignee: Manu Zhang > Fix For: 1.3.0 > > > The recently added {{ProcessWindowFunction}} has a {{Context}} object that > allows querying some additional information about the window firing that we > are processing. Right now, this is only the window for which the firing is > happening. We should extend this with methods that allow querying the > current processing time and the current watermark. > Original text by issue creator: This is similar to FLINK-3674 but exposes > time information in window functions. Currently, when a timer is fired, all > states in a window will be returned to users, including those after the > timer. This change will allow users to filter out states after the timer > based on time info. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3661: [FLINK-4953] Allow access to "time" in ProcessWind...
Github user manuzhang closed the pull request at: https://github.com/apache/flink/pull/3661 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-6405) Flink should be able to load a jar from s3
Bowen Li created FLINK-6405: --- Summary: Flink should be able to load a jar from s3 Key: FLINK-6405 URL: https://issues.apache.org/jira/browse/FLINK-6405 Project: Flink Issue Type: Improvement Reporter: Bowen Li ./bin/flink only loads jars from a local path, not from S3. Therefore, we need an extra step to copy jars from S3 to our EMR master. The ./bin/flink script should instead be able to read jars from S3 so that we don't need to copy them. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6400) Lack of protection accessing masterHooks in CheckpointCoordinator#triggerCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-6400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15987730#comment-15987730 ] Ted Yu commented on FLINK-6400: --- Thanks for the quick action, Stephan > Lack of protection accessing masterHooks in > CheckpointCoordinator#triggerCheckpoint > --- > > Key: FLINK-6400 > URL: https://issues.apache.org/jira/browse/FLINK-6400 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: Stephan Ewen >Priority: Minor > Fix For: 1.3.0 > > > Here is the related code: > {code} > synchronized (triggerLock) { > ... > try { > List<MasterState> masterStates = > MasterHooks.triggerMasterHooks(masterHooks.values(), > {code} > masterHooks is protected by a lock in other methods, while triggerLock is only > used in CheckpointCoordinator#triggerCheckpoint() -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (FLINK-6404) Ensure PendingCheckpoint is registered when calling Checkpoint Hooks
[ https://issues.apache.org/jira/browse/FLINK-6404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-6404. - Resolution: Fixed Fixed in fcb13e1f54cc8d634416b41d5fc41518806a1885 > Ensure PendingCheckpoint is registered when calling Checkpoint Hooks > > > Key: FLINK-6404 > URL: https://issues.apache.org/jira/browse/FLINK-6404 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Minor > Fix For: 1.3.0 > > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (FLINK-6400) Lack of protection accessing masterHooks in CheckpointCoordinator#triggerCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-6400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-6400. - Resolution: Fixed Assignee: Stephan Ewen Fix Version/s: 1.3.0 Fixed in fcb13e1f54cc8d634416b41d5fc41518806a1885 > Lack of protection accessing masterHooks in > CheckpointCoordinator#triggerCheckpoint > --- > > Key: FLINK-6400 > URL: https://issues.apache.org/jira/browse/FLINK-6400 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: Stephan Ewen >Priority: Minor > Fix For: 1.3.0 > > > Here is the related code: > {code} > synchronized (triggerLock) { > ... > try { > List<MasterState> masterStates = > MasterHooks.triggerMasterHooks(masterHooks.values(), > {code} > masterHooks is protected by a lock in other methods, while triggerLock is only > used in CheckpointCoordinator#triggerCheckpoint() -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Closed] (FLINK-6400) Lack of protection accessing masterHooks in CheckpointCoordinator#triggerCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-6400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-6400. --- > Lack of protection accessing masterHooks in > CheckpointCoordinator#triggerCheckpoint > --- > > Key: FLINK-6400 > URL: https://issues.apache.org/jira/browse/FLINK-6400 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: Stephan Ewen >Priority: Minor > Fix For: 1.3.0 > > > Here is the related code: > {code} > synchronized (triggerLock) { > ... > try { > List<MasterState> masterStates = > MasterHooks.triggerMasterHooks(masterHooks.values(), > {code} > masterHooks is protected by a lock in other methods, while triggerLock is only > used in CheckpointCoordinator#triggerCheckpoint() -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Closed] (FLINK-6404) Ensure PendingCheckpoint is registered when calling Checkpoint Hooks
[ https://issues.apache.org/jira/browse/FLINK-6404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-6404. --- > Ensure PendingCheckpoint is registered when calling Checkpoint Hooks > > > Key: FLINK-6404 > URL: https://issues.apache.org/jira/browse/FLINK-6404 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Minor > Fix For: 1.3.0 > > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6373) Add runtime support for distinct aggregation over grouped windows
[ https://issues.apache.org/jira/browse/FLINK-6373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15987709#comment-15987709 ] ASF GitHub Bot commented on FLINK-6373: --- Github user haohui commented on the issue: https://github.com/apache/flink/pull/3765 Updated the PR to codegen the parts used by the distinct accumulator. Each column is calculated independently. > Add runtime support for distinct aggregation over grouped windows > - > > Key: FLINK-6373 > URL: https://issues.apache.org/jira/browse/FLINK-6373 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Haohui Mai >Assignee: Haohui Mai > > This is a follow-up task for FLINK-6335, which enables parsing > distinct aggregations over grouped windows. This JIRA tracks the effort of > adding runtime support for the query. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3765: [FLINK-6373] Add runtime support for distinct aggregation...
Github user haohui commented on the issue: https://github.com/apache/flink/pull/3765 Updated the PR to codegen the parts used by the distinct accumulator. Each column is calculated independently. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15987625#comment-15987625 ] ASF GitHub Bot commented on FLINK-6075: --- Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3714 @fhueske I have also committed the support for rowtime order. Now the PR contains the complete implementation of the things discussed in the JIRA issue. Offset and fetch support will be added when retraction is available (or merged into that branch) > Support Limit/Top(Sort) for Stream SQL > -- > > Key: FLINK-6075 > URL: https://issues.apache.org/jira/browse/FLINK-6075 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: radu > Labels: features > Attachments: sort.png > > > These will be split into 3 separate JIRA issues; the design is the > same and only the processing function differs in terms of the output. > Time target: Proc Time > **SQL targeted query examples:** > *Sort example* > Q1) `SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL > '3' HOUR) ORDER BY b` > Comment: the window is defined using GROUP BY > Comment: the ASC or DESC keywords can be placed to mark the ordering type > *Limit example* > Q2) `SELECT a FROM stream1 WHERE rowtime BETWEEN current_timestamp - INTERVAL > '1' HOUR AND current_timestamp ORDER BY b LIMIT 10` > Comment: the window is defined using time ranges in the WHERE clause > Comment: the window is row triggered > *Top example* > Q3) `SELECT sum(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING > LIMIT 10) FROM stream1` > Comment: limit over the contents of the sliding window > General Comments: > - All these SQL clauses are supported only over windows (bounded collections > of data). > - Each of the 3 operators will be supported with each way of > expressing the windows. > **Description** > The 3 operations (limit, top and sort) are similar in behavior, as they all > require a sorted collection of the data on which the logic will be applied > (i.e., select a subset of the items or the entire sorted set). In streaming, these > functions only make sense in the context of a > window. Without defining a window the functions could never emit, as the sort > operation would never trigger. If an SQL query is provided without > window limits, an error will be thrown (`SELECT a FROM stream1 TOP 10` -> ERROR). > Although not targeted by this JIRA, when working in event-time > order, the retraction mechanisms of windows and the lateness mechanisms > can be used to deal with out-of-order events and retractions/updates of > results. > **Functionality example** > We exemplify with the query below for all 3 types of operators (sorting, > limit and top). Rowtime indicates when the HOP window will trigger, which > can be observed in the fact that outputs are generated only at those moments. > The HOP windows will trigger at every hour (fixed hour) and each event will > contribute to / be duplicated in 2 consecutive hour intervals. Proctime > indicates the processing time when a new event arrives in the system. Events > are of the type (a,b) with the ordering being applied on the b field.
> `SELECT a FROM stream1 HOP(proctime, INTERVAL '1' HOUR, INTERVAL '2' HOUR) ORDER BY b (LIMIT 2 / TOP 2 / [ASC/DESC])`
> ||Rowtime||Proctime||Stream1||Limit 2||Top 2||Sort [ASC]||
> | |10:00:00|(aaa, 11)| | | |
> | |10:05:00|(aab, 7)| | | |
> |10-11|11:00:00| |aab,aaa|aab,aaa|aab,aaa|
> | |11:03:00|(aac,21)| | | |
> |11-12|12:00:00| |aab,aaa|aab,aaa|aab,aaa,aac|
> | |12:10:00|(abb,12)| | | |
> | |12:15:00|(abb,12)| | | |
> |12-13|13:00:00| |abb,abb|abb,abb|abb,abb,aac|
> |...|
> **Implementation option**
> Considering that the SQL operators will be associated with window boundaries, the functionality will be implemented within the logic of the window as follows.
> * Window assigner – selected based on the type of window used in SQL (TUMBLING, SLIDING…)
> * Evictor/Trigger – time or count evictor based on the definition of the window boundaries
> * Apply – window function that sorts the data and selects the output to trigger (based on the LIMIT/TOP parameters). All data will be sorted at once and the result outputted when t
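The last bullet of that plan (sort inside the window apply and select the output) is straightforward to sketch with the DataStream Scala API; the `(String, Int)` element type, the key type, and the `limit` parameter are illustrative, not code from the PR:

```scala
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Sort the buffered window contents on field b and emit the first `limit` records.
class SortLimitWindowFunction(limit: Int)
    extends WindowFunction[(String, Int), (String, Int), String, TimeWindow] {

  override def apply(
      key: String,
      window: TimeWindow,
      input: Iterable[(String, Int)],
      out: Collector[(String, Int)]): Unit = {
    input.toSeq.sortBy(_._2).take(limit).foreach(out.collect)
  }
}
```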
[GitHub] flink issue #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream SQL
Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3714 @fhueske I have also committed the support for rowtime order. Now the PR contains the complete implementation of the things discussed in the JIRA issue. Offset and fetch support will be added when retraction is available (or merged into that branch) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-6404) Ensure PendingCheckpoint is registered when calling Checkpoint Hooks
Stephan Ewen created FLINK-6404: --- Summary: Ensure PendingCheckpoint is registered when calling Checkpoint Hooks Key: FLINK-6404 URL: https://issues.apache.org/jira/browse/FLINK-6404 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Affects Versions: 1.3.0 Reporter: Stephan Ewen Assignee: Stephan Ewen Priority: Minor Fix For: 1.3.0 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6400) Lack of protection accessing masterHooks in CheckpointCoordinator#triggerCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-6400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15987605#comment-15987605 ] Stephan Ewen commented on FLINK-6400: --- I think this is more of a theoretical issue. The hooks are not manipulated after the coordinator is instantiated. Will patch it anyway, to make the code clearer... > Lack of protection accessing masterHooks in > CheckpointCoordinator#triggerCheckpoint > --- > > Key: FLINK-6400 > URL: https://issues.apache.org/jira/browse/FLINK-6400 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > Here is the related code: > {code} > synchronized (triggerLock) { > ... > try { > List<MasterState> masterStates = > MasterHooks.triggerMasterHooks(masterHooks.values(), > {code} > masterHooks is protected by a lock in other methods, while triggerLock is only > used in CheckpointCoordinator#triggerCheckpoint() -- This message was sent by Atlassian JIRA (v6.3.15#6346)
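The pattern at issue is the usual one of guarding shared mutable state with a single monitor: writers and readers must go through the same lock, or readers must snapshot the values while holding it. A minimal sketch (not the actual `CheckpointCoordinator` code):

```scala
final class HookRegistry {
  private val lock = new Object
  private val hooks = scala.collection.mutable.Map[String, AnyRef]()

  def register(id: String, hook: AnyRef): Unit = lock.synchronized {
    hooks.put(id, hook)
  }

  // readers copy the values under the same lock, then work lock-free
  def snapshotValues(): List[AnyRef] = lock.synchronized {
    hooks.values.toList
  }
}
```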
[jira] [Commented] (FLINK-6403) constructFlinkClassPath produces nondeterministic classpath
[ https://issues.apache.org/jira/browse/FLINK-6403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15987489#comment-15987489 ] Greg Hogan commented on FLINK-6403: --- [~arobe...@fuze.com] this is now fixed in the release-1.2 branch for the 1.2.2 release. > constructFlinkClassPath produces nondeterministic classpath > --- > > Key: FLINK-6403 > URL: https://issues.apache.org/jira/browse/FLINK-6403 > Project: Flink > Issue Type: Bug > Components: Startup Shell Scripts >Affects Versions: 1.2.0 >Reporter: Andrew Roberts >Priority: Critical > > In 1.2.0, `config.sh` moved from a shell glob to a find-based approach for > constructing the classpath from `/lib` that gets sent to most flink commands, > e.g. `start-cluster.sh`. The `find` command does not guarantee an ordering, > and we saw issues with flink constructing different classpaths on different > machines. > constructFlinkClassPath should be modified to produce a deterministic > classpath. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Closed] (FLINK-6176) Add JARs to CLASSPATH deterministically
[ https://issues.apache.org/jira/browse/FLINK-6176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan closed FLINK-6176. - Resolution: Fixed > Add JARs to CLASSPATH deterministically > --- > > Key: FLINK-6176 > URL: https://issues.apache.org/jira/browse/FLINK-6176 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Scott Kidder >Assignee: Greg Hogan > Fix For: 1.3.0, 1.2.2 > > > The {{config.sh}} script uses the following shell-script function to build > the {{FLINK_CLASSPATH}} variable from a listing of JAR files in the > {{$FLINK_LIB_DIR}} directory: > {code} > constructFlinkClassPath() { > while read -d '' -r jarfile ; do > if [[ $FLINK_CLASSPATH = "" ]]; then > FLINK_CLASSPATH="$jarfile"; > else > FLINK_CLASSPATH="$FLINK_CLASSPATH":"$jarfile" > fi > done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0) > echo $FLINK_CLASSPATH > } > {code} > The {{find}} command as specified will return files in directory-order, which > varies by OS and filesystem. > The inconsistent ordering of directory contents caused problems for me when > installing a Flink Docker image onto a new machine with a newer version of > Docker and a different filesystem (UFS). The differences in the Docker > filesystem implementation led to different ordering of the directory > contents; this affected the {{FLINK_CLASSPATH}} ordering and generated very > puzzling {{NoClassNotFoundException}} errors when running my Flink > application. > This should be addressed by deterministically ordering JAR files added to the > {{FLINK_CLASSPATH}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Reopened] (FLINK-6176) Add JARs to CLASSPATH deterministically
[ https://issues.apache.org/jira/browse/FLINK-6176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan reopened FLINK-6176: --- > Add JARs to CLASSPATH deterministically > --- > > Key: FLINK-6176 > URL: https://issues.apache.org/jira/browse/FLINK-6176 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Scott Kidder >Assignee: Greg Hogan > Fix For: 1.3.0, 1.2.2 > > > The {{config.sh}} script uses the following shell-script function to build > the {{FLINK_CLASSPATH}} variable from a listing of JAR files in the > {{$FLINK_LIB_DIR}} directory: > {code} > constructFlinkClassPath() { > while read -d '' -r jarfile ; do > if [[ $FLINK_CLASSPATH = "" ]]; then > FLINK_CLASSPATH="$jarfile"; > else > FLINK_CLASSPATH="$FLINK_CLASSPATH":"$jarfile" > fi > done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0) > echo $FLINK_CLASSPATH > } > {code} > The {{find}} command as specified will return files in directory-order, which > varies by OS and filesystem. > The inconsistent ordering of directory contents caused problems for me when > installing a Flink Docker image onto a new machine with a newer version of > Docker and a different filesystem (UFS). The differences in the Docker > filesystem implementation led to different ordering of the directory > contents; this affected the {{FLINK_CLASSPATH}} ordering and generated very > puzzling {{NoClassNotFoundException}} errors when running my Flink > application. > This should be addressed by deterministically ordering JAR files added to the > {{FLINK_CLASSPATH}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6176) Add JARs to CLASSPATH deterministically
[ https://issues.apache.org/jira/browse/FLINK-6176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-6176: -- Fix Version/s: 1.2.2 > Add JARs to CLASSPATH deterministically > --- > > Key: FLINK-6176 > URL: https://issues.apache.org/jira/browse/FLINK-6176 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Scott Kidder >Assignee: Greg Hogan > Fix For: 1.3.0, 1.2.2 > > > The {{config.sh}} script uses the following shell-script function to build > the {{FLINK_CLASSPATH}} variable from a listing of JAR files in the > {{$FLINK_LIB_DIR}} directory: > {code} > constructFlinkClassPath() { > while read -d '' -r jarfile ; do > if [[ $FLINK_CLASSPATH = "" ]]; then > FLINK_CLASSPATH="$jarfile"; > else > FLINK_CLASSPATH="$FLINK_CLASSPATH":"$jarfile" > fi > done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0) > echo $FLINK_CLASSPATH > } > {code} > The {{find}} command as specified will return files in directory-order, which > varies by OS and filesystem. > The inconsistent ordering of directory contents caused problems for me when > installing a Flink Docker image onto a new machine with a newer version of > Docker and a different filesystem (UFS). The differences in the Docker > filesystem implementation led to different ordering of the directory > contents; this affected the {{FLINK_CLASSPATH}} ordering and generated very > puzzling {{NoClassNotFoundException}} errors when running my Flink > application. > This should be addressed by deterministically ordering JAR files added to the > {{FLINK_CLASSPATH}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Closed] (FLINK-6176) Add JARs to CLASSPATH deterministically
[ https://issues.apache.org/jira/browse/FLINK-6176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan closed FLINK-6176. - Resolution: Fixed Fixed for 1.2 in ef20aa1a154084429bfd685e94764f3fd8ea > Add JARs to CLASSPATH deterministically > --- > > Key: FLINK-6176 > URL: https://issues.apache.org/jira/browse/FLINK-6176 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Scott Kidder >Assignee: Greg Hogan > Fix For: 1.3.0 > > > The {{config.sh}} script uses the following shell-script function to build > the {{FLINK_CLASSPATH}} variable from a listing of JAR files in the > {{$FLINK_LIB_DIR}} directory: > {code} > constructFlinkClassPath() { > while read -d '' -r jarfile ; do > if [[ $FLINK_CLASSPATH = "" ]]; then > FLINK_CLASSPATH="$jarfile"; > else > FLINK_CLASSPATH="$FLINK_CLASSPATH":"$jarfile" > fi > done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0) > echo $FLINK_CLASSPATH > } > {code} > The {{find}} command as specified will return files in directory-order, which > varies by OS and filesystem. > The inconsistent ordering of directory contents caused problems for me when > installing a Flink Docker image onto a new machine with a newer version of > Docker and a different filesystem (UFS). The differences in the Docker > filesystem implementation led to different ordering of the directory > contents; this affected the {{FLINK_CLASSPATH}} ordering and generated very > puzzling {{NoClassNotFoundException}} errors when running my Flink > application. > This should be addressed by deterministically ordering JAR files added to the > {{FLINK_CLASSPATH}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3712: [FLINK-6281] Create TableSink for JDBC.
Github user haohui commented on the issue: https://github.com/apache/flink/pull/3712 Correct me if I'm wrong -- will something like the following work? ``` + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + outputFormat.flush(); + } ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6281) Create TableSink for JDBC
[ https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15987408#comment-15987408 ] ASF GitHub Bot commented on FLINK-6281: --- Github user haohui commented on the issue: https://github.com/apache/flink/pull/3712 Correct me if I'm wrong -- will something like the following work? ``` + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + outputFormat.flush(); + } ``` > Create TableSink for JDBC > - > > Key: FLINK-6281 > URL: https://issues.apache.org/jira/browse/FLINK-6281 > Project: Flink > Issue Type: Improvement >Reporter: Haohui Mai >Assignee: Haohui Mai > > It would be nice to integrate the table APIs with the JDBC connectors so that > the rows in the tables can be directly pushed into JDBC. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
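The snippet expresses the standard pre-commit flush for at-least-once sinks: rows buffered by the output format must be made durable before the checkpoint completes. A Scala sketch of the surrounding pattern; the buffering and flush semantics here are assumptions for illustration, not the PR's actual JDBC code:

```scala
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

import scala.collection.mutable.ArrayBuffer

// Sketch: buffer writes, flush them whenever a checkpoint is taken.
class BufferingSink extends RichSinkFunction[String] with CheckpointedFunction {
  @transient private var buffer: ArrayBuffer[String] = _

  override def invoke(value: String): Unit = buffer += value

  override def snapshotState(ctx: FunctionSnapshotContext): Unit = {
    // a real sink would hand the batch to JDBC here (outputFormat.flush())
    // before the checkpoint is acknowledged; then the buffer can be cleared
    buffer.clear()
  }

  override def initializeState(ctx: FunctionInitializationContext): Unit = {
    buffer = ArrayBuffer[String]()
  }
}
```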
[jira] [Commented] (FLINK-6388) Add support for DISTINCT into Code Generated Aggregations
[ https://issues.apache.org/jira/browse/FLINK-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15987370#comment-15987370 ] ASF GitHub Bot commented on FLINK-6388: --- Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3783 @fhueske @stefanobortoli Regarding the options for solving the distinct: from my point of view the previous approach worked: - we keep in the process function a state for each field that is used in a distinct aggregator - we count the occurrences of each value (meant for a distinct aggregate) that we have observed - when a value is seen for the first time we accumulate it - when a value is retracted we decrease the corresponding count - if the count is 0 we retract the value from the accumulator Based on how things are implemented now, this would involve having a separate list of aggregate functions for the distinct fields, in order to be able to control when to accumulate these values. What do you think? Do you see any disadvantage to this? > Add support for DISTINCT into Code Generated Aggregations > - > > Key: FLINK-6388 > URL: https://issues.apache.org/jira/browse/FLINK-6388 > Project: Flink > Issue Type: Sub-task > Components: DataStream API >Affects Versions: 1.3.0 >Reporter: Stefano Bortoli >Assignee: Stefano Bortoli > Fix For: 1.3.0 > > > We should support DISTINCT in Code Generated aggregation functions. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3783: [FLINK-6388] Add support for DISTINCT into Code Generated...
Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3783

@fhueske @stefanobortoli Regarding the options for solving the distinct: from my point of view the previous approach worked:
- we keep in the ProcessFunction a state for each field that is used in a distinct aggregator
- we count the occurrences of each value (meant for a distinct aggregate) that we observed
- when a value is seen for the first time, we accumulate it
- when a value is retracted, we decrease the corresponding count
- if the count reaches 0, we retract the value from the accumulator

Based on how things are implemented now, this would involve having a separate list of aggregate functions for the distinct fields, in order to be able to control when to accumulate these values. What do you think? Do you see any disadvantage to this?

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
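To illustrate the counting scheme described above, a minimal sketch; `DistinctValueCounter` is a hypothetical helper, not code from the PR. The idea is that the wrapped aggregate's accumulate/retract would only be invoked when these methods return true.

```
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: tracks how often each value has been seen so that the
// wrapped aggregate only accumulates a value on its first occurrence and only
// retracts it when its last occurrence is retracted.
public class DistinctValueCounter<T> {

    private final Map<T, Long> counts = new HashMap<>();

    /** Returns true if this is the first occurrence, i.e. accumulate() should run. */
    public boolean add(T value) {
        Long current = counts.get(value);
        if (current == null) {
            counts.put(value, 1L);
            return true;
        }
        counts.put(value, current + 1L);
        return false;
    }

    /** Returns true if the count dropped to 0, i.e. retract() should run. */
    public boolean remove(T value) {
        Long current = counts.get(value);
        if (current == null) {
            return false; // retraction for a value we never saw; ignore
        }
        if (current == 1L) {
            counts.remove(value);
            return true;
        }
        counts.put(value, current - 1L);
        return false;
    }
}
```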
[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15987367#comment-15987367 ] ASF GitHub Bot commented on FLINK-6225:
---
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r113781674

--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
@@ -190,10 +192,13 @@ private CassandraSink(SingleOutputStreamOperator sink) {
	 * @param input type
	 * @return CassandraSinkBuilder, to further configure the sink
	 */
-	public static CassandraSinkBuilder addSink(DataStream input) {
+	public static CassandraSinkBuilder addSink(DataStream input) {
--- End diff --

Yes we can probably remove them.

> Support Row Stream for CassandraSink
> ------------------------------------
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
> Issue Type: New Feature
> Components: Cassandra Connector
> Affects Versions: 1.3.0
> Reporter: Jing Fan
> Assignee: Haohui Mai
> Fix For: 1.3.0
>
> Currently in CassandraSink, specifying query is not supported for row-stream. The solution should be similar to CassandraTupleSink.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r113781674

--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
@@ -190,10 +192,13 @@ private CassandraSink(SingleOutputStreamOperator sink) {
	 * @param input type
	 * @return CassandraSinkBuilder, to further configure the sink
	 */
-	public static CassandraSinkBuilder addSink(DataStream input) {
+	public static CassandraSinkBuilder addSink(DataStream input) {
--- End diff --

Yes we can probably remove them.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
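For readers following the `T`/`R` discussion: the type parameters in the quoted diff were lost in transit, but the cast-based alternative being discussed could look roughly like the following. This is a hedged sketch under the assumption of a single-parameter `CassandraSinkBuilder<IN>`, not the PR's actual code.

```
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

// Sketch: dispatch on the stream's type information inside addSink() so the
// public signature needs only one type parameter; the Row case is handled
// with an unchecked cast to the row-specific builder.
public final class CassandraSinkDispatch {

    @SuppressWarnings("unchecked")
    public static <IN> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) {
        if (input.getType() instanceof RowTypeInfo) {
            DataStream<Row> rowInput = (DataStream<Row>) input;
            return (CassandraSinkBuilder<IN>) new CassandraRowSinkBuilder(
                rowInput,
                rowInput.getType(),
                rowInput.getType().createSerializer(rowInput.getExecutionConfig()));
        }
        // ... tuple, POJO and case-class inputs would dispatch to their builders here
        throw new IllegalArgumentException("No Cassandra sink builder for type " + input.getType());
    }

    private CassandraSinkDispatch() {}
}
```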
[GitHub] flink issue #3783: [FLINK-6388] Add support for DISTINCT into Code Generated...
Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3783

@fhueske @stefanobortoli I recently fixed in Calcite the problem of porting the distinct flag to the OVER clause. This was merged into the master. Hence it is a matter of when Flink will pick up the new Calcite version. We can also consider the temporary solution until then, IMHO.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6388) Add support for DISTINCT into Code Generated Aggregations
[ https://issues.apache.org/jira/browse/FLINK-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15987352#comment-15987352 ] ASF GitHub Bot commented on FLINK-6388:
---
Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3783

@fhueske @stefanobortoli I recently fixed in Calcite the problem of porting the distinct flag to the OVER clause. This was merged into the master. Hence it is a matter of when Flink will pick up the new Calcite version. We can also consider the temporary solution until then, IMHO.

> Add support for DISTINCT into Code Generated Aggregations
> ---------------------------------------------------------
>
> Key: FLINK-6388
> URL: https://issues.apache.org/jira/browse/FLINK-6388
> Project: Flink
> Issue Type: Sub-task
> Components: DataStream API
> Affects Versions: 1.3.0
> Reporter: Stefano Bortoli
> Assignee: Stefano Bortoli
> Fix For: 1.3.0
>
> We should support DISTINCT in code generated aggregation functions.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15987328#comment-15987328 ] ASF GitHub Bot commented on FLINK-6225:
---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r113747908

--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
@@ -190,10 +192,13 @@ private CassandraSink(SingleOutputStreamOperator sink) {
	 * @param input type
	 * @return CassandraSinkBuilder, to further configure the sink
	 */
-	public static CassandraSinkBuilder addSink(DataStream input) {
+	public static CassandraSinkBuilder addSink(DataStream input) {
--- End diff --

do we need `T` and `R`? Couldn't these be removed if we cast to `DataStream` and `DataStream`?

> Support Row Stream for CassandraSink
> ------------------------------------
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
> Issue Type: New Feature
> Components: Cassandra Connector
> Affects Versions: 1.3.0
> Reporter: Jing Fan
> Assignee: Haohui Mai
> Fix For: 1.3.0
>
> Currently in CassandraSink, specifying query is not supported for row-stream. The solution should be similar to CassandraTupleSink.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15987331#comment-15987331 ] ASF GitHub Bot commented on FLINK-6225:
---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r113779310

--- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---
@@ -438,6 +461,27 @@ public void cancel() {
	}

	@Test
+	public void testCassandraTableSink() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+
+		DataStreamSource<Row> source = env.fromCollection(rowCollection);
+		CassandraTableSink cassandraTableSink = new CassandraTableSink(new ClusterBuilder() {
+			@Override
+			protected Cluster buildCluster(Cluster.Builder builder) {
+				return builder.addContactPointsWithPorts(new InetSocketAddress(HOST, PORT)).build();
+			}
+		}, INSERT_DATA_QUERY, new Properties());
+		cassandraTableSink.configure(FIELD_NAMES, FIELD_TYPES);
--- End diff --

Yes. It would also be good to test the TableSink in an actual Table API program. Most of the methods like `configure()` and `emitDataStream()` are internally called by the API.

> Support Row Stream for CassandraSink
> ------------------------------------
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
> Issue Type: New Feature
> Components: Cassandra Connector
> Affects Versions: 1.3.0
> Reporter: Jing Fan
> Assignee: Haohui Mai
> Fix For: 1.3.0
>
> Currently in CassandraSink, specifying query is not supported for row-stream. The solution should be similar to CassandraTupleSink.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15987327#comment-15987327 ] ASF GitHub Bot commented on FLINK-6225:
---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r113778001

--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Properties;
+
+/**
+ * A cassandra {@link StreamTableSink}.
+ */
+class CassandraTableSink implements StreamTableSink<Row> {
+	private final ClusterBuilder builder;
+	private final String cql;
+	private String[] fieldNames;
+	private TypeInformation[] fieldTypes;
+	private final Properties properties;
+
+	public CassandraTableSink(ClusterBuilder builder, String cql, Properties properties) {
+		this.builder = Preconditions.checkNotNull(builder, "builder");
+		this.cql = Preconditions.checkNotNull(cql, "cql");
+		this.properties = Preconditions.checkNotNull(properties, "properties");
+	}
+
+	@Override
+	public TypeInformation<Row> getOutputType() {
+		return new RowTypeInfo(fieldTypes);
+	}
+
+	@Override
+	public String[] getFieldNames() {
+		return this.fieldNames;
+	}
+
+	@Override
+	public TypeInformation[] getFieldTypes() {
+		return this.fieldTypes;
+	}
+
+	@Override
+	public TableSink<Row> configure(String[] fieldNames, TypeInformation[] fieldTypes) {
+		CassandraTableSink cassandraTableSink = new CassandraTableSink(this.builder, this.cql, this.properties);
+		cassandraTableSink.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames");
--- End diff --

Better error messages

> Support Row Stream for CassandraSink
> ------------------------------------
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
> Issue Type: New Feature
> Components: Cassandra Connector
> Affects Versions: 1.3.0
> Reporter: Jing Fan
> Assignee: Haohui Mai
> Fix For: 1.3.0
>
> Currently in CassandraSink, specifying query is not supported for row-stream. The solution should be similar to CassandraTupleSink.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15987329#comment-15987329 ] ASF GitHub Bot commented on FLINK-6225:
---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r113776064

--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
@@ -335,6 +340,30 @@ protected void sanityCheck() {
		}
	}

+	public static class CassandraRowSinkBuilder extends CassandraSinkBuilder {
+		public CassandraRowSinkBuilder(DataStream input, TypeInformation typeInfo, TypeSerializer serializer) {
+			super(input, typeInfo, serializer);
+		}
+
+		@Override
+		protected void sanityCheck() {
+			super.sanityCheck();
+			if (query == null || query.length() == 0) {
+				throw new IllegalArgumentException("Query must not be null or empty.");
+			}
+		}
+
+		@Override
+		public CassandraSink build() throws Exception {
+			sanityCheck();
+			if (isWriteAheadLogEnabled) {
+				throw new UnsupportedOperationException();
--- End diff --

Please add an error message here.

> Support Row Stream for CassandraSink
> ------------------------------------
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
> Issue Type: New Feature
> Components: Cassandra Connector
> Affects Versions: 1.3.0
> Reporter: Jing Fan
> Assignee: Haohui Mai
> Fix For: 1.3.0
>
> Currently in CassandraSink, specifying query is not supported for row-stream. The solution should be similar to CassandraTupleSink.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r113776064

--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
@@ -335,6 +340,30 @@ protected void sanityCheck() {
		}
	}

+	public static class CassandraRowSinkBuilder extends CassandraSinkBuilder {
+		public CassandraRowSinkBuilder(DataStream input, TypeInformation typeInfo, TypeSerializer serializer) {
+			super(input, typeInfo, serializer);
+		}
+
+		@Override
+		protected void sanityCheck() {
+			super.sanityCheck();
+			if (query == null || query.length() == 0) {
+				throw new IllegalArgumentException("Query must not be null or empty.");
+			}
+		}
+
+		@Override
+		public CassandraSink build() throws Exception {
+			sanityCheck();
+			if (isWriteAheadLogEnabled) {
+				throw new UnsupportedOperationException();
--- End diff --

Please add an error message here.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15987330#comment-15987330 ] ASF GitHub Bot commented on FLINK-6225:
---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r113745320

--- Diff: flink-connectors/flink-connector-cassandra/pom.xml ---
@@ -176,5 +176,10 @@ under the License.

+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table_2.10</artifactId>
+			<version>1.3-SNAPSHOT</version>
+		</dependency>
--- End diff --

The version should not be hard-coded. Use `${project.version}` instead.

> Support Row Stream for CassandraSink
> ------------------------------------
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
> Issue Type: New Feature
> Components: Cassandra Connector
> Affects Versions: 1.3.0
> Reporter: Jing Fan
> Assignee: Haohui Mai
> Fix For: 1.3.0
>
> Currently in CassandraSink, specifying query is not supported for row-stream. The solution should be similar to CassandraTupleSink.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r113745320

--- Diff: flink-connectors/flink-connector-cassandra/pom.xml ---
@@ -176,5 +176,10 @@ under the License.

+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table_2.10</artifactId>
+			<version>1.3-SNAPSHOT</version>
+		</dependency>
--- End diff --

The version should not be hard-coded. Use `${project.version}` instead.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r113747908

--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
@@ -190,10 +192,13 @@ private CassandraSink(SingleOutputStreamOperator sink) {
	 * @param input type
	 * @return CassandraSinkBuilder, to further configure the sink
	 */
-	public static CassandraSinkBuilder addSink(DataStream input) {
+	public static CassandraSinkBuilder addSink(DataStream input) {
--- End diff --

do we need `T` and `R`? Couldn't these be removed if we cast to `DataStream` and `DataStream`?

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r113779310

--- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---
@@ -438,6 +461,27 @@ public void cancel() {
	}

	@Test
+	public void testCassandraTableSink() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+
+		DataStreamSource<Row> source = env.fromCollection(rowCollection);
+		CassandraTableSink cassandraTableSink = new CassandraTableSink(new ClusterBuilder() {
+			@Override
+			protected Cluster buildCluster(Cluster.Builder builder) {
+				return builder.addContactPointsWithPorts(new InetSocketAddress(HOST, PORT)).build();
+			}
+		}, INSERT_DATA_QUERY, new Properties());
+		cassandraTableSink.configure(FIELD_NAMES, FIELD_TYPES);
--- End diff --

Yes. It would also be good to test the TableSink in an actual Table API program. Most of the methods like `configure()` and `emitDataStream()` are internally called by the API.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
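A hedged sketch of what exercising the sink through the Table API could look like in Flink 1.3, per the suggestion above. The names `rowCollection`, `FIELD_TYPES` and `cassandraTableSink` are taken from the quoted test; the field names `"a, b, c"` and the surrounding wiring are illustrative assumptions.

```
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// Inside a test method: drive the sink through the Table API so that
// configure() and emitDataStream() are invoked by the framework itself.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

TypeInformation<Row> rowTypeInfo = new RowTypeInfo(FIELD_TYPES);
Table table = tEnv.fromDataStream(env.fromCollection(rowCollection, rowTypeInfo), "a, b, c");

// writeToSink() makes the Table API call configure() + emitDataStream().
table.writeToSink(cassandraTableSink);
env.execute();
```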
[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r113778001

--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Properties;
+
+/**
+ * A cassandra {@link StreamTableSink}.
+ */
+class CassandraTableSink implements StreamTableSink<Row> {
+	private final ClusterBuilder builder;
+	private final String cql;
+	private String[] fieldNames;
+	private TypeInformation[] fieldTypes;
+	private final Properties properties;
+
+	public CassandraTableSink(ClusterBuilder builder, String cql, Properties properties) {
+		this.builder = Preconditions.checkNotNull(builder, "builder");
+		this.cql = Preconditions.checkNotNull(cql, "cql");
+		this.properties = Preconditions.checkNotNull(properties, "properties");
+	}
+
+	@Override
+	public TypeInformation<Row> getOutputType() {
+		return new RowTypeInfo(fieldTypes);
+	}
+
+	@Override
+	public String[] getFieldNames() {
+		return this.fieldNames;
+	}
+
+	@Override
+	public TypeInformation[] getFieldTypes() {
+		return this.fieldTypes;
+	}
+
+	@Override
+	public TableSink<Row> configure(String[] fieldNames, TypeInformation[] fieldTypes) {
+		CassandraTableSink cassandraTableSink = new CassandraTableSink(this.builder, this.cql, this.properties);
+		cassandraTableSink.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames");
--- End diff --

Better error messages

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
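Regarding the "better error messages" remark, the fix could be as simple as giving each precondition a human-readable message. A hedged sketch of the reviewer's suggestion; the exact wording is illustrative only:

```
@Override
public TableSink<Row> configure(String[] fieldNames, TypeInformation[] fieldTypes) {
	CassandraTableSink cassandraTableSink = new CassandraTableSink(this.builder, this.cql, this.properties);
	// Messages now tell the caller which argument was missing, instead of
	// merely echoing the parameter name.
	cassandraTableSink.fieldNames =
		Preconditions.checkNotNull(fieldNames, "Field names must not be null.");
	cassandraTableSink.fieldTypes =
		Preconditions.checkNotNull(fieldTypes, "Field types must not be null.");
	return cassandraTableSink;
}
```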
[jira] [Commented] (FLINK-6336) Placement Constraints for Mesos
[ https://issues.apache.org/jira/browse/FLINK-6336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15987237#comment-15987237 ] ASF GitHub Bot commented on FLINK-6336:
---
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3744

Thanks for your contribution @sgran and the review @EronWright. Changes look good to me. Merging this PR.

> Placement Constraints for Mesos
> -------------------------------
>
> Key: FLINK-6336
> URL: https://issues.apache.org/jira/browse/FLINK-6336
> Project: Flink
> Issue Type: New Feature
> Components: Mesos
> Affects Versions: 1.2.0
> Reporter: Stephen Gran
> Priority: Minor
>
> Fenzo supports placement constraints for tasks, and Mesos operators expose agent attributes to frameworks as part of each agent's resource offers.
> It would be extremely helpful in our multi-tenant cluster to be able to make use of this facility.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3744: [FLINK-6336] Initial commit of mesos placement constraint...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3744 Thanks for your contribution @sgran and the review @EronWright. Changes look good to me. Merging this PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #3481: [FLINK-5975] Add volume support to flink-mesos
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3481 Changes look good to me. Will rebase the PR and then merge it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5975) Mesos should support adding volumes to launched taskManagers
[ https://issues.apache.org/jira/browse/FLINK-5975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15987199#comment-15987199 ] ASF GitHub Bot commented on FLINK-5975:
---
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3481

Changes look good to me. Will rebase the PR and then merge it.

> Mesos should support adding volumes to launched taskManagers
> -------------------------------------------------------------
>
> Key: FLINK-5975
> URL: https://issues.apache.org/jira/browse/FLINK-5975
> Project: Flink
> Issue Type: Improvement
> Components: Mesos
> Affects Versions: 1.2.0, 1.3.0
> Reporter: Addison Higham
> Assignee: Addison Higham
> Priority: Minor
>
> Flink needs access to shared storage. In many cases this is HDFS, but it would be nice to also support file URIs on a mounted NFS, for example.
> Mesos exposes APIs for adding volumes, so it should be relatively simple to add this.
> As an example, here is the Spark code for supporting volumes: https://github.com/apache/spark/blob/master/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala#L35

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5810) Harden SlotManager
[ https://issues.apache.org/jira/browse/FLINK-5810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15987152#comment-15987152 ] ASF GitHub Bot commented on FLINK-5810: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3394 Incorporated your feedback @StephanEwen and rebased onto the latest master. > Harden SlotManager > -- > > Key: FLINK-5810 > URL: https://issues.apache.org/jira/browse/FLINK-5810 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.3.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > Harden the {{SlotManager}} logic to better cope with lost messages. -- This message was sent by Atlassian JIRA (v6.3.15#6346)