[GitHub] kisimple commented on a change in pull request #7267: [FLINK-11083][Table] CRowSerializerConfigSnapshot is not instantiable
kisimple commented on a change in pull request #7267: [FLINK-11083][Table] CRowSerializerConfigSnapshot is not instantiable URL: https://github.com/apache/flink/pull/7267#discussion_r240500235 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowSerializerTest.scala ## @@ -29,6 +41,72 @@ class CRowSerializerTest extends TestLogger { @Test def testDefaultConstructor(): Unit = { new CRowSerializer.CRowSerializerConfigSnapshot() + +val serializerConfigSnapshotClass = Class.forName( + "org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot") Review comment: Thanks for the review. Have updated the PR to address your comments. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10359) Scala example in DataSet docs is broken
[ https://issues.apache.org/jira/browse/FLINK-10359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716378#comment-16716378 ] ASF GitHub Bot commented on FLINK-10359: tzulitai commented on issue #7266: [FLINK-10359] Scala example in DataSet docs is broken URL: https://github.com/apache/flink/pull/7266#issuecomment-446095615 Thanks for the clarification. Merging this... > Scala example in DataSet docs is broken > --- > > Key: FLINK-10359 > URL: https://issues.apache.org/jira/browse/FLINK-10359 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Fabian Hueske >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > The Scala example of > [https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/dataset_transformations.html#combinable-groupreducefunctions] > is broken. > The {{asScala}} and the {{reduce}} call fetch the Java {{Iterator}} which may > only be fetched once. > {quote}The Iterable can be iterated over only once. Only the first call to > 'iterator()' will succeed.{quote} > While we are on it, it would make sense to check the other examples as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
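The bug described in FLINK-10359 reproduces without Flink: the Iterable handed to the reduce function may be traversed only once, so fetching its iterator a second time (as the doc example implicitly did via `asScala` and `reduce`) fails. A minimal Java sketch of the failure mode — `OneShotIterable` here is an illustrative stand-in modeled on the quoted contract, not Flink's actual class:

```java
import java.util.Arrays;
import java.util.Iterator;

public class OneShotIterableDemo {
    // Illustrative one-shot Iterable: mimics the documented contract that
    // "Only the first call to 'iterator()' will succeed."
    static class OneShotIterable<T> implements Iterable<T> {
        private Iterator<T> iterator;
        OneShotIterable(Iterator<T> iterator) { this.iterator = iterator; }
        @Override
        public Iterator<T> iterator() {
            if (iterator == null) {
                throw new IllegalStateException("The Iterable can be iterated over only once.");
            }
            Iterator<T> it = iterator;
            iterator = null;  // subsequent calls must fail
            return it;
        }
    }

    // Returns true when the second iterator() fetch throws, as in the broken
    // doc example where two calls both tried to pull the underlying iterator.
    public static boolean secondIterationFails() {
        OneShotIterable<Integer> values =
            new OneShotIterable<>(Arrays.asList(1, 2, 3).iterator());
        int sum = 0;
        for (int v : values) {  // first traversal succeeds
            sum += v;
        }
        try {
            values.iterator();  // second fetch violates the contract
            return false;
        } catch (IllegalStateException expected) {
            return sum == 6;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondIterationFails());
    }
}
```

The fix in such examples is to materialize the elements during the single permitted traversal before doing anything that would fetch the iterator again.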
[jira] [Commented] (FLINK-10543) Leverage efficient timer deletion in relational operators
[ https://issues.apache.org/jira/browse/FLINK-10543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716377#comment-16716377 ] sunjincheng commented on FLINK-10543: - Reverted in release-1.7: d3966332d4fe37fd22f92c758a6a34097ed16ac6 > Leverage efficient timer deletion in relational operators > - > > Key: FLINK-10543 > URL: https://issues.apache.org/jira/browse/FLINK-10543 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: Fabian Hueske >Assignee: Hequn Cheng >Priority: Minor > Labels: pull-request-available > Fix For: 1.8.0, 1.7.1 > > > FLINK-9423 added support for efficient timer deletions. This feature has been > available since Flink 1.6 and should be used by the relational operators of > SQL and the Table API. > Currently, we use a few workarounds to handle situations where deleting timers > would be the better solution.
[jira] [Commented] (FLINK-11029) Incorrect parameter in Working with state doc
[ https://issues.apache.org/jira/browse/FLINK-11029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716376#comment-16716376 ] ASF GitHub Bot commented on FLINK-11029: tzulitai commented on issue #7198: [FLINK-11029][docs] fixed the Incorrect parameter in Working with state doc URL: https://github.com/apache/flink/pull/7198#issuecomment-446094754 Thanks, +1, LGTM. > Incorrect parameter in Working with state doc > - > > Key: FLINK-11029 > URL: https://issues.apache.org/jira/browse/FLINK-11029 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Yangze Guo >Assignee: Yangze Guo >Priority: Minor > Labels: pull-request-available > > In [Working with > State|https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state.html#using-managed-keyed-state], > the parameter of getAggregatingState() method is incorrect. > It should be AggregatingStateDescriptor.
[jira] [Commented] (FLINK-10359) Scala example in DataSet docs is broken
[ https://issues.apache.org/jira/browse/FLINK-10359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716367#comment-16716367 ] ASF GitHub Bot commented on FLINK-10359: yanghua commented on issue #7266: [FLINK-10359] Scala example in DataSet docs is broken URL: https://github.com/apache/flink/pull/7266#issuecomment-446094214 @tzulitai yes, I searched but it seems there is no issue like this in other places. > Scala example in DataSet docs is broken > --- > > Key: FLINK-10359 > URL: https://issues.apache.org/jira/browse/FLINK-10359 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Fabian Hueske >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > The Scala example of > [https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/dataset_transformations.html#combinable-groupreducefunctions] > is broken. > The {{asScala}} and the {{reduce}} call fetch the Java {{Iterator}} which may > only be fetched once. > {quote}The Iterable can be iterated over only once. Only the first call to > 'iterator()' will succeed.{quote} > While we are on it, it would make sense to check the other examples as well.
[GitHub] tzulitai commented on issue #7219: [hotfix][docs] Correct the parameter type in Operators Overview doc
tzulitai commented on issue #7219: [hotfix][docs] Correct the parameter type in Operators Overview doc URL: https://github.com/apache/flink/pull/7219#issuecomment-446093969 Good catch, merging...
[jira] [Commented] (FLINK-10359) Scala example in DataSet docs is broken
[ https://issues.apache.org/jira/browse/FLINK-10359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716361#comment-16716361 ] ASF GitHub Bot commented on FLINK-10359: tzulitai commented on issue #7266: [FLINK-10359] Scala example in DataSet docs is broken URL: https://github.com/apache/flink/pull/7266#issuecomment-446093688 Thanks @yanghua. Did you also check other Scala example snippets for the same issue? > Scala example in DataSet docs is broken > --- > > Key: FLINK-10359 > URL: https://issues.apache.org/jira/browse/FLINK-10359 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Fabian Hueske >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > The Scala example of > [https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/dataset_transformations.html#combinable-groupreducefunctions] > is broken. > The {{asScala}} and the {{reduce}} call fetch the Java {{Iterator}} which may > only be fetched once. > {quote}The Iterable can be iterated over only once. Only the first call to > 'iterator()' will succeed.{quote} > While we are on it, it would make sense to check the other examples as well.
[jira] [Commented] (FLINK-11083) CRowSerializerConfigSnapshot is not instantiable
[ https://issues.apache.org/jira/browse/FLINK-11083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716359#comment-16716359 ] ASF GitHub Bot commented on FLINK-11083: tzulitai commented on a change in pull request #7267: [FLINK-11083][Table] CRowSerializerConfigSnapshot is not instantiable URL: https://github.com/apache/flink/pull/7267#discussion_r240489572 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowSerializerTest.scala ## @@ -29,6 +41,72 @@ class CRowSerializerTest extends TestLogger { @Test def testDefaultConstructor(): Unit = { new CRowSerializer.CRowSerializerConfigSnapshot() + +val serializerConfigSnapshotClass = Class.forName( + "org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot") Review comment: Can you use `classOf[...]` here? > CRowSerializerConfigSnapshot is not instantiable > > > Key: FLINK-11083 > URL: https://issues.apache.org/jira/browse/FLINK-11083 > Project: Flink > Issue Type: Bug > Components: Table API SQL, Type Serialization System >Reporter: boshu Zheng >Assignee: boshu Zheng >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0, 1.7.1 > > > An exception was encountered when restarting a job with savepoint in our > production env, > {code:java} > 2018-12-04 20:28:25,091 INFO 10595 org.apache.flink.runtime.taskmanager.Task :917 - _OurCustomOperator_ -> select: () -> to: Tuple2 -> > Sink: Unnamed (3/20) (61c4fa7339bf152157e8e1dd0f8fd97b) switched from RUNNING > to FAILED. > java.lang.Exception: Exception while creating StreamOperatorStateContext. 
> at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.flink.util.FlinkException: Could not restore operator > state backend for AsyncWaitOperator_90bea66de1c231edf33913ecd54406c1_(3/20) > from any of the 1 provided restore options. > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140) > ... 5 more > Caused by: java.lang.RuntimeException: The class > 'org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot' > is not instantiable: The class has no (implicit) public nullary constructor, > i.e. a constructor without arguments. 
> at > org.apache.flink.util.InstantiationUtil.checkForInstantiation(InstantiationUtil.java:412) > at > org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:337) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:433) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211) > at > org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:435) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255) > at >
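The root cause in the stack trace above — a config-snapshot class with no (implicit) public nullary constructor — is easy to reproduce without Flink. In this hedged sketch, `InnerSnapshot` and `NestedSnapshot` are hypothetical stand-ins, and `instantiable()` only roughly mirrors what Flink's `InstantiationUtil.checkForInstantiation` verifies:

```java
public class SnapshotInstantiationDemo {
    // Inner (non-static) snapshot: the compiler-generated constructor takes the
    // enclosing instance, so there is no nullary constructor -- the shape of the
    // reported bug.
    class InnerSnapshot {}

    // Static nested snapshot with an explicit public nullary constructor: the fix.
    public static class NestedSnapshot {
        public NestedSnapshot() {}
    }

    // True if the class can be created via a no-argument constructor,
    // roughly what the restore path needs when reading the snapshot back.
    public static boolean instantiable(Class<?> clazz) {
        try {
            clazz.getDeclaredConstructor().newInstance();
            return true;
        } catch (ReflectiveOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(instantiable(InnerSnapshot.class));   // false: no nullary ctor
        System.out.println(instantiable(NestedSnapshot.class));  // true
    }
}
```

The reviewer's `classOf[...]` suggestion is related hygiene: a compile-time class literal cannot drift out of sync with the class the way a `Class.forName` string (with its `$` binary-name separator) can.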
[GitHub] tzulitai commented on issue #7254: [hotfix][tableApi] fix tEnv in tableApi docs
tzulitai commented on issue #7254: [hotfix][tableApi] fix tEnv in tableApi docs URL: https://github.com/apache/flink/pull/7254#issuecomment-446092899 Please remember, for future contributions, that the commit message should be meaningful. For example, the title you used for the PR would be better than the current "fix tEnv in tableApi docs". Commit messages should also be tagged.
[GitHub] tzulitai commented on issue #7254: [hotfix][tableApi] fix tEnv in tableApi docs
tzulitai commented on issue #7254: [hotfix][tableApi] fix tEnv in tableApi docs URL: https://github.com/apache/flink/pull/7254#issuecomment-446092512 LGTM, merging.
[GitHub] tzulitai commented on issue #7270: [hotfix][docs] Fix typo in Joining documentation
tzulitai commented on issue #7270: [hotfix][docs] Fix typo in Joining documentation URL: https://github.com/apache/flink/pull/7270#issuecomment-446090988 LGTM, merging.
[jira] [Commented] (FLINK-10724) Refactor failure handling in check point coordinator
[ https://issues.apache.org/jira/browse/FLINK-10724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716352#comment-16716352 ] vinoyang commented on FLINK-10724: -- [~yunta] From the source code point of view, in {{abortSubsumed}} the subsumed checkpoint is currently handled as a kind of failure (especially when it can be subsumed), but whether it should be counted as a failure is, I think, something that needs to be discussed. > Refactor failure handling in check point coordinator > > > Key: FLINK-10724 > URL: https://issues.apache.org/jira/browse/FLINK-10724 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Andrey Zagrebin >Assignee: vinoyang >Priority: Major > > At the moment failure handling of asynchronously triggered checkpoint in > check point coordinator happens in different places. We could organise it in a > similar way as failure handling of synchronous triggering of checkpoint in > *CheckpointTriggerResult* where we classify error cases. This will simplify > e.g. integration of error counter for FLINK-4810. > See also discussion here: [https://github.com/apache/flink/pull/6567]
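The refactoring idea — classifying asynchronous checkpoint failures in one place, the way CheckpointTriggerResult classifies synchronous ones — can be sketched as below. The FailureReason values and the choice not to count SUBSUMED toward the failure counter are illustrative assumptions (exactly the open question in the comment above), not Flink's actual design:

```java
import java.util.EnumMap;
import java.util.Map;

public class CheckpointFailureClassifierDemo {
    // Hypothetical classification of async checkpoint outcomes; whether SUBSUMED
    // should count as a failure is the point under discussion.
    enum FailureReason {
        EXPIRED(true), DECLINED(true), SUBSUMED(false);
        final boolean countsAsFailure;
        FailureReason(boolean countsAsFailure) { this.countsAsFailure = countsAsFailure; }
    }

    private final Map<FailureReason, Integer> counts = new EnumMap<>(FailureReason.class);
    private int continuousFailures = 0;

    // Single routing point for every async failure, replacing the scattered
    // handling the issue describes; also feeds an error counter (cf. FLINK-4810).
    public void onCheckpointFailure(FailureReason reason) {
        counts.merge(reason, 1, Integer::sum);
        if (reason.countsAsFailure) {
            continuousFailures++;
        }
    }

    public int continuousFailures() { return continuousFailures; }
}
```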
[jira] [Comment Edited] (FLINK-10964) sql-client throws exception when paging through finished batch query
[ https://issues.apache.org/jira/browse/FLINK-10964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716340#comment-16716340 ] vinoyang edited comment on FLINK-10964 at 12/11/18 6:36 AM: [~twalthr] when you have time, can you help to review the PR of this issue? By analyzing the source code, I found that the ResultStore has been emptied at the time of cancelQuery. The main problem with this exception is that when stopping the retrieval thread, it still tries to call cancelQuery regardless of the running state of the job. was (Author: yanghua): [~twalthr] when you have time, can you help to review the PR of this issue? By analyzing the source code, I found that the ResultRestore has been emptied at the time of cancelQuery. The main problem with this exception is that when stopping the retrieval thread, it still tries to call cancelQuery regardless of the running state of the job. > sql-client throws exception when paging through finished batch query > - > > Key: FLINK-10964 > URL: https://issues.apache.org/jira/browse/FLINK-10964 > Project: Flink > Issue Type: Bug > Components: SQL Client >Reporter: Seth Wiesman >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > When paging through a batch query in state 'Finished' the sql client throws > the following exception: > {code:java} > org.apache.flink.table.client.gateway.SqlExecutionException: Could not find a > result with result identifier '0c7dce30d287fdd13b934fbefe5a38d1'.{code} >
[jira] [Commented] (FLINK-10964) sql-client throws exception when paging through finished batch query
[ https://issues.apache.org/jira/browse/FLINK-10964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716340#comment-16716340 ] vinoyang commented on FLINK-10964: -- [~twalthr] when you have time, can you help to review the PR of this issue? By analyzing the source code, I found that the ResultStore has been emptied at the time of cancelQuery. The main problem with this exception is that when stopping the retrieval thread, it still tries to call cancelQuery regardless of the running state of the job. > sql-client throws exception when paging through finished batch query > - > > Key: FLINK-10964 > URL: https://issues.apache.org/jira/browse/FLINK-10964 > Project: Flink > Issue Type: Bug > Components: SQL Client >Reporter: Seth Wiesman >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > When paging through a batch query in state 'Finished' the sql client throws > the following exception: > {code:java} > org.apache.flink.table.client.gateway.SqlExecutionException: Could not find a > result with result identifier '0c7dce30d287fdd13b934fbefe5a38d1'.{code} >
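The fix direction described — stop the retrieval thread without unconditionally calling cancelQuery once the job has finished and its result was removed from the store — can be sketched with hypothetical stand-ins (none of these are the sql-client's real classes, and the guard condition is an assumption about the intended behavior):

```java
import java.util.HashMap;
import java.util.Map;

public class RetrievalShutdownDemo {
    // Hypothetical in-memory result store; emptied when a query is cancelled.
    static final class ResultStore {
        private final Map<String, Object> results = new HashMap<>();
        void store(String id, Object result) { results.put(id, result); }
        void removeResult(String id) { results.remove(id); }
        boolean hasResult(String id) { return results.containsKey(id); }
    }

    // Guarded shutdown of the retrieval thread: only take the cancel path while
    // the job is still running and its result is still registered. For a job in
    // state 'Finished' the result was already removed, and cancelling anyway is
    // what raises "Could not find a result with result identifier '...'".
    public static boolean stopRetrieval(ResultStore store, String resultId, boolean jobRunning) {
        if (jobRunning && store.hasResult(resultId)) {
            store.removeResult(resultId);  // cancelQuery equivalent in this sketch
            return true;   // cancellation performed
        }
        return false;      // nothing to cancel -- no exception thrown
    }
}
```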
[jira] [Updated] (FLINK-11118) Refactor and unify rowtime timestamp extractor interface
[ https://issues.apache.org/jira/browse/FLINK-11118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang updated FLINK-11118: - Issue Type: Task (was: Sub-task) Parent: (was: FLINK-11117) > Refactor and unify rowtime timestamp extractor interface > > > Key: FLINK-11118 > URL: https://issues.apache.org/jira/browse/FLINK-11118 > Project: Flink > Issue Type: Task >Reporter: vinoyang >Assignee: vinoyang >Priority: Major >
[jira] [Closed] (FLINK-11117) Migrate connector classes to flink-table-spi
[ https://issues.apache.org/jira/browse/FLINK-11117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang closed FLINK-11117. Resolution: Duplicate > Migrate connector classes to flink-table-spi > > > Key: FLINK-11117 > URL: https://issues.apache.org/jira/browse/FLINK-11117 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > > This issue covers the fifth step of the migration plan mentioned in > [FLIP-28|https://cwiki.apache.org/confluence/display/FLINK/FLIP-28%3A+Long-term+goal+of+making+flink-table+Scala-free]. > Once we implemented improvements to the [unified connector > interface|https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit#heading=h.41fd6rs7b3cf], > we can also migrate the classes. Among others, it requires a refactoring of > the timestamp extractors, which are the biggest blockers because they > transitively depend on expressions.
[jira] [Commented] (FLINK-11117) Migrate connector classes to flink-table-spi
[ https://issues.apache.org/jira/browse/FLINK-11117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716333#comment-16716333 ] vinoyang commented on FLINK-11117: -- [~twalthr] oh, sorry, I will close it. > Migrate connector classes to flink-table-spi > > > Key: FLINK-11117 > URL: https://issues.apache.org/jira/browse/FLINK-11117 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > > This issue covers the fifth step of the migration plan mentioned in > [FLIP-28|https://cwiki.apache.org/confluence/display/FLINK/FLIP-28%3A+Long-term+goal+of+making+flink-table+Scala-free]. > Once we implemented improvements to the [unified connector > interface|https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit#heading=h.41fd6rs7b3cf], > we can also migrate the classes. Among others, it requires a refactoring of > the timestamp extractors, which are the biggest blockers because they > transitively depend on expressions.
[jira] [Commented] (FLINK-11117) Migrate connector classes to flink-table-spi
[ https://issues.apache.org/jira/browse/FLINK-11117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716330#comment-16716330 ] Timo Walther commented on FLINK-11117: -- [~yanghua] This issue is a duplicate of FLINK-10688. > Migrate connector classes to flink-table-spi > > > Key: FLINK-11117 > URL: https://issues.apache.org/jira/browse/FLINK-11117 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > > This issue covers the fifth step of the migration plan mentioned in > [FLIP-28|https://cwiki.apache.org/confluence/display/FLINK/FLIP-28%3A+Long-term+goal+of+making+flink-table+Scala-free]. > Once we implemented improvements to the [unified connector > interface|https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit#heading=h.41fd6rs7b3cf], > we can also migrate the classes. Among others, it requires a refactoring of > the timestamp extractors, which are the biggest blockers because they > transitively depend on expressions.
[jira] [Commented] (FLINK-10774) connection leak when partition discovery is disabled and open throws exception
[ https://issues.apache.org/jira/browse/FLINK-10774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716328#comment-16716328 ] ASF GitHub Bot commented on FLINK-10774: stevenzwu commented on issue #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an… URL: https://github.com/apache/flink/pull/7020#issuecomment-446086480 @tillrohrmann Thanks a lot for the feedback. I made the changes according to your comments. Please take a look and see if I missed anything. > connection leak when partition discovery is disabled and open throws exception > -- > > Key: FLINK-10774 > URL: https://issues.apache.org/jira/browse/FLINK-10774 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.2, 1.5.5, 1.6.2 >Reporter: Steven Zhen Wu >Assignee: Steven Zhen Wu >Priority: Major > Labels: pull-request-available > > Here is the scenario to reproduce the issue > * partition discovery is disabled > * open method throws an exception (e.g. when broker SSL authorization denies > request) > In this scenario, the run method won't be executed. As a result, > _partitionDiscoverer.close()_ won't be called. That caused the connection > leak, because the KafkaConsumer was initialized but never closed. It caused an > outage that brought down our Kafka cluster when a high-parallelism job got > into a restart/failure loop.
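The leak pattern — open() throws before run() ever starts, so the close() that normally happens in run() is never reached — suggests guarding the open path itself. A minimal sketch of that idea with hypothetical stand-ins (PartitionDiscoverer below is not Flink's class, and the guard shape is an assumption about the fix, not the PR's exact code):

```java
public class OpenFailureGuardDemo {
    // Hypothetical stand-in for a partition discoverer that holds a broker connection.
    static class PartitionDiscoverer implements AutoCloseable {
        boolean connected = false;
        boolean closed = false;
        void openConnections() { connected = true; }  // connect to brokers
        @Override public void close() { closed = true; }
    }

    // If open() fails after the discoverer has connected (e.g. broker SSL
    // authorization denied), close it before rethrowing: run() -- where close()
    // normally happens -- will never execute, so this is the only chance to
    // release the connection.
    public static void openSafely(PartitionDiscoverer discoverer, boolean failAfterConnect)
            throws Exception {
        discoverer.openConnections();
        try {
            if (failAfterConnect) {
                throw new Exception("broker SSL authorization denied");
            }
        } catch (Exception e) {
            discoverer.close();  // failure path: no leaked connection
            throw e;
        }
    }
}
```

Without the catch-and-close, every restart in a failure loop would leave one more connection open, which matches the outage described in the issue.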
[jira] [Commented] (FLINK-11081) Support binding port range for REST server
[ https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716331#comment-16716331 ] vinoyang commented on FLINK-11081: -- [~walterddr] Thank you for recognizing the value of this issue. In fact, its implementation emulates the traditional way Flink supports port ranges, such as "taskmanager.rpc.port". It allows specifying: * A single port (e.g. 8081; 0 means a port randomly picked by the JM) * A port list (12345, 1234), in which case the ports are tried in order * A port range (12345-12346), in which case the ports are also tried in order The implementation verifies the validity of each port and detects conflict exceptions. > Support binding port range for REST server > -- > > Key: FLINK-11081 > URL: https://issues.apache.org/jira/browse/FLINK-11081 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.7.0, 1.8.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > Currently the {{RestServerEndpoint}} binds to the port specified by > {{RestOptions#PORT}}. {{PORT}} is of type integer. Sometimes, it would be > useful to being able to specify not only a single port but a port range to > pick a port from. Therefore, I propose to add similar to > {{RestOptions#BIND_ADDRESS}} another option {{RestOptions#BIND_PORT}} which > allows to specify a port range for the {{RestServerEndpoint}} to pick a port > from. {{RestOptions#PORT}} would then only be used by the client to connect > to the started {{RestServerEndpoint}}.
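The three accepted forms described in the comment above (single port, comma-separated list, inclusive range, each tried in order) could be parsed roughly like this; the parser below is an illustrative sketch, not Flink's actual implementation:

```java
import java.util.ArrayList;
import java.util.List;

public class PortRangeParser {
    // Parses a port specification: a single port ("8081"), a comma-separated
    // list ("12345,12346"), or an inclusive range ("12345-12346"). Ports are
    // returned in the order they should be tried for binding.
    public static List<Integer> parse(String spec) {
        List<Integer> ports = new ArrayList<>();
        for (String part : spec.split(",")) {
            part = part.trim();
            int dash = part.indexOf('-');
            if (dash > 0) {  // range like "12345-12346"
                int start = checkPort(Integer.parseInt(part.substring(0, dash).trim()));
                int end = checkPort(Integer.parseInt(part.substring(dash + 1).trim()));
                for (int p = start; p <= end; p++) {
                    ports.add(p);
                }
            } else {
                ports.add(checkPort(Integer.parseInt(part)));
            }
        }
        return ports;
    }

    // Validates a port number; 0 is allowed and means "let the OS pick".
    private static int checkPort(int port) {
        if (port < 0 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        return port;
    }
}
```

The binding logic would then walk the returned list and keep the first port that binds without a conflict, surfacing an error only when every candidate fails.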
[jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()
[ https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716320#comment-16716320 ] ASF GitHub Bot commented on FLINK-11010: lamber-ken commented on a change in pull request #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime() URL: https://github.com/apache/flink/pull/7180#discussion_r240484482 ## File path: flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java ## @@ -164,4 +167,35 @@ public void testUnion() throws Exception { StreamITCase.compareWithList(expected); } + + @Test + public void testProctime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); + + DataStream> ds = JavaStreamTestData.getSmall3TupleDataSet(env); + tableEnv.registerDataStream("MyTable", ds, "a, b, c, proctime.proctime"); + + String sqlQuery = "select proctime from MyTable"; + + Table result = tableEnv.sqlQuery(sqlQuery); + + tableEnv + .toAppendStream(result, TypeInformation.of(Row.class)) + .addSink(new SinkFunction() { + @Override + public void invoke(Row value, Context context) throws Exception { + + Timestamp procTimestamp = (Timestamp) value.getField(0); + + // validate the second here + long procSecondTime = procTimestamp.getTime() / 1000; Review comment: yeah, it's better to validate hour. I'll update. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Flink SQL timestamp is inconsistent with currentProcessingTime() > > > Key: FLINK-11010 > URL: https://issues.apache.org/jira/browse/FLINK-11010 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.6.2, 1.7.0, 1.8.0, 1.7.1 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Major > Labels: pull-request-available > > Flink SQL timestamp is inconsistent with currentProcessingTime(). > > the ProcessingTime is just implemented by invoking System.currentTimeMillis() > but the long value will be automatically wrapped to a Timestamp with the > following statement: > `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));` -- This message was sent by Atlassian JIRA (v7.6.3#76005)
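The offset arithmetic quoted in the issue description can be reproduced in isolation. A minimal sketch, assuming a pinned GMT+8 default zone for reproducibility (the class name and epoch value are illustrative, not from the Flink code base):

```java
import java.sql.Timestamp;
import java.util.TimeZone;

public class ProcTimeOffsetDemo {
    public static void main(String[] args) {
        // Pin the default zone so the demo is deterministic (illustrative assumption).
        TimeZone.setDefault(TimeZone.getTimeZone("GMT+8"));

        long time = 1544400000000L; // stand-in for System.currentTimeMillis()

        // The wrapping quoted in the issue shifts the epoch value by the local offset:
        Timestamp shifted = new Timestamp(time - TimeZone.getDefault().getOffset(time));
        // A plain wrap keeps the epoch value unchanged:
        Timestamp direct = new Timestamp(time);

        // For GMT+8 the two differ by exactly 8 hours = 28800000 ms.
        System.out.println(direct.getTime() - shifted.getTime()); // prints 28800000
    }
}
```

This is why a SQL `proctime` value rendered through the shifted `Timestamp` disagrees with `currentProcessingTime()` whenever the JVM's default zone has a non-zero offset.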
[jira] [Commented] (FLINK-9699) Add api to replace registered table
[ https://issues.apache.org/jira/browse/FLINK-9699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716286#comment-16716286 ] ASF GitHub Bot commented on FLINK-9699: --- zjffdu commented on issue #6236: [FLINK-9699] [table] Add api to replace registered table URL: https://github.com/apache/flink/pull/6236#issuecomment-446082836 @hequn8128 @yanghua PR is updated, could you help review it ? Thanks This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add api to replace registered table > --- > > Key: FLINK-9699 > URL: https://issues.apache.org/jira/browse/FLINK-9699 > Project: Flink > Issue Type: Improvement >Reporter: Jeff Zhang >Assignee: Jeff Zhang >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11125) Remove useless import
[ https://issues.apache.org/jira/browse/FLINK-11125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716317#comment-16716317 ] ASF GitHub Bot commented on FLINK-11125: hequn8128 opened a new pull request #7272: [FLINK-11125] remove unused import URL: https://github.com/apache/flink/pull/7272 ## What is the purpose of the change This pull request removes some unused imports in tests and flink-table. ## Brief change log - Remove unused import ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove useless import > -- > > Key: FLINK-11125 > URL: https://issues.apache.org/jira/browse/FLINK-11125 > Project: Flink > Issue Type: Improvement > Components: Table API SQL, Tests >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11125) Remove useless import
[ https://issues.apache.org/jira/browse/FLINK-11125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11125: --- Labels: pull-request-available (was: ) > Remove useless import > -- > > Key: FLINK-11125 > URL: https://issues.apache.org/jira/browse/FLINK-11125 > Project: Flink > Issue Type: Improvement > Components: Table API SQL, Tests >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11060) Unable to set number of task manager and slot per task manager in scala shell local mode
[ https://issues.apache.org/jira/browse/FLINK-11060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716285#comment-16716285 ] ASF GitHub Bot commented on FLINK-11060: zjffdu commented on issue #7229: [FLINK-11060][scala-shell] Unable to set number of task manager and slot per task manager in scala shell local mode URL: https://github.com/apache/flink/pull/7229#issuecomment-446082679 @yanghua @tillrohrmann Could you help review it ? Thanks This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Unable to set number of task manager and slot per task manager in scala shell > local mode > > > Key: FLINK-11060 > URL: https://issues.apache.org/jira/browse/FLINK-11060 > Project: Flink > Issue Type: Bug > Components: Scala Shell >Affects Versions: 1.7.0 >Reporter: Jeff Zhang >Assignee: Jeff Zhang >Priority: Major > Labels: pull-request-available > > In scala-shell, I can not change the number of task manager and slot per task > manager, it is hard coded to 1. I can not specify them in flink-conf.yaml -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11125) Remove useless import
[ https://issues.apache.org/jira/browse/FLINK-11125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716274#comment-16716274 ] TisonKun commented on FLINK-11125: -- @[~hequn8128] If the change is not big, it might be satisfied in a hotfix. Otherwise it's better to detail the title as "Remove useless import under XXXClass" or something. > Remove useless import > -- > > Key: FLINK-11125 > URL: https://issues.apache.org/jira/browse/FLINK-11125 > Project: Flink > Issue Type: Improvement > Components: Table API SQL, Tests >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-11039) LogicalExchange and HashPartitioner realization
[ https://issues.apache.org/jira/browse/FLINK-11039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ambition closed FLINK-11039. Resolution: Fixed > LogicalExchange and HashPartitioner realization > --- > > Key: FLINK-11039 > URL: https://issues.apache.org/jira/browse/FLINK-11039 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Affects Versions: 1.6.2, 1.7.0 >Reporter: ambition >Priority: Minor > Labels: pull-request-available > Fix For: 1.7.0 > > > RelTimeIndicatorConverter.visit(exchange: LogicalExchange) function > realization. > FlinkLogicalExchange realization > org.apache.calcite.rel.logical.LogicalExchange. > HashPartitioner is Partitioner that implements hash-based partitioning using > Java's `Object.hashCode`. support org.apache.calcite.rel.RelDistribution.Type > HASH_DISTRIBUTED -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] ambition119 closed pull request #7202: [FLINK-11039] LogicalExchange and HashPartitioner realization
ambition119 closed pull request #7202: [FLINK-11039] LogicalExchange and HashPartitioner realization URL: https://github.com/apache/flink/pull/7202 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11039) LogicalExchange and HashPartitioner realization
[ https://issues.apache.org/jira/browse/FLINK-11039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716238#comment-16716238 ] ASF GitHub Bot commented on FLINK-11039: ambition119 closed pull request #7202: [FLINK-11039] LogicalExchange and HashPartitioner realization URL: https://github.com/apache/flink/pull/7202 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > LogicalExchange and HashPartitioner realization > --- > > Key: FLINK-11039 > URL: https://issues.apache.org/jira/browse/FLINK-11039 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Affects Versions: 1.6.2, 1.7.0 >Reporter: ambition >Priority: Minor > Labels: pull-request-available > Fix For: 1.7.0 > > > RelTimeIndicatorConverter.visit(exchange: LogicalExchange) function > realization. > FlinkLogicalExchange realization > org.apache.calcite.rel.logical.LogicalExchange. > HashPartitioner is Partitioner that implements hash-based partitioning using > Java's `Object.hashCode`. support org.apache.calcite.rel.RelDistribution.Type > HASH_DISTRIBUTED -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector
[ https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716215#comment-16716215 ] ASF GitHub Bot commented on FLINK-9083: --- jparkie commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector URL: https://github.com/apache/flink/pull/6782#discussion_r240476864 ## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java ## @@ -43,70 +48,82 @@ */ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> implements CheckpointedFunction { protected final Logger log = LoggerFactory.getLogger(getClass()); - protected transient Cluster cluster; - protected transient Session session; - protected transient volatile Throwable exception; - protected transient FutureCallback<V> callback; + // Default Configurations + + /** +* The default maximum number of concurrent requests. By default, {@code Integer.MAX_VALUE}. +*/ + public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = Integer.MAX_VALUE; + + /** +* The default timeout duration when acquiring a permit to execute. By default, {@code Long.MAX_VALUE}. +*/ + public static final long DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = Long.MAX_VALUE; + + /** +* The default timeout unit when acquiring a permit to execute. By default, milliseconds.
+*/ + public static final TimeUnit DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT = TimeUnit.MILLISECONDS; + + // - Configuration Fields - + + private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS; + private long maxConcurrentRequestsTimeout = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT; + private TimeUnit maxConcurrentRequestsTimeoutUnit = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT; + + // --- Cassandra Fields --- private final ClusterBuilder builder; - private final AtomicInteger updatesPending = new AtomicInteger(); + protected transient Cluster cluster; + protected transient Session session; + + // Synchronization Fields + + private AtomicReference<Throwable> throwable; + private Semaphore semaphore; + private Phaser phaser; CassandraSinkBase(ClusterBuilder builder) { this.builder = builder; ClosureCleaner.clean(builder, true); } - @Override - public void open(Configuration configuration) { - this.callback = new FutureCallback<V>() { - @Override - public void onSuccess(V ignored) { - int pending = updatesPending.decrementAndGet(); - if (pending == 0) { - synchronized (updatesPending) { - updatesPending.notifyAll(); - } - } - } + // - Sink Methods - + @Override + public void open(Configuration parameters) { + cluster = createCluster(); + session = createSession(); + + throwable = new AtomicReference<>(); + semaphore = new Semaphore(maxConcurrentRequests); + /* +* A Phaser is a flexible and reusable synchronization barrier similar to CyclicBarrier and CountDownLatch. +* +* This Phaser is configured to support "1 + N" parties. +* - "1" for the CassandraSinkBase to arrive and to await at the Phaser during a flush() call. +* - "N" for the varying number of invoke() calls that register and de-register with the Phaser. +* +* The Phaser awaits the completion of the advancement of a phase prior to returning from a register() call.
+* This behavior ensures that no backlogged invoke() calls register to execute while the Semaphore's permits +* are being released during a flush() call. +*/ + phaser = new Phaser(1) { Review comment: @azagrebin @zentol Okay. Yeah. Then we don't need a barrier. The semaphore would be sufficient. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
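The semaphore-only variant that jparkie agrees would be sufficient ("we don't need a barrier") can be sketched independently of Cassandra. This is a simplified illustration of the pattern, not the PR's actual code: `invoke()` must acquire a permit before issuing an async write, the write's completion callback returns the permit, and `flush()` drains all permits to wait for every in-flight request.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BackpressureSketch {
    private final int maxConcurrentRequests;
    private final Semaphore permits;
    private final AtomicInteger completed = new AtomicInteger();

    BackpressureSketch(int maxConcurrentRequests) {
        this.maxConcurrentRequests = maxConcurrentRequests;
        this.permits = new Semaphore(maxConcurrentRequests);
    }

    /** Blocks (with a timeout) until an in-flight slot is free, then starts an async write. */
    void invoke(Runnable write) throws InterruptedException {
        if (!permits.tryAcquire(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("timed out waiting for a free request slot");
        }
        CompletableFuture.runAsync(write).whenComplete((ignored, error) -> {
            completed.incrementAndGet();
            permits.release(); // the completion callback returns the permit
        });
    }

    /** Waits until every in-flight request has released its permit. */
    void flush() throws InterruptedException {
        permits.acquire(maxConcurrentRequests);
        permits.release(maxConcurrentRequests);
    }

    int completedCount() {
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        BackpressureSketch sink = new BackpressureSketch(2); // at most 2 writes in flight
        for (int i = 0; i < 10; i++) {
            sink.invoke(() -> { /* pretend to talk to Cassandra asynchronously */ });
        }
        sink.flush();
        System.out.println(sink.completedCount()); // prints 10
    }
}
```

Because `flush()` can only acquire all `maxConcurrentRequests` permits once every outstanding callback has run, returning from it guarantees all writes issued so far have completed — the property the Phaser was originally added to enforce.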
[jira] [Created] (FLINK-11126) Filter out AMRMToken in the TaskManager credentials
Paul Lin created FLINK-11126: Summary: Filter out AMRMToken in the TaskManager credentials Key: FLINK-11126 URL: https://issues.apache.org/jira/browse/FLINK-11126 Project: Flink Issue Type: Improvement Components: Security, YARN Affects Versions: 1.7.0, 1.6.2 Reporter: Paul Lin Assignee: Paul Lin Currently, Flink JobManager propagates its storage tokens to TaskManager to meet the requirement of YARN log aggregation (see FLINK-6376). But in this way the AMRMToken is also included in the TaskManager credentials, which could be potentially insecure. We should filter out AMRMToken before setting the tokens to TaskManager's container launch context. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
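The proposed filtering can be illustrated with plain collections standing in for Hadoop's `Credentials`/`Token` API (this sketch deliberately avoids a Hadoop dependency). The kind string `YARN_AM_RM_TOKEN` is an assumption and should be taken from `AMRMTokenIdentifier.KIND_NAME` in a real implementation:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class AmRmTokenFilterSketch {
    // Hypothetical AMRMToken kind; in real code use AMRMTokenIdentifier.KIND_NAME.
    static final String AM_RM_TOKEN_KIND = "YARN_AM_RM_TOKEN";

    /** Returns a copy of the credentials (alias -> token kind) without the AMRMToken. */
    static Map<String, String> filterAmRmToken(Map<String, String> credentials) {
        Map<String, String> filtered = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : credentials.entrySet()) {
            if (!AM_RM_TOKEN_KIND.equals(e.getValue())) {
                filtered.put(e.getKey(), e.getValue());
            }
        }
        return filtered;
    }

    public static void main(String[] args) {
        Map<String, String> creds = new LinkedHashMap<>();
        creds.put("hdfs", "HDFS_DELEGATION_TOKEN"); // needed by TaskManagers for log aggregation
        creds.put("amrm", AM_RM_TOKEN_KIND);        // should stay with the JobManager/AM only
        System.out.println(filterAmRmToken(creds).keySet()); // prints [hdfs]
    }
}
```

The real fix would apply the same filter to the `Credentials` object before writing it into the TaskManager's container launch context, keeping the HDFS delegation tokens that FLINK-6376 requires while withholding the AM-only token.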
[jira] [Commented] (FLINK-11081) Support binding port range for REST server
[ https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716173#comment-16716173 ] Rong Rong commented on FLINK-11081: --- +1 to this feature. Very helpful actually for lots of our use cases. just quick higher level questions, 1) what is the binding strategy of the port range to the actual port number? (smallest? random?); and 2) should we introduce a range limit to maximum range size? > Support binding port range for REST server > -- > > Key: FLINK-11081 > URL: https://issues.apache.org/jira/browse/FLINK-11081 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.7.0, 1.8.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > Currently the {{RestServerEndpoint}} binds to the port specified by > {{RestOptions#PORT}}. {{PORT}} is of type integer. Sometimes, it would be > useful to being able to specify not only a single port but a port range to > pick a port from. Therefore, I propose to add similar to > {{RestOptions#BIND_ADDRESS}} another option {{RestOptions#BIND_PORT}} which > allows to specify a port range for the {{RestServerEndpoint}} to pick a port > from. {{RestOptions#PORT}} would then only be used by the client to connect > to the started {{RestServerEndpoint}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
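One answer to the first question ("smallest? random?") is a smallest-free-port-first scan over a `lo-hi` range. The range syntax and binding strategy below are assumptions for illustration, not Flink's actual implementation:

```java
import java.io.IOException;
import java.net.ServerSocket;

public class PortRangeBindSketch {
    /** Parses a "lo-hi" range spec such as "8081-8090" (illustrative syntax only). */
    static int[] parseRange(String spec) {
        String[] parts = spec.split("-", 2);
        int lo = Integer.parseInt(parts[0].trim());
        int hi = parts.length == 2 ? Integer.parseInt(parts[1].trim()) : lo;
        if (lo > hi || lo < 1 || hi > 65535) {
            throw new IllegalArgumentException("invalid port range: " + spec);
        }
        return new int[] {lo, hi};
    }

    /** Tries each port in ascending order and binds the first free one. */
    static ServerSocket bindFirstFree(int lo, int hi) throws IOException {
        for (int port = lo; port <= hi; port++) {
            try {
                return new ServerSocket(port);
            } catch (IOException ignored) {
                // port in use; try the next one
            }
        }
        throw new IOException("no free port in [" + lo + ", " + hi + "]");
    }

    public static void main(String[] args) throws IOException {
        int[] range = parseRange("52101-52110");
        try (ServerSocket first = bindFirstFree(range[0], range[1]);
             ServerSocket second = bindFirstFree(range[0], range[1])) {
            // With the first port held, the second bind falls through to a later port.
            System.out.println(second.getLocalPort() != first.getLocalPort()); // prints true
        }
    }
}
```

The validation in `parseRange` also hints at the second question: capping `hi - lo` there would be the natural place to enforce a maximum range size.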
[jira] [Commented] (FLINK-4810) Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful checkpoints
[ https://issues.apache.org/jira/browse/FLINK-4810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716171#comment-16716171 ] ASF GitHub Bot commented on FLINK-4810: --- ramkrish86 commented on issue #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful checkpoints URL: https://github.com/apache/flink/pull/3334#issuecomment-446070175 @azagrebin - Thanks for the ping. Currently am not working on this. Pls feel free to work on this or the related JIRA FLINK-10074. I would add myself as a watcher to understand more about it. Thanks once again. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful > checkpoints > > > Key: FLINK-4810 > URL: https://issues.apache.org/jira/browse/FLINK-4810 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Priority: Major > Labels: pull-request-available > > The Checkpoint coordinator should track the number of consecutive > unsuccessful checkpoints. > If more than {{n}} (configured value) checkpoints fail in a row, it should > call {{fail()}} on the execution graph to trigger a recovery. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
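The "fail after n consecutive unsuccessful checkpoints" policy described in the issue reduces to a counter that a success resets. A minimal sketch of just that policy (hypothetical names; not the coordinator's code):

```java
public class CheckpointFailureCounter {
    private final int tolerableConsecutiveFailures;
    private int consecutiveFailures;

    CheckpointFailureCounter(int tolerableConsecutiveFailures) {
        this.tolerableConsecutiveFailures = tolerableConsecutiveFailures;
    }

    /** Called when a checkpoint completes; a success breaks the failure streak. */
    void onCheckpointSuccess() {
        consecutiveFailures = 0;
    }

    /** Called when a checkpoint fails; returns true when the job should be failed. */
    boolean onCheckpointFailure() {
        consecutiveFailures++;
        return consecutiveFailures >= tolerableConsecutiveFailures;
    }

    public static void main(String[] args) {
        CheckpointFailureCounter counter = new CheckpointFailureCounter(3);
        counter.onCheckpointFailure();   // 1 in a row
        counter.onCheckpointFailure();   // 2 in a row
        counter.onCheckpointSuccess();   // streak broken
        counter.onCheckpointFailure();   // 1 in a row again
        counter.onCheckpointFailure();   // 2 in a row
        System.out.println(counter.onCheckpointFailure()); // 3rd in a row -> prints true
    }
}
```

In the coordinator, a `true` return would be the point at which `fail()` is invoked on the execution graph to trigger recovery.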
[jira] [Closed] (FLINK-10943) Flink runtime test failed caused by curator dependency conflicts
[ https://issues.apache.org/jira/browse/FLINK-10943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Paul Lin closed FLINK-10943. Resolution: Not A Bug > Flink runtime test failed caused by curator dependency conflicts > > > Key: FLINK-10943 > URL: https://issues.apache.org/jira/browse/FLINK-10943 > Project: Flink > Issue Type: Bug > Components: Build System, Tests >Affects Versions: 1.5.5, 1.6.2 >Reporter: Paul Lin >Assignee: Paul Lin >Priority: Minor > Labels: pull-request-available > Attachments: > org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStoreTest.txt > > > Hadoop-common of 2.6 + version includes curator dependencies, which would > have conflicts with the curator used by Flink runtime and cause test failures > (the attachment is the surefire report). > The curator dependencies tree of flink runtime is as below: > ``` > flink-shaded-hadoop2 -> hadoop-common -> curator-client & curator-recipes > flink-shaded-curator -> curator-recipes -> curator-framework -> curator-client > ``` > According to the dependency mechanism, maven would pick the curator-client in > flink-shaded-hadoop2, and curator-framework and curator-recipes from > flink-shaded-curator. > To fix the problem I think we can simply exclude curator-client from > flink-shaded-hadoop2 dependency in flink-runtime. > I'd like to fix this, please let me know what you think. Thanks! > [^org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStoreTest.txt] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
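The exclusion proposed in the description would look roughly like this in flink-runtime's pom.xml. Note the issue was closed as Not A Bug, so this is only a sketch of what was being suggested, not a change that was merged:

```xml
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-shaded-hadoop2</artifactId>
	<version>${project.version}</version>
	<exclusions>
		<!-- Let flink-shaded-curator supply a single, consistent curator-client -->
		<exclusion>
			<groupId>org.apache.curator</groupId>
			<artifactId>curator-client</artifactId>
		</exclusion>
	</exclusions>
</dependency>
```

With the exclusion in place, Maven's nearest-wins mediation can no longer mix hadoop-common's curator-client with flink-shaded-curator's curator-framework/recipes.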
[jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()
[ https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716164#comment-16716164 ] ASF GitHub Bot commented on FLINK-11010: walterddr commented on a change in pull request #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime() URL: https://github.com/apache/flink/pull/7180#discussion_r240470904 ## File path: flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java ## @@ -164,4 +167,35 @@ public void testUnion() throws Exception { StreamITCase.compareWithList(expected); } + + @Test + public void testProctime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); + + DataStream<Tuple3<Integer, Long, String>> ds = JavaStreamTestData.getSmall3TupleDataSet(env); + tableEnv.registerDataStream("MyTable", ds, "a, b, c, proctime.proctime"); + + String sqlQuery = "select proctime from MyTable"; + + Table result = tableEnv.sqlQuery(sqlQuery); + + tableEnv + .toAppendStream(result, TypeInformation.of(Row.class)) + .addSink(new SinkFunction<Row>() { + @Override + public void invoke(Row value, Context context) throws Exception { + + Timestamp procTimestamp = (Timestamp) value.getField(0); + + // validate the second here + long procSecondTime = procTimestamp.getTime() / 1000; Review comment: This causes me some trouble because cases can happen when they cross the second boundary. this is not a stable ITCase in my opinion. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Flink SQL timestamp is inconsistent with currentProcessingTime() > > > Key: FLINK-11010 > URL: https://issues.apache.org/jira/browse/FLINK-11010 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.6.2, 1.7.0, 1.8.0, 1.7.1 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Major > Labels: pull-request-available > > Flink SQL timestamp is inconsistent with currentProcessingTime(). > > the ProcessingTime is just implemented by invoking System.currentTimeMillis() > but the long value will be automatically wrapped to a Timestamp with the > following statement: > `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));` -- This message was sent by Atlassian JIRA (v7.6.3#76005)
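walterddr's objection is easy to demonstrate: two processing-time readings taken only milliseconds apart can still fall in different seconds, so asserting on the truncated second is inherently racy, while truncating to the hour (as lamber-ken agreed to do) leaves far more slack. A self-contained illustration with values chosen to straddle a boundary:

```java
public class SecondBoundaryDemo {
    public static void main(String[] args) {
        long before = 1999L; // 1.999 s after epoch
        long after  = 2001L; // only 2 ms later

        // Truncating to seconds makes the two readings disagree:
        System.out.println(before / 1000 == after / 1000); // prints false

        // Truncating to hours tolerates the same 2 ms gap:
        long hourMs = 60 * 60 * 1000;
        System.out.println(before / hourMs == after / hourMs); // prints true
    }
}
```

The same race applies to any fixed truncation unit, of course — the hour merely shrinks the window of flakiness from once per second to once per hour.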
[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore
[ https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716158#comment-16716158 ] ASF GitHub Bot commented on FLINK-11048: tweise commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore URL: https://github.com/apache/flink/pull/7249#discussion_r240470721

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java

@@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, Configuration clientConfig
 		}
 	}

-	@Override
-	public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
-		StreamGraph streamGraph = getStreamGraph();
-		streamGraph.setJobName(jobName);
-		transformations.clear();
-		return executeRemotely(streamGraph, jarFiles);
+	protected List<URL> getJarFiles() throws ProgramInvocationException {
+		return jarFiles;
 	}

 	/**
 	 * Executes the remote job.
 	 *
-	 * @param streamGraph
-	 *		Stream Graph to execute
+	 * @param streamExecutionEnvironment
+	 *		Execution Environment with Stream Graph to execute
 	 * @param jarFiles
 	 *		List of jar file URLs to ship to the cluster
 	 * @return The result of the job execution, containing elapsed time and accumulators.
 	 */
-	protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles) throws ProgramInvocationException {
+	public static JobExecutionResult executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
+		List<URL> jarFiles,

Review comment: It seems to leave room for arbitrary review demands also!
> Ability to programmatically execute streaming pipeline with savepoint restore
> -----------------------------------------------------------------------------
>
> Key: FLINK-11048
> URL: https://issues.apache.org/jira/browse/FLINK-11048
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Affects Versions: 1.7.0
> Reporter: Thomas Weise
> Assignee: Thomas Weise
> Priority: Minor
> Labels: pull-request-available
>
> RemoteStreamEnvironment.execute doesn't support restore from savepoint, though the underlying ClusterClient does. Add an explicit "execute remotely" that can be used by downstream projects.
> [https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E]
[jira] [Closed] (FLINK-11123) Improve ml quick start doc
[ https://issues.apache.org/jira/browse/FLINK-11123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng closed FLINK-11123.
---
Resolution: Fixed
Fix Version/s: 1.6.3
Fixed in master: 156c09ced9a61d20e2e5d4ce5cfedab8ac4d4ee4
Fixed in release-1.7: f045dfd501343bc9c72c665a6c599e42a221bd67
Fixed in release-1.6: d1a489f9eda733d914a885cd593b26f336a4d380

> Improve ml quick start doc
> --------------------------
>
> Key: FLINK-11123
> URL: https://issues.apache.org/jira/browse/FLINK-11123
> Project: Flink
> Issue Type: Improvement
> Components: Documentation, Machine Learning Library
> Affects Versions: 1.7.0
> Reporter: sunjincheng
> Assignee: sunjincheng
> Priority: Minor
> Labels: pull-request-available
> Fix For: 1.6.3, 1.7.1, 1.7.0
>
> The user cannot run the sample through the ml quick start document because the import description of the class is missing.
[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore
[ https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716154#comment-16716154 ] ASF GitHub Bot commented on FLINK-11048: tweise commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore URL: https://github.com/apache/flink/pull/7249#discussion_r240470550

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java

@@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, Configuration clientConfig
 		}
 	}

-	@Override
-	public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
-		StreamGraph streamGraph = getStreamGraph();
-		streamGraph.setJobName(jobName);
-		transformations.clear();
-		return executeRemotely(streamGraph, jarFiles);
+	protected List<URL> getJarFiles() throws ProgramInvocationException {
+		return jarFiles;
 	}

 	/**
 	 * Executes the remote job.
 	 *
-	 * @param streamGraph
-	 *		Stream Graph to execute
+	 * @param streamExecutionEnvironment
+	 *		Execution Environment with Stream Graph to execute
 	 * @param jarFiles
 	 *		List of jar file URLs to ship to the cluster
 	 * @return The result of the job execution, containing elapsed time and accumulators.
 	 */
-	protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles) throws ProgramInvocationException {
+	public static JobExecutionResult executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,

Review comment: Agreed on the public method contract. I rearranged the code to keep the existing method as is. The setter is an alternative option for users that already have the remote environment instance (instead of having to deal with an 8-parameter static method). I would like to keep this as is.
> Ability to programmatically execute streaming pipeline with savepoint restore
> -----------------------------------------------------------------------------
>
> Key: FLINK-11048
> URL: https://issues.apache.org/jira/browse/FLINK-11048
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Affects Versions: 1.7.0
> Reporter: Thomas Weise
> Assignee: Thomas Weise
> Priority: Minor
> Labels: pull-request-available
>
> RemoteStreamEnvironment.execute doesn't support restore from savepoint, though the underlying ClusterClient does. Add an explicit "execute remotely" that can be used by downstream projects.
> [https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E]
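The compatibility approach tweise describes above — keep the existing instance method's contract while delegating to a new public static entry point that downstream projects can call directly — can be modeled in a few lines. This is a simplified, self-contained sketch; the class and method names are illustrative, not Flink's actual RemoteStreamEnvironment signatures:

```java
import java.util.Arrays;
import java.util.List;

// Model of the refactor: the instance method keeps its signature (so
// subclasses and existing callers are unaffected) and forwards to a new
// public static method with the environment passed explicitly.
public class RemoteEnvModel {
    private final List<String> jarFiles;

    public RemoteEnvModel(List<String> jarFiles) {
        this.jarFiles = jarFiles;
    }

    // Existing method contract, unchanged for subclasses and callers.
    protected String executeRemotely(String jobName) {
        return executeRemotely(this, jobName);
    }

    // New static entry point: usable without subclassing the environment.
    public static String executeRemotely(RemoteEnvModel env, String jobName) {
        return "executed " + jobName + " with jars " + env.jarFiles;
    }

    public static void main(String[] args) {
        RemoteEnvModel env = new RemoteEnvModel(Arrays.asList("job.jar"));
        System.out.println(RemoteEnvModel.executeRemotely(env, "wordcount"));
    }
}
```

The design point under discussion is exactly this split: the static method serves callers who construct everything up front, while the instance method (or a setter on the instance) serves callers who already hold an environment object.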
[jira] [Updated] (FLINK-11124) Add private[flink] to TemporalTableFunction.create()
[ https://issues.apache.org/jira/browse/FLINK-11124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11124:
---
Labels: pull-request-available (was: )

> Add private[flink] to TemporalTableFunction.create()
> ----------------------------------------------------
>
> Key: FLINK-11124
> URL: https://issues.apache.org/jira/browse/FLINK-11124
> Project: Flink
> Issue Type: Improvement
> Components: Table API SQL
> Reporter: Hequn Cheng
> Assignee: Hequn Cheng
> Priority: Major
> Labels: pull-request-available
>
> {{TemporalTableFunction}} is a user-oriented class. I think it would be better to add {{private[flink]}} to the {{TemporalTableFunction.create()}} method in order to make it invisible to users.
[GitHub] hequn8128 opened a new pull request #7271: [FLINK-11124][table] Add private[flink] to TemporalTableFunction.create() method
hequn8128 opened a new pull request #7271: [FLINK-11124][table] Add private[flink] to TemporalTableFunction.create() method URL: https://github.com/apache/flink/pull/7271

## What is the purpose of the change

This pull request adds `private[flink]` to the `TemporalTableFunction.create()` method. As `TemporalTableFunction` is a user-oriented class, it would be better to make the `create()` method invisible to users.

## Brief change log

- Add private[flink] to TemporalTableFunction.create()

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
[jira] [Commented] (FLINK-11124) Add private[flink] to TemporalTableFunction.create()
[ https://issues.apache.org/jira/browse/FLINK-11124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716152#comment-16716152 ] ASF GitHub Bot commented on FLINK-11124: hequn8128 opened a new pull request #7271: [FLINK-11124][table] Add private[flink] to TemporalTableFunction.create() method URL: https://github.com/apache/flink/pull/7271

> Add private[flink] to TemporalTableFunction.create()
> ----------------------------------------------------
>
> Key: FLINK-11124
> URL: https://issues.apache.org/jira/browse/FLINK-11124
> Project: Flink
> Issue Type: Improvement
> Components: Table API SQL
> Reporter: Hequn Cheng
> Assignee: Hequn Cheng
> Priority: Major
> Labels: pull-request-available
>
> {{TemporalTableFunction}} is a user-oriented class. I think it would be better to add {{private[flink]}} to the {{TemporalTableFunction.create()}} method in order to make it invisible to users.
[GitHub] link3280 closed pull request #7148: [FLINK-10943][tests] Fix test failures in flink runtime caused by curator dependency conflicts
link3280 closed pull request #7148: [FLINK-10943][tests] Fix test failures in flink runtime caused by curator dependency conflicts URL: https://github.com/apache/flink/pull/7148

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index d216851c2d1..0cab4feabda 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -72,6 +72,12 @@ under the License.
 			flink-shaded-hadoop2
 			${project.version}
 			true
+			<exclusions>
+				<exclusion>
+					<groupId>org.apache.curator</groupId>
+					<artifactId>curator-client</artifactId>
+				</exclusion>
+			</exclusions>
[jira] [Commented] (FLINK-10943) Flink runtime test failed caused by curator dependency conflicts
[ https://issues.apache.org/jira/browse/FLINK-10943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716150#comment-16716150 ] ASF GitHub Bot commented on FLINK-10943: link3280 closed pull request #7148: [FLINK-10943][tests] Fix test failures in flink runtime caused by curator dependency conflicts URL: https://github.com/apache/flink/pull/7148

> Flink runtime test failed caused by curator dependency conflicts
> ----------------------------------------------------------------
>
> Key: FLINK-10943
> URL: https://issues.apache.org/jira/browse/FLINK-10943
> Project: Flink
> Issue Type: Bug
> Components: Build System, Tests
> Affects Versions: 1.5.5, 1.6.2
> Reporter: Paul Lin
> Assignee: Paul Lin
> Priority: Minor
> Labels: pull-request-available
> Attachments: org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStoreTest.txt
>
> Hadoop-common of 2.6+ versions includes curator dependencies, which have conflicts with the curator used by the Flink runtime and cause test failures (the attachment is the surefire report).
> The curator dependency tree of the Flink runtime is as below:
> ```
> flink-shaded-hadoop2 -> hadoop-common -> curator-client & curator-recipes
> flink-shaded-curator -> curator-recipes -> curator-framework -> curator-client
> ```
> According to the dependency mechanism, Maven would pick the curator-client in flink-shaded-hadoop2, and curator-framework and curator-recipes from flink-shaded-curator.
> To fix the problem I think we can simply exclude curator-client from the flink-shaded-hadoop2 dependency in flink-runtime.
> I'd like to fix this, please let me know what you think. Thanks!
> [^org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStoreTest.txt]
[jira] [Commented] (FLINK-10943) Flink runtime test failed caused by curator dependency conflicts
[ https://issues.apache.org/jira/browse/FLINK-10943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716149#comment-16716149 ] ASF GitHub Bot commented on FLINK-10943: link3280 commented on issue #7148: [FLINK-10943][tests] Fix test failures in flink runtime caused by curator dependency conflicts URL: https://github.com/apache/flink/pull/7148#issuecomment-446068042

@tillrohrmann Thanks for your review. You're right, it's caused by the relocation problem of IntelliJ. I'll close the PR and the Jira ticket.

> Flink runtime test failed caused by curator dependency conflicts
> ----------------------------------------------------------------
>
> Key: FLINK-10943
> URL: https://issues.apache.org/jira/browse/FLINK-10943
> Project: Flink
> Issue Type: Bug
> Components: Build System, Tests
> Affects Versions: 1.5.5, 1.6.2
> Reporter: Paul Lin
> Assignee: Paul Lin
> Priority: Minor
> Labels: pull-request-available
> Attachments: org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStoreTest.txt
>
> Hadoop-common of 2.6+ versions includes curator dependencies, which have conflicts with the curator used by the Flink runtime and cause test failures (the attachment is the surefire report).
> The curator dependency tree of the Flink runtime is as below:
> ```
> flink-shaded-hadoop2 -> hadoop-common -> curator-client & curator-recipes
> flink-shaded-curator -> curator-recipes -> curator-framework -> curator-client
> ```
> According to the dependency mechanism, Maven would pick the curator-client in flink-shaded-hadoop2, and curator-framework and curator-recipes from flink-shaded-curator.
> To fix the problem I think we can simply exclude curator-client from the flink-shaded-hadoop2 dependency in flink-runtime.
> I'd like to fix this, please let me know what you think. Thanks!
> [^org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStoreTest.txt]
[jira] [Commented] (FLINK-11090) Unused parameter in WindowedStream.aggregate()
[ https://issues.apache.org/jira/browse/FLINK-11090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716146#comment-16716146 ] Hequn Cheng commented on FLINK-11090:
---
Hi [~aljoscha], thanks for explaining the differences between {{PublicEvolving}} and {{Public}}. It would be nice to remove the useless parameter directly. I will open a PR soon. Thanks, Hequn

> Unused parameter in WindowedStream.aggregate()
> ----------------------------------------------
>
> Key: FLINK-11090
> URL: https://issues.apache.org/jira/browse/FLINK-11090
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Reporter: Hequn Cheng
> Assignee: Hequn Cheng
> Priority: Major
>
> The {{aggregateResultType}} parameter in {{WindowedStream.aggregate()}} seems useless. Or what have I missed?
> If it is useless, I prefer to remove the parameter by adding a new API and deprecating the current one. We can't remove it directly as it is PublicEvolving.
> {code:java}
> @PublicEvolving
> public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
>     AggregateFunction<T, ACC, V> aggregateFunction,
>     ProcessWindowFunction<V, R, K, W> windowFunction,
>     TypeInformation<ACC> accumulatorType,
>     TypeInformation<V> aggregateResultType,
>     TypeInformation<R> resultType) {
> }
> {code}
[jira] [Created] (FLINK-11125) Remove useless import
Hequn Cheng created FLINK-11125:
---
Summary: Remove useless import
Key: FLINK-11125
URL: https://issues.apache.org/jira/browse/FLINK-11125
Project: Flink
Issue Type: Improvement
Components: Table API SQL, Tests
Reporter: Hequn Cheng
Assignee: Hequn Cheng
[jira] [Commented] (FLINK-11123) Improve ml quick start doc
[ https://issues.apache.org/jira/browse/FLINK-11123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716125#comment-16716125 ] ASF GitHub Bot commented on FLINK-11123: asfgit closed pull request #7269: [FLINK-11123][docs] fix the import of the class is missing in ml quic… URL: https://github.com/apache/flink/pull/7269

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/libs/ml/quickstart.md b/docs/dev/libs/ml/quickstart.md
index e056b28b505..2e9a7b9505c 100644
--- a/docs/dev/libs/ml/quickstart.md
+++ b/docs/dev/libs/ml/quickstart.md
@@ -153,6 +153,8 @@ A conversion can be done using a simple normalizer mapping function:
 {% highlight scala %}
+import org.apache.flink.ml.math.Vector
+
 def normalizer : LabeledVector => LabeledVector = { lv =>
   LabeledVector(if (lv.label > 0.0) 1.0 else -1.0, lv.vector)
 }

> Improve ml quick start doc
> --------------------------
>
> Key: FLINK-11123
> URL: https://issues.apache.org/jira/browse/FLINK-11123
> Project: Flink
> Issue Type: Improvement
> Components: Documentation, Machine Learning Library
> Affects Versions: 1.7.0
> Reporter: sunjincheng
> Assignee: sunjincheng
> Priority: Minor
> Labels: pull-request-available
> Fix For: 1.7.0, 1.7.1
>
> The user cannot run the sample through the ml quick start document because the import description of the class is missing.
[jira] [Commented] (FLINK-10543) Leverage efficient timer deletion in relational operators
[ https://issues.apache.org/jira/browse/FLINK-10543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716116#comment-16716116 ] Hequn Cheng commented on FLINK-10543:
---
[~fhueske] Hi, it would be nice if you could take a look at the PR. The PR mainly includes the following changes:
* Delete expired timers in {{ProcessFunctionWithCleanupState}}.
* Add {{CoProcessFunctionWithCleanupState}} for join. This aligns the cleanup logic of join with that of the other operators.
* Use one {{ValueState}} to control cleanup instead of two, i.e., treat the left and right state of a join as a whole.
* Leverage min and max retention time to clean up the state in join. Before, join registered a cleanup timer and cleaned up the state at a fixed interval. In the new version, we can remove all records at once, as new records refresh the timer.
Best, Hequn

> Leverage efficient timer deletion in relational operators
> ---------------------------------------------------------
>
> Key: FLINK-10543
> URL: https://issues.apache.org/jira/browse/FLINK-10543
> Project: Flink
> Issue Type: Improvement
> Components: Table API SQL
> Reporter: Fabian Hueske
> Assignee: Hequn Cheng
> Priority: Minor
> Labels: pull-request-available
> Fix For: 1.8.0, 1.7.1
>
> FLINK-9423 added support for efficient timer deletions. This feature is available since Flink 1.6 and should be used by the relational operators of SQL and the Table API.
> Currently, we use a few workarounds to handle situations where deleting timers would be the better solution.
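The "refresh the cleanup timer" idea referenced above (enabled by the efficient timer deletion added in FLINK-9423) can be modeled without Flink: each new record for a key replaces that key's pending cleanup timer, so at most one timer per key is outstanding, and all expired state for a key is dropped at once when its timer finally fires. A simplified, self-contained sketch with illustrative names, not Flink's actual API:

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Model of per-key cleanup timers: onRecord() replaces the key's pending
// timer (delete old + register new), fire() drops all due keys at once.
public class CleanupTimers {
    private final Map<String, Long> timerPerKey = new HashMap<>(); // key -> fire time

    /** A new record arrived for `key`: replace its pending cleanup timer. */
    public void onRecord(String key, long now, long retentionMillis) {
        // put() overwrites the previous entry, which models deleting the old
        // timer and registering a fresh one further in the future.
        timerPerKey.put(key, now + retentionMillis);
    }

    /** Fire all timers due at `now`; returns how many keys were cleaned up. */
    public int fire(long now) {
        int cleaned = 0;
        for (Iterator<Map.Entry<String, Long>> it = timerPerKey.entrySet().iterator(); it.hasNext(); ) {
            if (it.next().getValue() <= now) {
                it.remove();
                cleaned++;
            }
        }
        return cleaned;
    }

    public int pendingTimers() {
        return timerPerKey.size();
    }
}
```

Without efficient deletion, the old workaround was to let stale timers fire anyway and check against stored state whether cleanup was still due; with deletion, only the single refreshed timer per key remains registered.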
[jira] [Commented] (FLINK-10543) Leverage efficient timer deletion in relational operators
[ https://issues.apache.org/jira/browse/FLINK-10543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716101#comment-16716101 ] sunjincheng commented on FLINK-10543:
---
Hi [~fhueske], from the point of view of compatibility I agree this change should not be cherry-picked to release-1.7. I'll revert the change from release-1.7. Thank you for the reminder that we only back-port bug fixes to release branches that do not break compatibility. I think the improvement is necessary on master to reduce the storage and access pressure of useless state. If you find any incorrect semantic changes, we can file new JIRAs to fix them. Thanks, Jincheng

> Leverage efficient timer deletion in relational operators
> ---------------------------------------------------------
>
> Key: FLINK-10543
> URL: https://issues.apache.org/jira/browse/FLINK-10543
> Project: Flink
> Issue Type: Improvement
> Components: Table API SQL
> Reporter: Fabian Hueske
> Assignee: Hequn Cheng
> Priority: Minor
> Labels: pull-request-available
> Fix For: 1.8.0, 1.7.1
>
> FLINK-9423 added support for efficient timer deletions. This feature is available since Flink 1.6 and should be used by the relational operators of SQL and the Table API.
> Currently, we use a few workarounds to handle situations where deleting timers would be the better solution.
[GitHub] zzchun opened a new pull request #7270: [hotfix][docs] Fix typo in Joining documentation
zzchun opened a new pull request #7270: [hotfix][docs] Fix typo in Joining documentation URL: https://github.com/apache/flink/pull/7270

## What is the purpose of the change

Fix typo in Joining documentation

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
[jira] [Created] (FLINK-11124) Add private[flink] to TemporalTableFunction.create()
Hequn Cheng created FLINK-11124:
---
Summary: Add private[flink] to TemporalTableFunction.create()
Key: FLINK-11124
URL: https://issues.apache.org/jira/browse/FLINK-11124
Project: Flink
Issue Type: Improvement
Components: Table API SQL
Reporter: Hequn Cheng
Assignee: Hequn Cheng

{{TemporalTableFunction}} is a user-oriented class. I think it would be better to add {{private[flink]}} to the {{TemporalTableFunction.create()}} method in order to make it invisible to users.
[jira] [Commented] (FLINK-10543) Leverage efficient timer deletion in relational operators
[ https://issues.apache.org/jira/browse/FLINK-10543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716027#comment-16716027 ] Fabian Hueske commented on FLINK-10543: --- Hi [~sunjincheng121], [~hequn8128], I had a quick look at the PR and it seems that the clean up behavior of some operators was changed. 1. Some {{ValueState}} were removed from join operators, which breaks savepoint compatibility. 2. The clean up behavior of joins was changed. It seems that all records are removed in case of a firing clean up timer while before, non-expired records were kept. We only backport bug fixes to release branches that do not break compatibility (unless it is absolutely necessary). Hence, the commit on the release-1.7 branch must be reverted. I think we should also discuss the changes on the master branch. This issue was about leveraging the new timer deletion feature to improve the performance and not changing the clean up semantics of the operators. I'll have a closer look at the changes in the next days to figure out what exactly changed. Thanks, Fabian > Leverage efficient timer deletion in relational operators > - > > Key: FLINK-10543 > URL: https://issues.apache.org/jira/browse/FLINK-10543 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: Fabian Hueske >Assignee: Hequn Cheng >Priority: Minor > Labels: pull-request-available > Fix For: 1.8.0, 1.7.1 > > > FLINK-9423 added support for efficient timer deletions. This feature is > available since Flink 1.6 and should be used by the relational operator of > SQL and Table API. > Currently, we use a few workarounds to handle situations when deleting timers > would be the better solution. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()
[ https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lamber-ken updated FLINK-11010: --- Affects Version/s: 1.8.0 > Flink SQL timestamp is inconsistent with currentProcessingTime() > > > Key: FLINK-11010 > URL: https://issues.apache.org/jira/browse/FLINK-11010 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.6.2, 1.7.0, 1.8.0, 1.7.1 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Major > Labels: pull-request-available > > Flink SQL timestamp is inconsistent with currentProcessingTime(). > > the ProcessingTime is just implemented by invoking System.currentTimeMillis() > but the long value will be automatically wrapped to a Timestamp with the > following statement: > `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));` -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()
[ https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lamber-ken updated FLINK-11010: --- Affects Version/s: 1.7.1 1.7.0 > Flink SQL timestamp is inconsistent with currentProcessingTime() > > > Key: FLINK-11010 > URL: https://issues.apache.org/jira/browse/FLINK-11010 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.6.2, 1.7.0, 1.8.0, 1.7.1 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Major > Labels: pull-request-available > > Flink SQL timestamp is inconsistent with currentProcessingTime(). > > the ProcessingTime is just implemented by invoking System.currentTimeMillis() > but the long value will be automatically wrapped to a Timestamp with the > following statement: > `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));` -- This message was sent by Atlassian JIRA (v7.6.3#76005)
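[Editor's note] The shift described in FLINK-11010 can be reproduced outside of Flink. The sketch below is plain Java, not Flink code; the class name and the simulated UTC+8 zone are made up for illustration. It wraps an epoch-millis value the same way the statement quoted in the issue does, and shows that the resulting `Timestamp` lags the real instant by the default time-zone offset (8 hours here, matching the delay reported later in this thread):

```java
import java.sql.Timestamp;
import java.util.TimeZone;

public class TimestampShiftDemo {

    // Same wrapping as the statement quoted in FLINK-11010: the epoch-millis
    // value is shifted by the default time-zone offset before being wrapped.
    static Timestamp wrapWithOffset(long time) {
        return new Timestamp(time - TimeZone.getDefault().getOffset(time));
    }

    public static void main(String[] args) {
        // Simulate a user in UTC+8, where the offset is a constant 8 hours.
        TimeZone.setDefault(TimeZone.getTimeZone("GMT+8"));
        long now = System.currentTimeMillis();
        Timestamp wrapped = wrapWithOffset(now);
        // The wrapped value lags the real instant by the zone offset.
        System.out.println(now - wrapped.getTime()); // prints 28800000
    }
}
```

Run with `java -ea TimestampShiftDemo`; the printed difference equals `TimeZone.getOffset` for the default zone, which is why the rendered SQL timestamp disagrees with `currentProcessingTime()` for any user outside UTC.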
[jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()
[ https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716020#comment-16716020 ] ASF GitHub Bot commented on FLINK-11010: lamber-ken commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime() URL: https://github.com/apache/flink/pull/7180#issuecomment-446050898 hi, @zentol cc This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Flink SQL timestamp is inconsistent with currentProcessingTime() > > > Key: FLINK-11010 > URL: https://issues.apache.org/jira/browse/FLINK-11010 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.6.2 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Major > Labels: pull-request-available > > Flink SQL timestamp is inconsistent with currentProcessingTime(). > > the ProcessingTime is just implemented by invoking System.currentTimeMillis() > but the long value will be automatically wrapped to a Timestamp with the > following statement: > `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));` -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] lamber-ken commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime()
lamber-ken commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime() URL: https://github.com/apache/flink/pull/7180#issuecomment-446050898 hi, @zentol cc This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11070) Add stream-stream non-window cross join
[ https://issues.apache.org/jira/browse/FLINK-11070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716015#comment-16716015 ] Hequn Cheng commented on FLINK-11070: - [~fhueske] Thanks for your quick reply! Ok, I got your point. It also makes sense to notify users about the potential performance risk. Once we support join reordering, we can remove the switches. Best, Hequn > Add stream-stream non-window cross join > --- > > Key: FLINK-11070 > URL: https://issues.apache.org/jira/browse/FLINK-11070 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > > Currently, we don't reorder join and rely on the order provided by the user. > This is fine for most of the cases, however, it limits the set of supported > SQL queries. > Example: > {code:java} > val streamUtil: StreamTableTestUtil = streamTestUtil() > streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, > 'proctime.proctime) > streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, > 'proctime.proctime) > streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime, > 'proctime.proctime) > val sqlQuery = > """ > |SELECT t1.a, t3.b > |FROM MyTable3 t3, MyTable2 t2, MyTable t1 > |WHERE t1.a = t3.a AND t1.a = t2.a > |""".stripMargin > streamUtil.printSql(sqlQuery) > {code} > Given the current rule sets, this query produces a cross join which is not > supported and thus leads to: > {code:java} > org.apache.flink.table.api.TableException: Cannot generate a valid execution > plan for the given query: > LogicalProject(a=[$8], b=[$1]) > LogicalFilter(condition=[AND(=($8, $0), =($8, $4))]) > LogicalJoin(condition=[true], joinType=[inner]) > LogicalJoin(condition=[true], joinType=[inner]) > LogicalTableScan(table=[[_DataStreamTable_2]]) > LogicalTableScan(table=[[_DataStreamTable_1]]) > LogicalTableScan(table=[[_DataStreamTable_0]]) > This exception indicates that the query 
uses an unsupported SQL feature. > Please check the documentation for the set of currently supported SQL > features. > {code} > In order to support more queries, it would be nice to have cross join on > streaming. We can start from a simple version, for example, call > forceNonParallel() for connectOperator in `DataStreamJoin` when it is a cross > join. The performance may be bad. But it works fine if the two tables of > cross join are small ones. > We can do some optimizations later, such as broadcasting the smaller side, > etc. > Any suggestions are greatly appreciated. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11070) Add stream-stream non-window cross join
[ https://issues.apache.org/jira/browse/FLINK-11070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715954#comment-16715954 ] Fabian Hueske commented on FLINK-11070: --- Hmmm, I'm hesitant to enable cross joins by default. It is very easy to write a query that performs unnecessary cross joins. I would not trust users with that. If we run cross joins by default, users will experience very bad performance even if the query could be rewritten without cross joins. With a switch in the table config, we can notify the user about a (potentially) inefficient query, and users can manually enable these queries. I don't really see a problem removing the switch in the future. We can change the default behavior to enable cross joins by default once we have support for join reordering. Even if the cardinality estimates are not perfect, equi joins should be much cheaper than cross joins. > Add stream-stream non-window cross join > --- > > Key: FLINK-11070 > URL: https://issues.apache.org/jira/browse/FLINK-11070 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > > Currently, we don't reorder join and rely on the order provided by the user. > This is fine for most of the cases, however, it limits the set of supported > SQL queries. 
> Example: > {code:java} > val streamUtil: StreamTableTestUtil = streamTestUtil() > streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, > 'proctime.proctime) > streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, > 'proctime.proctime) > streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime, > 'proctime.proctime) > val sqlQuery = > """ > |SELECT t1.a, t3.b > |FROM MyTable3 t3, MyTable2 t2, MyTable t1 > |WHERE t1.a = t3.a AND t1.a = t2.a > |""".stripMargin > streamUtil.printSql(sqlQuery) > {code} > Given the current rule sets, this query produces a cross join which is not > supported and thus leads to: > {code:java} > org.apache.flink.table.api.TableException: Cannot generate a valid execution > plan for the given query: > LogicalProject(a=[$8], b=[$1]) > LogicalFilter(condition=[AND(=($8, $0), =($8, $4))]) > LogicalJoin(condition=[true], joinType=[inner]) > LogicalJoin(condition=[true], joinType=[inner]) > LogicalTableScan(table=[[_DataStreamTable_2]]) > LogicalTableScan(table=[[_DataStreamTable_1]]) > LogicalTableScan(table=[[_DataStreamTable_0]]) > This exception indicates that the query uses an unsupported SQL feature. > Please check the documentation for the set of currently supported SQL > features. > {code} > In order to support more queries, it would be nice to have cross join on > streaming. We can start from a simple version, for example, call > forceNonParallel() for connectOperator in `DataStreamJoin` when it is a cross > join. The performance may be bad. But it works fine if the two tables of > cross join are small ones. > We can do some optimizations later, such as broadcasting the smaller side, > etc. > Any suggestions are greatly appreciated. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] samsai commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime()
samsai commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime() URL: https://github.com/apache/flink/pull/7180#issuecomment-446041717 I got the same problem too:

    val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ssZ")
    val origin: DataStream[TransactionEvent] = env.fromCollection(List(
      TransactionEvent("u1", format.parse("2018-01-02 01:13:30+0800"), 10)
    ))
    val source2 = origin
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[TransactionEvent](Time.minutes(1)) {
          override def extractTimestamp(element: TransactionEvent): Long = {
            val timestamp = element.time.getTime
            println(s"extractTimestamp:$timestamp")
            timestamp
          }
        })
    tEnv.fromDataStream(source2, 'user, 'eventTime.rowtime)
      .toAppendStream[Row].print()

I got eventTime as 2018-01-01 17:13:30.0, which is 8 hours behind. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()
[ https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715949#comment-16715949 ] ASF GitHub Bot commented on FLINK-11010: samsai commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime() URL: https://github.com/apache/flink/pull/7180#issuecomment-446041717 I got the same problem too:

    val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ssZ")
    val origin: DataStream[TransactionEvent] = env.fromCollection(List(
      TransactionEvent("u1", format.parse("2018-01-02 01:13:30+0800"), 10)
    ))
    val source2 = origin
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[TransactionEvent](Time.minutes(1)) {
          override def extractTimestamp(element: TransactionEvent): Long = {
            val timestamp = element.time.getTime
            println(s"extractTimestamp:$timestamp")
            timestamp
          }
        })
    tEnv.fromDataStream(source2, 'user, 'eventTime.rowtime)
      .toAppendStream[Row].print()

I got eventTime as 2018-01-01 17:13:30.0, which is 8 hours behind. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Flink SQL timestamp is inconsistent with currentProcessingTime() > > > Key: FLINK-11010 > URL: https://issues.apache.org/jira/browse/FLINK-11010 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.6.2 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Major > Labels: pull-request-available > > Flink SQL timestamp is inconsistent with currentProcessingTime(). 
> > the ProcessingTime is just implemented by invoking System.currentTimeMillis() > but the long value will be automatically wrapped to a Timestamp with the > following statement: > `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));` -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10566) Flink Planning is exponential in the number of stages
[ https://issues.apache.org/jira/browse/FLINK-10566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715937#comment-16715937 ] Fabian Hueske commented on FLINK-10566: --- Hmmm, true. The stack trace was taken during a recursive sink-to-source plan traversal to register the serializers. The plan resulting from the given program branches a lot. I guess the problem is that the traversal does not check if a subplan was already traversed. > Flink Planning is exponential in the number of stages > - > > Key: FLINK-10566 > URL: https://issues.apache.org/jira/browse/FLINK-10566 > Project: Flink > Issue Type: Bug > Components: Optimizer >Affects Versions: 1.5.4, 1.6.1, 1.7.0 >Reporter: Robert Bradshaw >Priority: Major > Attachments: chart.png > > > This makes it nearly impossible to run graphs with 100 or more stages. (The > execution itself is still sub-second, but the job submission takes > increasingly long.) > I can reproduce this with the following pipeline, which resembles my > real-world workloads (with depth up to 10 and width up, and past, 50). On > Flink it seems getting width beyond width 10 is problematic (times out after > hours). Note the log scale on the chart for time. 
> >
> {code:java}
> public static void runPipeline(int depth, int width) throws Exception {
>   final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>   DataSet<String> input = env.fromElements("a", "b", "c");
>   DataSet<String> stats = null;
>   for (int i = 0; i < depth; i++) {
>     stats = analyze(input, stats, width / (i + 1) + 1);
>   }
>   stats.writeAsText("out.txt");
>   env.execute("depth " + depth + " width " + width);
> }
>
> public static DataSet<String> analyze(DataSet<String> input, DataSet<String> stats, int branches) {
>   System.out.println("analyze " + branches);
>   for (int i = 0; i < branches; i++) {
>     final int ii = i;
>     if (stats != null) {
>       input = input.map(new RichMapFunction<String, String>() {
>         @Override
>         public void open(Configuration parameters) throws Exception {
>           Collection<String> broadcastSet = getRuntimeContext().getBroadcastVariable("stats");
>         }
>         @Override
>         public String map(String value) throws Exception {
>           return value;
>         }
>       }).withBroadcastSet(stats.map(s -> "(" + s + ").map"), "stats");
>     }
>     DataSet<String> branch = input
>       .map(s -> new Tuple2<Integer, String>(0, s + ii))
>       .groupBy(0)
>       .minBy(1)
>       .map(kv -> kv.f1);
>     if (stats == null) {
>       stats = branch;
>     } else {
>       stats = stats.union(branch);
>     }
>   }
>   return stats.map(s -> "(" + s + ").stats");
> }
> {code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
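[Editor's note] Fabian's hypothesis above, that the recursive traversal does not check whether a subplan was already traversed, can be illustrated with a small standalone sketch. This is not the Flink optimizer code, just a toy DAG model with shared inputs: a naive sink-to-source walk visits a shared subplan once per consumer, which is exponential in the number of diamond-shaped stages, while tracking visited nodes keeps the walk linear.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class PlanTraversalDemo {

    // A toy plan node: several consumers may share the same input subplan.
    static class Node {
        final List<Node> inputs = new ArrayList<>();
    }

    // Naive sink-to-source walk: shared subplans are visited once per parent.
    static long naiveVisit(Node n) {
        long visits = 1;
        for (Node in : n.inputs) {
            visits += naiveVisit(in);
        }
        return visits;
    }

    // Same walk with a visited set: each node is processed at most once.
    static long memoizedVisit(Node n, Set<Node> seen) {
        if (!seen.add(n)) {
            return 0;
        }
        long visits = 1;
        for (Node in : n.inputs) {
            visits += memoizedVisit(in, seen);
        }
        return visits;
    }

    // Build a chain of diamond-shaped stages, like the branching job above.
    static Node diamondChain(int depth) {
        Node bottom = new Node();
        for (int i = 0; i < depth; i++) {
            Node left = new Node();
            Node right = new Node();
            Node top = new Node();
            left.inputs.add(bottom);
            right.inputs.add(bottom);
            top.inputs.add(left);
            top.inputs.add(right);
            bottom = top;
        }
        return bottom;
    }

    public static void main(String[] args) {
        Node plan = diamondChain(10);
        System.out.println(naiveVisit(plan));                     // 4093 visits (4 * 2^10 - 3)
        System.out.println(memoizedVisit(plan, new HashSet<>())); // 31 distinct nodes (3 * 10 + 1)
    }
}
```

At depth 10 the naive walk already performs 4093 visits against 31 distinct nodes; every extra stage doubles the naive count, which matches the exponential submission times on the chart attached to the issue.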
[jira] [Commented] (FLINK-11074) Improve the harness test to make it possible test with state backend
[ https://issues.apache.org/jira/browse/FLINK-11074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715932#comment-16715932 ] ASF GitHub Bot commented on FLINK-11074: dianfu commented on a change in pull request #7253: [FLINK-11074] [table][tests] Enable harness tests with RocksdbStateBackend and add harness tests for CollectAggFunction URL: https://github.com/apache/flink/pull/7253#discussion_r240445274 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AggFunctionHarnessTest.scala ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.harness + +import java.lang.{Integer => JInt} +import java.util.concurrent.ConcurrentLinkedQueue + +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.scala._ +import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.dataview.{DataView, MapView} +import org.apache.flink.table.dataview.StateMapView +import org.apache.flink.table.runtime.aggregate.{GeneratedAggregations, GroupAggProcessFunction} +import org.apache.flink.table.runtime.harness.HarnessTestBase.TestStreamQueryConfig +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.junit.Assert.assertTrue +import org.junit.Test + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +class AggFunctionHarnessTest extends HarnessTestBase { + private val queryConfig = new TestStreamQueryConfig(Time.seconds(0), Time.seconds(0)) + + @Test + def testCollectAggregate(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) + +val data = new mutable.MutableList[(JInt, String)] +val t = env.fromCollection(data).toTable(tEnv, 'a, 'b) +tEnv.registerTable("T", t) +val sqlQuery = tEnv.sqlQuery( + s""" + |SELECT + | b, collect(a) + |FROM ( + | SELECT a, b + | FROM T + | GROUP BY a, b + |) GROUP BY b + |""".stripMargin) + +val testHarness = createHarnessTester[String, CRow, CRow]( + sqlQuery.toRetractStream[Row](queryConfig), "groupBy") + +testHarness.setStateBackend(getStateBackend) +testHarness.open() + +val operator = getOperator(testHarness) +val state = 
getState(operator, "acc0_map_dataview").asInstanceOf[MapView[JInt, JInt]] +assertTrue(state.isInstanceOf[StateMapView[_, _]]) + assertTrue(operator.getKeyedStateBackend.isInstanceOf[RocksDBKeyedStateBackend[_]]) + +val expectedOutput = new ConcurrentLinkedQueue[Object]() + +testHarness.processElement(new StreamRecord(CRow(1: JInt, "aaa"), 1)) +expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 1).asJava), 1)) + +testHarness.processElement(new StreamRecord(CRow(1: JInt, "bbb"), 1)) +expectedOutput.add(new StreamRecord(CRow("bbb", Map(1 -> 1).asJava), 1)) + +testHarness.processElement(new StreamRecord(CRow(1: JInt, "aaa"), 1)) +expectedOutput.add(new StreamRecord(CRow(false, "aaa", Map(1 -> 1).asJava), 1)) +expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 2).asJava), 1)) + +testHarness.processElement(new StreamRecord(CRow(2: JInt, "aaa"), 1)) +expectedOutput.add(new StreamRecord(CRow(false, "aaa", Map(1 -> 2).asJava), 1)) +expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 2, 2 -> 1).asJava), 1)) + +// remove some state: state may be cleaned up by the state backend if not accessed more than ttl +operator.setCurrentKey(Row.of("aaa")) +state.remove(2) + +// retract after state has been cleaned up +testHarness.processElement(new StreamRecord(CRow(false, 2: JInt, "aaa"), 1)) + +val result =
[jira] [Commented] (FLINK-11074) Improve the harness test to make it possible test with state backend
[ https://issues.apache.org/jira/browse/FLINK-11074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715934#comment-16715934 ] ASF GitHub Bot commented on FLINK-11074: dianfu commented on a change in pull request #7253: [FLINK-11074] [table][tests] Enable harness tests with RocksdbStateBackend and add harness tests for CollectAggFunction URL: https://github.com/apache/flink/pull/7253#discussion_r240445604 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala ## @@ -491,13 +489,75 @@ class HarnessTestBase { distinctCountFuncName, distinctCountAggCode) + def createHarnessTester[KEY, IN, OUT]( + dataStream: DataStream[_], + prefixOperatorName: String) + : KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT] = { + +val transformation = extractExpectedTransformation( + dataStream.javaStream.getTransformation, + prefixOperatorName).asInstanceOf[OneInputTransformation[_, _]] +if (transformation == null) { + throw new Exception("Can not find the expected transformation") +} + +val processOperator = transformation.getOperator.asInstanceOf[OneInputStreamOperator[IN, OUT]] +val keySelector = transformation.getStateKeySelector.asInstanceOf[KeySelector[IN, KEY]] +val keyType = transformation.getStateKeyType.asInstanceOf[TypeInformation[KEY]] + +createHarnessTester(processOperator, keySelector, keyType) + .asInstanceOf[KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT]] + } + + private def extractExpectedTransformation( + transformation: StreamTransformation[_], + prefixOperatorName: String): StreamTransformation[_] = { +def extractFromInputs(inputs: StreamTransformation[_]*): StreamTransformation[_] = { + for (input <- inputs) { +val t = extractExpectedTransformation(input, prefixOperatorName) +if (t != null) { + return t +} + } + null +} + +transformation match { + case one: OneInputTransformation[_, _] => +if (one.getName.startsWith(prefixOperatorName)) { + one +} else { + 
extractExpectedTransformation(one.getInput, prefixOperatorName) +} + case two: TwoInputTransformation[_, _, _] => Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve the harness test to make it possible test with state backend > > > Key: FLINK-11074 > URL: https://issues.apache.org/jira/browse/FLINK-11074 > Project: Flink > Issue Type: Test > Components: Table API SQL >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > > Currently, the harness test can only test without state backend. If you use a > DataView in the accumulator of the aggregate function, the DataView is a java > object and held in heap, not replaced with StateMapView/StateListView which > values are actually held in the state backend. We should improve the harness > test to make it possible to test with state backend. Otherwise, issues such > as FLINK-10674 could have never been found. With this harness test available, > we could test the built-in aggregate functions which use the DataView more > fine grained. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11074) Improve the harness test to make it possible test with state backend
[ https://issues.apache.org/jira/browse/FLINK-11074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715933#comment-16715933 ] ASF GitHub Bot commented on FLINK-11074: dianfu commented on a change in pull request #7253: [FLINK-11074] [table][tests] Enable harness tests with RocksdbStateBackend and add harness tests for CollectAggFunction URL: https://github.com/apache/flink/pull/7253#discussion_r240445308 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala ## @@ -87,7 +85,7 @@ class HarnessTestBase { new RowTypeInfo(distinctCountAggregates.map(getAccumulatorTypeOfAggregateFunction(_)): _*) protected val distinctCountDescriptor: String = EncodingUtils.encodeObjectToString( -new MapStateDescriptor("distinctAgg0", distinctCountAggregationStateType, Types.LONG)) Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve the harness test to make it possible test with state backend > > > Key: FLINK-11074 > URL: https://issues.apache.org/jira/browse/FLINK-11074 > Project: Flink > Issue Type: Test > Components: Table API SQL >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > > Currently, the harness test can only test without state backend. If you use a > DataView in the accumulator of the aggregate function, the DataView is a java > object and held in heap, not replaced with StateMapView/StateListView which > values are actually held in the state backend. We should improve the harness > test to make it possible to test with state backend. Otherwise, issues such > as FLINK-10674 could have never been found. 
With this harness test available, > we could test the built-in aggregate functions which use the DataView more > fine grained. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] dianfu commented on a change in pull request #7253: [FLINK-11074] [table][tests] Enable harness tests with RocksdbStateBackend and add harness tests for CollectAggFunction
dianfu commented on a change in pull request #7253: [FLINK-11074] [table][tests] Enable harness tests with RocksdbStateBackend and add harness tests for CollectAggFunction URL: https://github.com/apache/flink/pull/7253#discussion_r240445308 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala ## @@ -87,7 +85,7 @@ class HarnessTestBase { new RowTypeInfo(distinctCountAggregates.map(getAccumulatorTypeOfAggregateFunction(_)): _*) protected val distinctCountDescriptor: String = EncodingUtils.encodeObjectToString( -new MapStateDescriptor("distinctAgg0", distinctCountAggregationStateType, Types.LONG)) Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] dianfu commented on a change in pull request #7253: [FLINK-11074] [table][tests] Enable harness tests with RocksdbStateBackend and add harness tests for CollectAggFunction
dianfu commented on a change in pull request #7253: [FLINK-11074] [table][tests] Enable harness tests with RocksdbStateBackend and add harness tests for CollectAggFunction URL: https://github.com/apache/flink/pull/7253#discussion_r240445274 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AggFunctionHarnessTest.scala ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.harness + +import java.lang.{Integer => JInt} +import java.util.concurrent.ConcurrentLinkedQueue + +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.scala._ +import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.dataview.{DataView, MapView} +import org.apache.flink.table.dataview.StateMapView +import org.apache.flink.table.runtime.aggregate.{GeneratedAggregations, GroupAggProcessFunction} +import org.apache.flink.table.runtime.harness.HarnessTestBase.TestStreamQueryConfig +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.junit.Assert.assertTrue +import org.junit.Test + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +class AggFunctionHarnessTest extends HarnessTestBase { + private val queryConfig = new TestStreamQueryConfig(Time.seconds(0), Time.seconds(0)) + + @Test + def testCollectAggregate(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) + +val data = new mutable.MutableList[(JInt, String)] +val t = env.fromCollection(data).toTable(tEnv, 'a, 'b) +tEnv.registerTable("T", t) +val sqlQuery = tEnv.sqlQuery( + s""" + |SELECT + | b, collect(a) + |FROM ( + | SELECT a, b + | FROM T + | GROUP BY a, b + |) GROUP BY b + |""".stripMargin) + +val testHarness = createHarnessTester[String, CRow, CRow]( + sqlQuery.toRetractStream[Row](queryConfig), "groupBy") + +testHarness.setStateBackend(getStateBackend) +testHarness.open() + +val operator = getOperator(testHarness) +val state = 
getState(operator, "acc0_map_dataview").asInstanceOf[MapView[JInt, JInt]] +assertTrue(state.isInstanceOf[StateMapView[_, _]]) + assertTrue(operator.getKeyedStateBackend.isInstanceOf[RocksDBKeyedStateBackend[_]]) + +val expectedOutput = new ConcurrentLinkedQueue[Object]() + +testHarness.processElement(new StreamRecord(CRow(1: JInt, "aaa"), 1)) +expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 1).asJava), 1)) + +testHarness.processElement(new StreamRecord(CRow(1: JInt, "bbb"), 1)) +expectedOutput.add(new StreamRecord(CRow("bbb", Map(1 -> 1).asJava), 1)) + +testHarness.processElement(new StreamRecord(CRow(1: JInt, "aaa"), 1)) +expectedOutput.add(new StreamRecord(CRow(false, "aaa", Map(1 -> 1).asJava), 1)) +expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 2).asJava), 1)) + +testHarness.processElement(new StreamRecord(CRow(2: JInt, "aaa"), 1)) +expectedOutput.add(new StreamRecord(CRow(false, "aaa", Map(1 -> 2).asJava), 1)) +expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 2, 2 -> 1).asJava), 1)) + +// remove some state: state may be cleaned up by the state backend if not accessed more than ttl +operator.setCurrentKey(Row.of("aaa")) +state.remove(2) + +// retract after state has been cleaned up +testHarness.processElement(new StreamRecord(CRow(false, 2: JInt, "aaa"), 1)) + +val result = testHarness.getOutput + +verify(expectedOutput, result) + +testHarness.close() + } + + private def getState( Review comment: Make sense. Done. This is an automated
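The retraction behavior the test above checks can be modeled without Flink. Below is a minimal sketch, assuming (as the expected output suggests) that `collect(a)` accumulates a multiset as a value-to-count map, with retract messages decrementing the count; this is an illustrative model, not Flink's actual CollectAggFunction.

```java
// Simplified model of the multiset accumulator exercised by the test:
// accumulate increments a value's count, retract decrements it and drops
// the entry when the count reaches zero.
import java.util.HashMap;
import java.util.Map;

public class CollectModel {
    static void accumulate(Map<Integer, Integer> acc, int value) {
        acc.merge(value, 1, Integer::sum);
    }

    static void retract(Map<Integer, Integer> acc, int value) {
        acc.merge(value, -1, Integer::sum);
        if (acc.getOrDefault(value, 0) <= 0) {
            acc.remove(value);
        }
    }

    public static void main(String[] args) {
        Map<Integer, Integer> acc = new HashMap<>();
        accumulate(acc, 1);      // key "aaa" sees value 1
        accumulate(acc, 1);      // again -> {1=2}
        accumulate(acc, 2);      // -> {1=2, 2=1}
        retract(acc, 2);         // retraction removes 2 entirely
        System.out.println(acc); // {1=2}
    }
}
```

This mirrors why the test expects `Map(1 -> 2)` to remain for key "aaa" after value 2 is removed from state.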
[GitHub] dianfu commented on a change in pull request #7253: [FLINK-11074] [table][tests] Enable harness tests with RocksdbStateBackend and add harness tests for CollectAggFunction
dianfu commented on a change in pull request #7253: [FLINK-11074] [table][tests] Enable harness tests with RocksdbStateBackend and add harness tests for CollectAggFunction URL: https://github.com/apache/flink/pull/7253#discussion_r240445604 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala ## @@ -491,13 +489,75 @@ class HarnessTestBase { distinctCountFuncName, distinctCountAggCode) + def createHarnessTester[KEY, IN, OUT]( + dataStream: DataStream[_], + prefixOperatorName: String) + : KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT] = { + +val transformation = extractExpectedTransformation( + dataStream.javaStream.getTransformation, + prefixOperatorName).asInstanceOf[OneInputTransformation[_, _]] +if (transformation == null) { + throw new Exception("Can not find the expected transformation") +} + +val processOperator = transformation.getOperator.asInstanceOf[OneInputStreamOperator[IN, OUT]] +val keySelector = transformation.getStateKeySelector.asInstanceOf[KeySelector[IN, KEY]] +val keyType = transformation.getStateKeyType.asInstanceOf[TypeInformation[KEY]] + +createHarnessTester(processOperator, keySelector, keyType) + .asInstanceOf[KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT]] + } + + private def extractExpectedTransformation( + transformation: StreamTransformation[_], + prefixOperatorName: String): StreamTransformation[_] = { +def extractFromInputs(inputs: StreamTransformation[_]*): StreamTransformation[_] = { + for (input <- inputs) { +val t = extractExpectedTransformation(input, prefixOperatorName) +if (t != null) { + return t +} + } + null +} + +transformation match { + case one: OneInputTransformation[_, _] => +if (one.getName.startsWith(prefixOperatorName)) { + one +} else { + extractExpectedTransformation(one.getInput, prefixOperatorName) +} + case two: TwoInputTransformation[_, _, _] => Review comment: Done This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
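The `createHarnessTester` change above locates an operator by walking the transformation graph upstream until a node's name matches the given prefix. A Flink-free sketch of that recursive search (`Node` is a stand-in for Flink's `StreamTransformation`, introduced here only for illustration):

```java
// Recursive prefix search over a simplified transformation graph:
// return the first node whose name starts with the prefix, searching
// each node's inputs depth-first, or null if nothing matches.
import java.util.Arrays;
import java.util.List;

public class ExtractByPrefix {
    static final class Node {
        final String name;
        final List<Node> inputs;
        Node(String name, Node... inputs) {
            this.name = name;
            this.inputs = Arrays.asList(inputs);
        }
    }

    static Node extract(Node t, String prefix) {
        if (t.name.startsWith(prefix)) {
            return t;
        }
        for (Node input : t.inputs) {
            Node found = extract(input, prefix);
            if (found != null) {
                return found;
            }
        }
        return null; // mirrors the "Can not find the expected transformation" case
    }

    public static void main(String[] args) {
        Node source = new Node("source: collection");
        Node groupBy = new Node("groupBy: (b), select: (b, COLLECT(a))", source);
        Node sink = new Node("to retract stream", groupBy);
        System.out.println(extract(sink, "groupBy").name);
    }
}
```

The real helper additionally distinguishes one-input and two-input transformations, but the traversal shape is the same.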
[jira] [Commented] (FLINK-11089) Log filecache directory removed messages
[ https://issues.apache.org/jira/browse/FLINK-11089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715910#comment-16715910 ] ASF GitHub Bot commented on FLINK-11089: liu-zhaokun commented on issue #7257: [FLINK-11089]Log filecache directory removed messages URL: https://github.com/apache/flink/pull/7257#issuecomment-446037623 @tillrohrmann Hi,are you available to help me to review this PR? Thanks! Look forwards to your reply. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Log filecache directory removed messages > - > > Key: FLINK-11089 > URL: https://issues.apache.org/jira/browse/FLINK-11089 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Affects Versions: 1.7.0 >Reporter: liuzhaokun >Priority: Minor > Labels: pull-request-available > > When taskmanager exit or shutdown,the filecache directory named > "flink-dist-cache*" will be removed,but there is not any log about this > action.So I think we should log it for user to check it easy when there are > some bugs. > You can see IOManager.java logs the removed messages when taskmanager > shutdown, filecache can do the same things. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] liu-zhaokun commented on issue #7257: [FLINK-11089]Log filecache directory removed messages
liu-zhaokun commented on issue #7257: [FLINK-11089]Log filecache directory removed messages URL: https://github.com/apache/flink/pull/7257#issuecomment-446037623 @tillrohrmann Hi, are you available to help review this PR? Thanks! Looking forward to your reply. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11070) Add stream-stream non-window cross join
[ https://issues.apache.org/jira/browse/FLINK-11070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715884#comment-16715884 ] Hequn Cheng commented on FLINK-11070: - [~fhueske] Hi, thanks for the feedback and sharing the thoughts. As for the example, I think I should add more details. * Table2 and table3 are very small tables, such as a table with only one row. And table1 is a very big table, such as with 1M rows. * All these rows contain the same join keys. So the cost for cross join and non-cross join would be 1*1*1M(cross join) VS 1*1M*1M(non-cross join). This means the query may be executed as a cross join with much better performance. Adding a switch is a good idea. This forces our users to pay more attention to the performance of cross join. But it may also bring some inconvenience. We can't remove the switch even if Flink supports join reordering, because there is a chance the cardinality estimates have not been passed by the user. So once we can't get the cardinality estimates, the user has to configure the switch to enable a cross join if they do want to use a cross join. From this point of view, I think we should not have the switch. I would propose that: * Don't enable join reordering in general because reordering without cardinality estimates is gambling * Trust the query written by the user as we don't have cardinality estimates. And we don't need to add a switch that brings inconvenience to the user. What do you think? To get better performance, I think making the join parallel is a good idea. I will investigate it. Thanks a lot for your suggestions. Best, Hequn > Add stream-stream non-window cross join > --- > > Key: FLINK-11070 > URL: https://issues.apache.org/jira/browse/FLINK-11070 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > > Currently, we don't reorder join and rely on the order provided by the user. 
> This is fine for most of the cases, however, it limits the set of supported > SQL queries. > Example: > {code:java} > val streamUtil: StreamTableTestUtil = streamTestUtil() > streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, > 'proctime.proctime) > streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, > 'proctime.proctime) > streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime, > 'proctime.proctime) > val sqlQuery = > """ > |SELECT t1.a, t3.b > |FROM MyTable3 t3, MyTable2 t2, MyTable t1 > |WHERE t1.a = t3.a AND t1.a = t2.a > |""".stripMargin > streamUtil.printSql(sqlQuery) > {code} > Given the current rule sets, this query produces a cross join which is not > supported and thus leads to: > {code:java} > org.apache.flink.table.api.TableException: Cannot generate a valid execution > plan for the given query: > LogicalProject(a=[$8], b=[$1]) > LogicalFilter(condition=[AND(=($8, $0), =($8, $4))]) > LogicalJoin(condition=[true], joinType=[inner]) > LogicalJoin(condition=[true], joinType=[inner]) > LogicalTableScan(table=[[_DataStreamTable_2]]) > LogicalTableScan(table=[[_DataStreamTable_1]]) > LogicalTableScan(table=[[_DataStreamTable_0]]) > This exception indicates that the query uses an unsupported SQL feature. > Please check the documentation for the set of currently supported SQL > features. > {code} > In order to support more queries, it would be nice to have cross join on > streaming. We can start from a simple version, for example, call > forceNonParallel() for connectOperator in `DataStreamJoin` when it is a cross > join. The performance may be bad. But it works fine if the two tables of > cross join are small ones. > We can do some optimizations later, such as broadcasting the smaller side, > etc. > Any suggestions are greatly appreciated. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-11070) Add stream-stream non-window cross join
[ https://issues.apache.org/jira/browse/FLINK-11070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715884#comment-16715884 ] Hequn Cheng edited comment on FLINK-11070 at 12/11/18 1:06 AM: --- [~fhueske] Hi, thanks for the feedback and sharing the thoughts. As for the example, I think I should add more details. * Table2 and table3 are very small tables, such as a table with only one row. And table1 is a very big table, such as with 1M rows. * All these rows contain the same join keys. So the cost for cross join and non-cross join would be 1*1+1*1M(cross join) VS 1*1M+1M*1(non-cross join). This means the query may be executed as a cross join with better performance. Adding a switch is a good idea. This forces our users to pay more attention to the performance of cross join. But it may also bring some inconvenience. We can't remove the switch even if Flink supports join reordering, because there is a chance the cardinality estimates have not been passed by the user. So once we can't get the cardinality estimates, the user has to configure the switch to enable a cross join if they do want to use a cross join. From this point of view, I think we should not have the switch. I would propose that: * Don't enable join reordering in general because reordering without cardinality estimates is gambling * Trust the query written by the user as we don't have cardinality estimates. And we don't need to add a switch that brings inconvenience to the user. What do you think? To get better performance, I think making the join parallel is a good idea. I will investigate it. Thanks a lot for your suggestions. Best, Hequn was (Author: hequn8128): [~fhueske] Hi, thanks for the feedback and sharing the thoughts. As for the example, I think I should add more details. * Table2 and table3 are very small tables, such as a table with only one row. And table1 is a very big table, such as with 1M rows. * All these rows contain same join keys. 
So the cost for cross join and non-cross join would be 1*1*1M(cross join) VS 1*1M*1M(non-cross join). This means the query may be executed with a cross join with a much better performance. Adding a switch is a good idea. This force our users to pay more attention to the performance of cross join. But it may also bring some inconvenience. We can't remove the switch even Flink support join reordering. Because there is a chance the cardinality estimates have not been passed by the user. So once we can't get the cardinality estimates, the user has to configure the switch to enable a cross join if he does want to use cross join. From this point of view, I think we should not have the switch. I would propose that: * Don't enable join reordering in general because reordering without cardinality estimates is gambling * Trust the query written by the user as we don't have cardinality estimates. And we don't need to add a switch to bring inconvenience to the user. What do you think? To get a better performance, I think making the join parallel is a good idea. I will take an investigate on it. Thanks a lot for your suggestions. Best, Hequn > Add stream-stream non-window cross join > --- > > Key: FLINK-11070 > URL: https://issues.apache.org/jira/browse/FLINK-11070 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > > Currently, we don't reorder join and rely on the order provided by the user. > This is fine for most of the cases, however, it limits the set of supported > SQL queries. 
> Example: > {code:java} > val streamUtil: StreamTableTestUtil = streamTestUtil() > streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, > 'proctime.proctime) > streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, > 'proctime.proctime) > streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime, > 'proctime.proctime) > val sqlQuery = > """ > |SELECT t1.a, t3.b > |FROM MyTable3 t3, MyTable2 t2, MyTable t1 > |WHERE t1.a = t3.a AND t1.a = t2.a > |""".stripMargin > streamUtil.printSql(sqlQuery) > {code} > Given the current rule sets, this query produces a cross join which is not > supported and thus leads to: > {code:java} > org.apache.flink.table.api.TableException: Cannot generate a valid execution > plan for the given query: > LogicalProject(a=[$8], b=[$1]) > LogicalFilter(condition=[AND(=($8, $0), =($8, $4))]) > LogicalJoin(condition=[true], joinType=[inner]) > LogicalJoin(condition=[true], joinType=[inner]) > LogicalTableScan(table=[[_DataStreamTable_2]]) > LogicalTableScan(table=[[_DataStreamTable_1]]) >
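The cost comparison from the edited comment above (1*1+1*1M for the cross-join order vs 1*1M+1M*1 otherwise) can be checked with simple arithmetic, under the stated assumptions: |t2| = |t3| = 1 row, |t1| = 1M rows, every row shares one join key, and each join step costs roughly the size of its output. This is a back-of-envelope sketch, not a real optimizer cost model.

```java
// Back-of-envelope comparison of the two join orders from the discussion.
public class JoinOrderCost {
    // (t3 x t2) first: 1*1 intermediate rows, then joined with t1.
    static long crossJoinFirstCost(long t1, long t2, long t3) {
        return t3 * t2 + (t3 * t2) * t1;
    }

    // (t2 join t1) first: 1*1M intermediate rows, then joined with t3.
    static long noCrossJoinCost(long t1, long t2, long t3) {
        return t2 * t1 + (t2 * t1) * t3;
    }

    public static void main(String[] args) {
        long t1 = 1_000_000L, t2 = 1L, t3 = 1L;
        System.out.println(crossJoinFirstCost(t1, t2, t3)); // 1000001
        System.out.println(noCrossJoinCost(t1, t2, t3));    // 2000000
    }
}
```

With these cardinalities, starting with the cross join of the two tiny tables roughly halves the total output produced across join steps, which is the point the comment makes.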
[jira] [Updated] (FLINK-11123) Improve ml quick start doc
[ https://issues.apache.org/jira/browse/FLINK-11123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11123: --- Labels: pull-request-available (was: ) > Improve ml quick start doc > -- > > Key: FLINK-11123 > URL: https://issues.apache.org/jira/browse/FLINK-11123 > Project: Flink > Issue Type: Improvement > Components: Documentation, Machine Learning Library >Affects Versions: 1.7.0 >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Minor > Labels: pull-request-available > Fix For: 1.7.0, 1.7.1 > > > The user cannot run the sample through the ml quick launch document because > the import description of the class is missing. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11123) Improve ml quick start doc
[ https://issues.apache.org/jira/browse/FLINK-11123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715871#comment-16715871 ] ASF GitHub Bot commented on FLINK-11123: sunjincheng121 opened a new pull request #7269: [FLINK-11123][docs] fix the import of the class is missing in ml quic… URL: https://github.com/apache/flink/pull/7269 ## What is the purpose of the change *This PR fix the import of the class is missing in ml quick start document.* ## Brief change log - *Add `import org.apache.flink.ml.math.Vector` in ml quick start doc.* ## Verifying this change This change is a documentation improvement without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): ( no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: ( no) - The serializers: (no ) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no ) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve ml quick start doc > -- > > Key: FLINK-11123 > URL: https://issues.apache.org/jira/browse/FLINK-11123 > Project: Flink > Issue Type: Improvement > Components: Documentation, Machine Learning Library >Affects Versions: 1.7.0 >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Minor > Labels: pull-request-available > Fix For: 1.7.0, 1.7.1 > > > The user cannot run the sample through the ml quick launch document because > the import description of the class is missing. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] sunjincheng121 opened a new pull request #7269: [FLINK-11123][docs] fix the import of the class is missing in ml quic…
sunjincheng121 opened a new pull request #7269: [FLINK-11123][docs] fix the import of the class is missing in ml quic… URL: https://github.com/apache/flink/pull/7269 ## What is the purpose of the change *This PR fixes the missing class import in the ml quick start document.* ## Brief change log - *Add `import org.apache.flink.ml.math.Vector` in ml quick start doc.* ## Verifying this change This change is a documentation improvement without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-11123) Improve ml quick start doc
sunjincheng created FLINK-11123: --- Summary: Improve ml quick start doc Key: FLINK-11123 URL: https://issues.apache.org/jira/browse/FLINK-11123 Project: Flink Issue Type: Improvement Components: Documentation, Machine Learning Library Affects Versions: 1.7.0 Reporter: sunjincheng Assignee: sunjincheng Fix For: 1.7.1, 1.7.0 The user cannot run the sample in the ml quick start document because the import statement for the class is missing. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] sunjincheng121 commented on issue #7196: [FLINK-10974] [table] Add support for flatMap to table API
sunjincheng121 commented on issue #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#issuecomment-446002606 Thanks for the update! @dianfu LGTM. +1 to merge This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI
[ https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715703#comment-16715703 ] ASF GitHub Bot commented on FLINK-10974: sunjincheng121 commented on issue #7196: [FLINK-10974] [table] Add support for flatMap to table API URL: https://github.com/apache/flink/pull/7196#issuecomment-446002606 Thanks for the update! @dianfu LGTM. +1 to merged This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add FlatMap to TableAPI > --- > > Key: FLINK-10974 > URL: https://issues.apache.org/jira/browse/FLINK-10974 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: sunjincheng >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Add FlatMap operator to Table API as described in [Google > doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]. > The usage: > {code:java} > val res = tab >.flatMap(fun: TableFunction) // output has columns 'a, 'b, 'c >.select('a, 'c) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11039) LogicalExchange and HashPartitioner realization
[ https://issues.apache.org/jira/browse/FLINK-11039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715635#comment-16715635 ] Fabian Hueske commented on FLINK-11039: --- Hi [~ambition], I still don't understand the purpose of this issue. Can you share a query that fails due to missing support of {{LogicalExchange}}? Thank you, Fabian > LogicalExchange and HashPartitioner realization > --- > > Key: FLINK-11039 > URL: https://issues.apache.org/jira/browse/FLINK-11039 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Affects Versions: 1.6.2, 1.7.0 >Reporter: ambition >Priority: Minor > Labels: pull-request-available > Fix For: 1.7.0 > > > Implement the RelTimeIndicatorConverter.visit(exchange: LogicalExchange) > function. > FlinkLogicalExchange implements > org.apache.calcite.rel.logical.LogicalExchange. > HashPartitioner is a Partitioner that implements hash-based partitioning using > Java's `Object.hashCode`. It supports org.apache.calcite.rel.RelDistribution.Type > HASH_DISTRIBUTED -- This message was sent by Atlassian JIRA (v7.6.3#76005)
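The hash-based partitioning described in the issue can be sketched without Flink. This is an illustrative model of the idea (mapping a key's `Object.hashCode` onto a partition index), not Flink's actual HashPartitioner:

```java
// Illustrative hash-based partitioner. The double-modulo keeps the index
// non-negative even for negative hash codes, where Math.abs would fail
// for Integer.MIN_VALUE.
public class SimpleHashPartitioner {
    static int partition(Object key, int numPartitions) {
        return ((key.hashCode() % numPartitions) + numPartitions) % numPartitions;
    }

    public static void main(String[] args) {
        // Deterministic: the same key always lands in the same partition.
        System.out.println(partition("someJoinKey", 4));
    }
}
```

Determinism per key is what makes this usable for the HASH_DISTRIBUTED trait: all rows with equal keys are routed to the same downstream instance.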
[jira] [Commented] (FLINK-10945) Avoid resource deadlocks for finite stream jobs when resources are limited
[ https://issues.apache.org/jira/browse/FLINK-10945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715279#comment-16715279 ] ASF GitHub Bot commented on FLINK-10945: azagrebin commented on a change in pull request #7255: [FLINK-10945] Use InputDependencyConstraint to avoid resource dead… URL: https://github.com/apache/flink/pull/7255#discussion_r240313527 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ## @@ -726,6 +728,56 @@ void sendPartitionInfos() { } } + /** +* Check whether the InputDependencyConstraint is satisfied for this vertex. +* +* @return whether the input constraint is satisfied +*/ + public boolean checkInputDependencyConstraints() { Review comment: This might be a bit more concise:

```java
boolean checkInputDependencyConstraints() {
    if (getExecutionGraph().getInputDependencyConstraint() == InputDependencyConstraint.ANY) {
        // InputDependencyConstraint == ANY
        return jobVertex.getInputs().stream().anyMatch(this::isInputConsumable);
    } else {
        // InputDependencyConstraint == ALL
        return jobVertex.getInputs().stream().allMatch(this::isInputConsumable);
    }
}

boolean isInputConsumable(IntermediateResult result) {
    if (result.getResultType().isPipelined()) {
        // For PIPELINED result, the input is consumable if any result partition has produced records or is finished
        return Arrays.stream(result.getPartitions()).anyMatch(IntermediateResultPartition::hasDataProduced);
    } else {
        // For BLOCKING result, the input is consumable if all the partitions in the result are finished
        return result.areAllPartitionsFinished();
    }
}
```

I am not sure we need to check the `ANY` case at all. Just checking this theoretically changes current behaviour. On the other hand, at the moment, I think it is always true for `ANY` where we check it if `ScheduleMode.LAZY_FROM_SOURCES`. I am also not sure that `ALL` config makes sense together with `ScheduleMode.EAGER`. 
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Avoid resource deadlocks for finite stream jobs when resources are limited > -- > > Key: FLINK-10945 > URL: https://issues.apache.org/jira/browse/FLINK-10945 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.7.1 >Reporter: Zhu Zhu >Assignee: Zhu Zhu >Priority: Major > Labels: pull-request-available > > Currently *resource deadlocks* can happen to finite stream jobs(or batch > jobs) when resources are limited. In 2 cases as below: > # Task Y is a pipelined downstream task of task X. When X takes all > resources(slots), Y cannot acquire slots to start, thus the back pressure > will block X to finish > # Task Y is a upstream task of task X. When X takes all resources(slots) and > Y cannot start, X cannot finish as some of its inputs are not finished > > We can avoid case 1 by setting all edges to be BLOCKING to avoid pipeline > back pressure. However, case 2 cannot be avoided as X(downstream task) will > be launched when any of its input result is ready. > To be detailed, say task X has BLOCKING upstream task Y and Z, X can be > launched when Z finishes, though task Y is not launched yet. This pre-launch > behaviour can be beneficial when there are plenty of resources, thus X can > process data from Z earlier before Y finishes its data processing. However, > resource deadlocks may happen when the resources are limited, e.g. in small > sessions. > > I’d propose introducing a constraint named as *InputDependencyConstraint* to > control the scheduling of vertices. It has 2 values: > # *ANY*. The vertex can be scheduled when any of its inputs is consumable. > # *ALL*. The vertex can be scheduled when all of its inputs are consumable. > > The design doc is here. 
> [https://docs.google.com/document/d/1jpqC7OW_nLOSVOg06_QCWelicVtV6Au0Krg5m_S4kjY/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
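The ANY/ALL check suggested in the review above can be sketched outside Flink. The types below are simplified stand-ins for the runtime classes (an `Input` models an intermediate result by a few counters), so this is an illustration of the logic, not the actual API:

```java
// Simplified model of the ANY/ALL input-dependency check: a vertex is
// schedulable when ANY of its inputs is consumable, or only when ALL are.
import java.util.List;

public class InputConstraintCheck {
    enum InputDependencyConstraint { ANY, ALL }

    static final class Input {
        final boolean pipelined;
        final int partitionsWithData;
        final int finishedPartitions;
        final int totalPartitions;

        Input(boolean pipelined, int partitionsWithData, int finishedPartitions, int totalPartitions) {
            this.pipelined = pipelined;
            this.partitionsWithData = partitionsWithData;
            this.finishedPartitions = finishedPartitions;
            this.totalPartitions = totalPartitions;
        }

        boolean consumable() {
            // PIPELINED: consumable once any partition produced data or finished.
            // BLOCKING: consumable only when all partitions are finished.
            return pipelined
                ? partitionsWithData > 0 || finishedPartitions > 0
                : finishedPartitions == totalPartitions;
        }
    }

    static boolean schedulable(List<Input> inputs, InputDependencyConstraint c) {
        return c == InputDependencyConstraint.ANY
            ? inputs.stream().anyMatch(Input::consumable)
            : inputs.stream().allMatch(Input::consumable);
    }

    public static void main(String[] args) {
        Input done = new Input(false, 0, 2, 2);    // BLOCKING, all partitions finished
        Input pending = new Input(false, 0, 1, 2); // BLOCKING, one partition left
        System.out.println(schedulable(List.of(done, pending), InputDependencyConstraint.ANY)); // true
        System.out.println(schedulable(List.of(done, pending), InputDependencyConstraint.ALL)); // false
    }
}
```

Under ALL, a downstream vertex waits for every input, which is exactly what prevents the deadlock scenario in the issue when slots are scarce.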
[jira] [Commented] (FLINK-11074) Improve the harness test to make it possible test with state backend
[ https://issues.apache.org/jira/browse/FLINK-11074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715298#comment-16715298 ] ASF GitHub Bot commented on FLINK-11074: walterddr commented on a change in pull request #7253: [FLINK-11074] [table][tests] Enable harness tests with RocksdbStateBackend and add harness tests for CollectAggFunction URL: https://github.com/apache/flink/pull/7253#discussion_r240322027 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AggFunctionHarnessTest.scala ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.harness + +import java.lang.{Integer => JInt} +import java.util.concurrent.ConcurrentLinkedQueue + +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.scala._ +import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.dataview.{DataView, MapView} +import org.apache.flink.table.dataview.StateMapView +import org.apache.flink.table.runtime.aggregate.{GeneratedAggregations, GroupAggProcessFunction} +import org.apache.flink.table.runtime.harness.HarnessTestBase.TestStreamQueryConfig +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.junit.Assert.assertTrue +import org.junit.Test + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +class AggFunctionHarnessTest extends HarnessTestBase { + private val queryConfig = new TestStreamQueryConfig(Time.seconds(0), Time.seconds(0)) + + @Test + def testCollectAggregate(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) + +val data = new mutable.MutableList[(JInt, String)] +val t = env.fromCollection(data).toTable(tEnv, 'a, 'b) +tEnv.registerTable("T", t) +val sqlQuery = tEnv.sqlQuery( + s""" + |SELECT + | b, collect(a) + |FROM ( + | SELECT a, b + | FROM T + | GROUP BY a, b + |) GROUP BY b + |""".stripMargin) + +val testHarness = createHarnessTester[String, CRow, CRow]( + sqlQuery.toRetractStream[Row](queryConfig), "groupBy") + +testHarness.setStateBackend(getStateBackend) +testHarness.open() + +val operator = getOperator(testHarness) +val state = 
getState(operator, "acc0_map_dataview").asInstanceOf[MapView[JInt, JInt]] +assertTrue(state.isInstanceOf[StateMapView[_, _]]) + assertTrue(operator.getKeyedStateBackend.isInstanceOf[RocksDBKeyedStateBackend[_]]) + +val expectedOutput = new ConcurrentLinkedQueue[Object]() + +testHarness.processElement(new StreamRecord(CRow(1: JInt, "aaa"), 1)) +expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 1).asJava), 1)) + +testHarness.processElement(new StreamRecord(CRow(1: JInt, "bbb"), 1)) +expectedOutput.add(new StreamRecord(CRow("bbb", Map(1 -> 1).asJava), 1)) + +testHarness.processElement(new StreamRecord(CRow(1: JInt, "aaa"), 1)) +expectedOutput.add(new StreamRecord(CRow(false, "aaa", Map(1 -> 1).asJava), 1)) +expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 2).asJava), 1)) + +testHarness.processElement(new StreamRecord(CRow(2: JInt, "aaa"), 1)) +expectedOutput.add(new StreamRecord(CRow(false, "aaa", Map(1 -> 2).asJava), 1)) +expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 2, 2 -> 1).asJava), 1)) + +// remove some state: state may be cleaned up by the state backend if not accessed more than ttl +operator.setCurrentKey(Row.of("aaa")) +state.remove(2) + +// retract after state has been cleaned up +testHarness.processElement(new StreamRecord(CRow(false, 2: JInt, "aaa"), 1)) + +val result =
[GitHub] walterddr commented on a change in pull request #7253: [FLINK-11074] [table][tests] Enable harness tests with RocksdbStateBackend and add harness tests for CollectAggFunction
walterddr commented on a change in pull request #7253: [FLINK-11074] [table][tests] Enable harness tests with RocksdbStateBackend and add harness tests for CollectAggFunction URL: https://github.com/apache/flink/pull/7253#discussion_r240321103

## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala ##

@@ -491,13 +489,75 @@ class HarnessTestBase {
     distinctCountFuncName,
     distinctCountAggCode)

+  def createHarnessTester[KEY, IN, OUT](
+      dataStream: DataStream[_],
+      prefixOperatorName: String)
+    : KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT] = {
+
+    val transformation = extractExpectedTransformation(
+      dataStream.javaStream.getTransformation,
+      prefixOperatorName).asInstanceOf[OneInputTransformation[_, _]]
+    if (transformation == null) {
+      throw new Exception("Can not find the expected transformation")
+    }
+
+    val processOperator = transformation.getOperator.asInstanceOf[OneInputStreamOperator[IN, OUT]]
+    val keySelector = transformation.getStateKeySelector.asInstanceOf[KeySelector[IN, KEY]]
+    val keyType = transformation.getStateKeyType.asInstanceOf[TypeInformation[KEY]]
+
+    createHarnessTester(processOperator, keySelector, keyType)
+      .asInstanceOf[KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT]]
+  }
+
+  private def extractExpectedTransformation(
+      transformation: StreamTransformation[_],
+      prefixOperatorName: String): StreamTransformation[_] = {
+    def extractFromInputs(inputs: StreamTransformation[_]*): StreamTransformation[_] = {
+      for (input <- inputs) {
+        val t = extractExpectedTransformation(input, prefixOperatorName)
+        if (t != null) {
+          return t
+        }
+      }
+      null
+    }
+
+    transformation match {
+      case one: OneInputTransformation[_, _] =>
+        if (one.getName.startsWith(prefixOperatorName)) {
+          one
+        } else {
+          extractExpectedTransformation(one.getInput, prefixOperatorName)
+        }
+      case two: TwoInputTransformation[_, _, _] =>

Review comment: Let's throw an UnsupportedOperationException for now, since there is no code path that executes a two-input transformation yet. We can always add this logic later when necessary.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
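For context, the lookup under review walks the transformation graph searching for the first operator whose name starts with a given prefix, and the review suggests rejecting two-input transformations outright for now. A standalone sketch of that search logic (not Flink code: the `Transformation` hierarchy below is a simplified stand-in, and all node names are made up for illustration):

```scala
// Simplified model of a transformation graph and the prefix-based search
// that extractExpectedTransformation performs, including the review's
// suggestion to throw UnsupportedOperationException on two-input nodes.
sealed trait Transformation { def name: String }
case class SourceTransformation(name: String) extends Transformation
case class OneInputTransformation(name: String, input: Transformation) extends Transformation
case class TwoInputTransformation(name: String, left: Transformation, right: Transformation)
  extends Transformation

object TransformationSearch {
  // Returns the first node whose name starts with `prefix`, walking up
  // single-input chains; two-input nodes are rejected for now, since the
  // harness has no code path that exercises them yet.
  def find(t: Transformation, prefix: String): Option[Transformation] = t match {
    case one: OneInputTransformation =>
      if (one.name.startsWith(prefix)) Some(one)
      else find(one.input, prefix)
    case _: TwoInputTransformation =>
      throw new UnsupportedOperationException(
        "Two-input transformations are not supported yet")
    case _ => None
  }
}
```

For example, searching a chain `select -> groupBy: (b) -> source` with prefix `"groupBy"` yields the `groupBy: (b)` node, mirroring how the harness locates the keyed aggregation operator by name prefix.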
[jira] [Commented] (FLINK-11074) Improve the harness test to make it possible test with state backend
[ https://issues.apache.org/jira/browse/FLINK-11074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715299#comment-16715299 ] ASF GitHub Bot commented on FLINK-11074: walterddr commented on a change in pull request #7253: [FLINK-11074] [table][tests] Enable harness tests with RocksdbStateBackend and add harness tests for CollectAggFunction URL: https://github.com/apache/flink/pull/7253#discussion_r240321103

> Improve the harness test to make it possible test with state backend
>
> Key: FLINK-11074
> URL: https://issues.apache.org/jira/browse/FLINK-11074
> Project: Flink
> Issue Type: Test
> Components: Table API SQL
> Reporter: Dian Fu
> Assignee: Dian Fu
> Priority: Major
> Labels: pull-request-available
>
> Currently, the harness test can only run without a state backend. If you use a DataView in the accumulator of an aggregate function, the DataView is a plain Java object held on the heap, not replaced with a StateMapView/StateListView whose values are actually held in the state backend. We should improve the harness test to make it possible to test with a state backend; otherwise, issues such as FLINK-10674 could never have been found. With such a harness test available, we could test the built-in aggregate functions which use the DataView in a more fine-grained way.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
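The heap-versus-backend distinction the issue describes can be sketched in plain Scala. The trait and classes below are simplified stand-ins invented for illustration, not Flink's actual `MapView`/`StateMapView`; the point is that unless the runtime substitutes a state-backed view, the accumulator's view is an ordinary heap object and a harness test never touches the backend at all:

```scala
import scala.collection.mutable

// Simplified stand-in for a map-shaped DataView.
trait SimpleMapView[K, V] {
  def get(key: K): Option[V]
  def put(key: K, value: V): Unit
}

// What a plain DataView amounts to: values live on the JVM heap.
class HeapMapView[K, V] extends SimpleMapView[K, V] {
  private val data = mutable.Map.empty[K, V]
  override def get(key: K): Option[V] = data.get(key)
  override def put(key: K, value: V): Unit = data(key) = value
}

// What the substituted view amounts to: reads and writes are delegated to a
// backend store (a mutable.Map here, standing in for e.g. RocksDB state).
class BackendMapView[K, V](backend: mutable.Map[K, V]) extends SimpleMapView[K, V] {
  override def get(key: K): Option[V] = backend.get(key)
  override def put(key: K, value: V): Unit = backend(key) = value
}
```

A test exercising only `HeapMapView` can never surface a backend serialization or TTL bug, which is exactly why the PR wires the harness up to a real state backend.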
[GitHub] walterddr commented on a change in pull request #7253: [FLINK-11074] [table][tests] Enable harness tests with RocksdbStateBackend and add harness tests for CollectAggFunction
walterddr commented on a change in pull request #7253: [FLINK-11074] [table][tests] Enable harness tests with RocksdbStateBackend and add harness tests for CollectAggFunction URL: https://github.com/apache/flink/pull/7253#discussion_r240322027

## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AggFunctionHarnessTest.scala ##

@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Integer => JInt}
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.scala._
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.dataview.{DataView, MapView}
+import org.apache.flink.table.dataview.StateMapView
+import org.apache.flink.table.runtime.aggregate.{GeneratedAggregations, GroupAggProcessFunction}
+import org.apache.flink.table.runtime.harness.HarnessTestBase.TestStreamQueryConfig
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.junit.Assert.assertTrue
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+class AggFunctionHarnessTest extends HarnessTestBase {
+  private val queryConfig = new TestStreamQueryConfig(Time.seconds(0), Time.seconds(0))
+
+  @Test
+  def testCollectAggregate(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val data = new mutable.MutableList[(JInt, String)]
+    val t = env.fromCollection(data).toTable(tEnv, 'a, 'b)
+    tEnv.registerTable("T", t)
+    val sqlQuery = tEnv.sqlQuery(
+      s"""
+         |SELECT
+         |  b, collect(a)
+         |FROM (
+         |  SELECT a, b
+         |  FROM T
+         |  GROUP BY a, b
+         |) GROUP BY b
+         |""".stripMargin)
+
+    val testHarness = createHarnessTester[String, CRow, CRow](
+      sqlQuery.toRetractStream[Row](queryConfig), "groupBy")
+
+    testHarness.setStateBackend(getStateBackend)
+    testHarness.open()
+
+    val operator = getOperator(testHarness)
+    val state = getState(operator, "acc0_map_dataview").asInstanceOf[MapView[JInt, JInt]]
+    assertTrue(state.isInstanceOf[StateMapView[_, _]])
+    assertTrue(operator.getKeyedStateBackend.isInstanceOf[RocksDBKeyedStateBackend[_]])
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    testHarness.processElement(new StreamRecord(CRow(1: JInt, "aaa"), 1))
+    expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 1).asJava), 1))
+
+    testHarness.processElement(new StreamRecord(CRow(1: JInt, "bbb"), 1))
+    expectedOutput.add(new StreamRecord(CRow("bbb", Map(1 -> 1).asJava), 1))
+
+    testHarness.processElement(new StreamRecord(CRow(1: JInt, "aaa"), 1))
+    expectedOutput.add(new StreamRecord(CRow(false, "aaa", Map(1 -> 1).asJava), 1))
+    expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 2).asJava), 1))
+
+    testHarness.processElement(new StreamRecord(CRow(2: JInt, "aaa"), 1))
+    expectedOutput.add(new StreamRecord(CRow(false, "aaa", Map(1 -> 2).asJava), 1))
+    expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 2, 2 -> 1).asJava), 1))
+
+    // remove some state: state may be cleaned up by the state backend if not
+    // accessed for longer than the ttl
+    operator.setCurrentKey(Row.of("aaa"))
+    state.remove(2)
+
+    // retract after state has been cleaned up
+    testHarness.processElement(new StreamRecord(CRow(false, 2: JInt, "aaa"), 1))
+
+    val result = testHarness.getOutput
+
+    verify(expectedOutput, result)
+
+    testHarness.close()
+  }
+
+  private def getState(

Review comment: This can probably be put into `HarnessTestBase` as well. As of now I can only imagine the Operator to
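The review comment refers to the `getState` helper whose body is cut off in this excerpt. A helper of that shape would presumably fetch a named DataView field from the operator's user function via reflection; a minimal, assumed sketch of such a reflective field lookup follows (this is not the PR's actual implementation, and the `Holder` class in the usage note is invented for illustration):

```scala
// Hedged sketch: a reflective lookup of the kind a getState helper in
// HarnessTestBase might use to reach a private field such as the generated
// accumulator's "acc0_map_dataview". Assumed, not taken from the PR.
object ReflectiveLookup {
  def getField(target: AnyRef, fieldName: String): AnyRef = {
    val field = target.getClass.getDeclaredField(fieldName)
    field.setAccessible(true) // generated/private fields are not public
    field.get(target)
  }
}
```

Given any instance that declares a field, e.g. `class Holder { val secret = "hello" }`, the call `ReflectiveLookup.getField(new Holder, "secret")` returns its value; the same pattern would let a shared base class pull a DataView out of generated aggregation code by name.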