[GitHub] kisimple commented on a change in pull request #7267: [FLINK-11083][Table] CRowSerializerConfigSnapshot is not instantiable

2018-12-10 Thread GitBox
kisimple commented on a change in pull request #7267: [FLINK-11083][Table] 
CRowSerializerConfigSnapshot is not instantiable
URL: https://github.com/apache/flink/pull/7267#discussion_r240500235
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowSerializerTest.scala
 ##
 @@ -29,6 +41,72 @@ class CRowSerializerTest extends TestLogger {
   @Test
   def testDefaultConstructor(): Unit = {
 new CRowSerializer.CRowSerializerConfigSnapshot()
+
+val serializerConfigSnapshotClass = Class.forName(
+  
"org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot")
 
 Review comment:
   Thanks for the review. Have updated the PR to address your comments.




[jira] [Commented] (FLINK-10359) Scala example in DataSet docs is broken

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716378#comment-16716378
 ] 

ASF GitHub Bot commented on FLINK-10359:


tzulitai commented on issue #7266: [FLINK-10359] Scala example in DataSet docs 
is broken
URL: https://github.com/apache/flink/pull/7266#issuecomment-446095615
 
 
   Thanks for the clarification. Merging this ..




> Scala example in DataSet docs is broken
> ---
>
> Key: FLINK-10359
> URL: https://issues.apache.org/jira/browse/FLINK-10359
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Fabian Hueske
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The Scala example of 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/dataset_transformations.html#combinable-groupreducefunctions]
>  is broken.
> The {{asScala}} and the {{reduce}} call fetch the Java {{Iterator}}, which may 
> only be fetched once.
> {quote}The Iterable can be iterated over only once. Only the first call to 
> 'iterator()' will succeed.{quote}
> While we are on it, it would make sense to check the other examples as well.
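To make the failure mode concrete, here is a minimal sketch (the function names are hypothetical, not taken from the docs). Each use of the wrapped Iterable below fetches the group's Java Iterator again, and only the first fetch succeeds:

{code:scala}
import scala.collection.JavaConverters._

// Broken: in.asScala only wraps the Iterable; `head` fetches in.iterator()
// once, and `map` fetches it a second time, which fails on a combinable
// GroupReduceFunction ("Only the first call to 'iterator()' will succeed.").
def brokenReduce(in: java.lang.Iterable[(String, Int)]): (String, Int) = {
  val key = in.asScala.head._1
  val sum = in.asScala.map(_._2).sum // second iterator() call -> fails
  (key, sum)
}

// Safe: materialize the elements with a single iterator() call, then reuse.
def safeReduce(in: java.lang.Iterable[(String, Int)]): (String, Int) = {
  val values = in.asScala.toList
  (values.head._1, values.map(_._2).sum)
}
{code}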





[jira] [Commented] (FLINK-10543) Leverage efficient timer deletion in relational operators

2018-12-10 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716377#comment-16716377
 ] 

sunjincheng commented on FLINK-10543:
-

Revert in release-1.7: d3966332d4fe37fd22f92c758a6a34097ed16ac6

> Leverage efficient timer deletion in relational operators
> -
>
> Key: FLINK-10543
> URL: https://issues.apache.org/jira/browse/FLINK-10543
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Fabian Hueske
>Assignee: Hequn Cheng
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.8.0, 1.7.1
>
>
> FLINK-9423 added support for efficient timer deletions. This feature is 
> available since Flink 1.6 and should be used by the relational operator of 
> SQL and Table API.
> Currently, we use a few workarounds to handle situations when deleting timers 
> would be the better solution.
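For context, a minimal sketch of the timer-deletion API this refers to (a generic KeyedProcessFunction for illustration, not the actual relational operator code; the one-minute cleanup interval and the plain field instead of keyed state are simplifications):

{code:scala}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

class CleanupSketch extends KeyedProcessFunction[String, Long, Long] {
  private var pendingCleanup: Long = 0L // simplification; real code would use keyed state

  override def processElement(
      value: Long,
      ctx: KeyedProcessFunction[String, Long, Long]#Context,
      out: Collector[Long]): Unit = {
    // Instead of the old workaround (let the stale timer fire and re-check
    // whether it is still relevant), the obsolete timer is deleted outright.
    if (pendingCleanup != 0L) {
      ctx.timerService().deleteEventTimeTimer(pendingCleanup)
    }
    pendingCleanup = ctx.timestamp() + 60000L
    ctx.timerService().registerEventTimeTimer(pendingCleanup)
    out.collect(value)
  }
}
{code}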





[jira] [Commented] (FLINK-11029) Incorrect parameter in Working with state doc

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716376#comment-16716376
 ] 

ASF GitHub Bot commented on FLINK-11029:


tzulitai commented on issue #7198: [FLINK-11029][docs] fixed the Incorrect 
parameter in Working with state doc
URL: https://github.com/apache/flink/pull/7198#issuecomment-446094754
 
 
   Thanks, +1, LGTM.




> Incorrect parameter in Working with state doc
> -
>
> Key: FLINK-11029
> URL: https://issues.apache.org/jira/browse/FLINK-11029
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Yangze Guo
>Assignee: Yangze Guo
>Priority: Minor
>  Labels: pull-request-available
>
> In [Working with 
> State|https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state.html#using-managed-keyed-state],
>  the parameter of getAggregatingState() method is incorrect.
> It should be AggregatingStateDescriptor.
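For reference, a small sketch of the corrected call (the state name and the aggregate logic are made up for illustration):

{code:scala}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.AggregatingStateDescriptor
import org.apache.flink.api.common.typeinfo.Types

// A trivial AggregateFunction that sums Long values.
class SumAggregate extends AggregateFunction[java.lang.Long, java.lang.Long, java.lang.Long] {
  override def createAccumulator(): java.lang.Long = 0L
  override def add(value: java.lang.Long, acc: java.lang.Long): java.lang.Long = acc + value
  override def getResult(acc: java.lang.Long): java.lang.Long = acc
  override def merge(a: java.lang.Long, b: java.lang.Long): java.lang.Long = a + b
}

// getAggregatingState takes the descriptor, not an AggregatingState:
val descriptor = new AggregatingStateDescriptor[java.lang.Long, java.lang.Long, java.lang.Long](
  "my-aggregate", // hypothetical state name
  new SumAggregate,
  Types.LONG)     // TypeInformation of the accumulator

// Inside a rich function:
// val state = getRuntimeContext.getAggregatingState(descriptor)
{code}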






[jira] [Commented] (FLINK-10359) Scala example in DataSet docs is broken

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716367#comment-16716367
 ] 

ASF GitHub Bot commented on FLINK-10359:


yanghua commented on issue #7266: [FLINK-10359] Scala example in DataSet docs 
is broken
URL: https://github.com/apache/flink/pull/7266#issuecomment-446094214
 
 
   @tzulitai yes, I searched but it seems there is no issue like this in other 
places.




> Scala example in DataSet docs is broken
> ---
>
> Key: FLINK-10359
> URL: https://issues.apache.org/jira/browse/FLINK-10359
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Fabian Hueske
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The Scala example of 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/dataset_transformations.html#combinable-groupreducefunctions]
>  is broken.
> The {{asScala}} and the {{reduce}} call fetch the Java {{Iterator}}, which may 
> only be fetched once.
> {quote}The Iterable can be iterated over only once. Only the first call to 
> 'iterator()' will succeed.{quote}
> While we are on it, it would make sense to check the other examples as well.






[GitHub] tzulitai commented on issue #7219: [hotfix][docs] Correct the parameter type in Operators Overview doc

2018-12-10 Thread GitBox
tzulitai commented on issue #7219: [hotfix][docs] Correct the parameter type in 
Operators Overview doc
URL: https://github.com/apache/flink/pull/7219#issuecomment-446093969
 
 
   Good catch, merging ...




[jira] [Commented] (FLINK-10359) Scala example in DataSet docs is broken

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716361#comment-16716361
 ] 

ASF GitHub Bot commented on FLINK-10359:


tzulitai commented on issue #7266: [FLINK-10359] Scala example in DataSet docs 
is broken
URL: https://github.com/apache/flink/pull/7266#issuecomment-446093688
 
 
   Thanks @yanghua.
   Did you also check other Scala example snippets for the same issue?




> Scala example in DataSet docs is broken
> ---
>
> Key: FLINK-10359
> URL: https://issues.apache.org/jira/browse/FLINK-10359
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Fabian Hueske
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The Scala example of 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/dataset_transformations.html#combinable-groupreducefunctions]
>  is broken.
> The {{asScala}} and the {{reduce}} call fetch the Java {{Iterator}}, which may 
> only be fetched once.
> {quote}The Iterable can be iterated over only once. Only the first call to 
> 'iterator()' will succeed.{quote}
> While we are on it, it would make sense to check the other examples as well.






[jira] [Commented] (FLINK-11083) CRowSerializerConfigSnapshot is not instantiable

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716359#comment-16716359
 ] 

ASF GitHub Bot commented on FLINK-11083:


tzulitai commented on a change in pull request #7267: [FLINK-11083][Table] 
CRowSerializerConfigSnapshot is not instantiable
URL: https://github.com/apache/flink/pull/7267#discussion_r240489572
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowSerializerTest.scala
 ##
 @@ -29,6 +41,72 @@ class CRowSerializerTest extends TestLogger {
   @Test
   def testDefaultConstructor(): Unit = {
 new CRowSerializer.CRowSerializerConfigSnapshot()
+
+val serializerConfigSnapshotClass = Class.forName(
+  
"org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot")
 
 Review comment:
   Can you use `classOf[...]` here?
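   For illustration, a sketch of what the suggestion amounts to (the reflection assertion is hypothetical): `classOf[...]` resolves the class at compile time, so a rename breaks the build instead of surfacing at runtime the way `Class.forName(...)` does.

{code:scala}
import org.apache.flink.table.runtime.types.CRowSerializer

val serializerConfigSnapshotClass =
  classOf[CRowSerializer.CRowSerializerConfigSnapshot]

// The property under test: a public nullary constructor must exist,
// otherwise InstantiationUtil rejects the class on restore.
assert(serializerConfigSnapshotClass.getConstructors.exists(_.getParameterCount == 0))
{code}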




> CRowSerializerConfigSnapshot is not instantiable
> 
>
> Key: FLINK-11083
> URL: https://issues.apache.org/jira/browse/FLINK-11083
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL, Type Serialization System
>Reporter: boshu Zheng
>Assignee: boshu Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0, 1.7.1
>
>
> An exception was encountered when restarting a job with savepoint in our 
> production env,
> {code:java}
> 2018-12-04 20:28:25,091 INFO  10595 org.apache.flink.runtime.taskmanager.Task 
>   :917  - _OurCustomOperator_ -> select: () -> to: Tuple2 -> 
> Sink: Unnamed (3/20) (61c4fa7339bf152157e8e1dd0f8fd97b) switched from RUNNING 
> to FAILED.
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
> state backend for AsyncWaitOperator_90bea66de1c231edf33913ecd54406c1_(3/20) 
> from any of the 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140)
>   ... 5 more
> Caused by: java.lang.RuntimeException: The class 
> 'org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot'
>  is not instantiable: The class has no (implicit) public nullary constructor, 
> i.e. a constructor without arguments.
>   at 
> org.apache.flink.util.InstantiationUtil.checkForInstantiation(InstantiationUtil.java:412)
>   at 
> org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:337)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:433)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211)
>   at 
> org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:435)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255)
>   at 
> 

[GitHub] tzulitai commented on issue #7254: [hotfix][tableApi] fix tEnv in tableApi docs

2018-12-10 Thread GitBox
tzulitai commented on issue #7254: [hotfix][tableApi] fix tEnv in tableApi docs
URL: https://github.com/apache/flink/pull/7254#issuecomment-446092899
 
 
   Please remember, for future contributions, that the commit message should be 
appropriate.
   For example, the title you used for the PR would be better than the current 
"fix tEnv in tableApi docs".
   
   The commit messages should also be tagged.




[GitHub] tzulitai commented on issue #7254: [hotfix][tableApi] fix tEnv in tableApi docs

2018-12-10 Thread GitBox
tzulitai commented on issue #7254: [hotfix][tableApi] fix tEnv in tableApi docs
URL: https://github.com/apache/flink/pull/7254#issuecomment-446092512
 
 
   LGTM, merging.





[GitHub] tzulitai commented on issue #7270: [hotfix][docs] Fix typo in Joining documentation

2018-12-10 Thread GitBox
tzulitai commented on issue #7270: [hotfix][docs] Fix typo in Joining 
documentation
URL: https://github.com/apache/flink/pull/7270#issuecomment-446090988
 
 
   LGTM, merging.




[jira] [Commented] (FLINK-10724) Refactor failure handling in check point coordinator

2018-12-10 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716352#comment-16716352
 ] 

vinoyang commented on FLINK-10724:
--

[~yunta] From the source code point of view, {{abortSubsumed}} treats the current 
checkpoint as a "failure" when it is subsumed (especially when it can be 
subsumed). Whether a subsumed checkpoint should be counted as a failure, 
however, is something I think needs to be discussed.

> Refactor failure handling in check point coordinator
> 
>
> Key: FLINK-10724
> URL: https://issues.apache.org/jira/browse/FLINK-10724
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Andrey Zagrebin
>Assignee: vinoyang
>Priority: Major
>
> At the moment failure handling of asynchronously triggered checkpoint in 
> check point coordinator happens in different places. We could organise it 
> similar way as failure handling of synchronous triggering of checkpoint in 
> *CheckpointTriggerResult* where we classify error cases. This will simplify 
> e.g. integration of error counter for FLINK-4810.
> See also discussion here: [https://github.com/apache/flink/pull/6567]





[jira] [Comment Edited] (FLINK-10964) sql-client throws exception when paging through finished batch query

2018-12-10 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716340#comment-16716340
 ] 

vinoyang edited comment on FLINK-10964 at 12/11/18 6:36 AM:


[~twalthr] when you have time, could you help review the PR for this issue? By 
analyzing the source code, I found that the ResultStore has already been emptied 
at the time of cancelQuery. The main problem behind this exception is that when 
stopping the retrieval thread, it still tries to call cancelQuery regardless of 
the running state of the job.


was (Author: yanghua):
[~twalthr] when you have time, can you help to review the PR of this issue? By 
analyzing the source code, I found that the ResultRestore has been emptied at 
the time of cancelQuery. The main problem with this exception is that when 
stopping the retrieval thread, it still tries to call cancelQuery regardless of 
the running state of the job.

> sql-client throws exception when paging through finished batch query 
> -
>
> Key: FLINK-10964
> URL: https://issues.apache.org/jira/browse/FLINK-10964
> Project: Flink
>  Issue Type: Bug
>  Components: SQL Client
>Reporter: Seth Wiesman
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> When paging through a batch query in state 'Finished' the sql client throws 
> the following exception: 
> {code:java}
> org.apache.flink.table.client.gateway.SqlExecutionException: Could not find a 
> result with result identifier '0c7dce30d287fdd13b934fbefe5a38d1'.{code}
>  
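A minimal sketch of the guard described in the comment above (the class and callbacks are illustrative, not the actual SQL client code):

{code:scala}
sealed trait JobStatus
case object Running extends JobStatus
case object Finished extends JobStatus

// Illustrative stand-in for the retrieval thread's shutdown path.
class RetrievalThreadSketch(jobStatus: () => JobStatus, cancelQuery: () => Unit) {
  def stop(): Unit = {
    // A finished job has already been removed from the ResultStore, so an
    // unconditional cancelQuery() fails with "Could not find a result ...".
    if (jobStatus() == Running) {
      cancelQuery()
    }
  }
}
{code}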






[jira] [Updated] (FLINK-11118) Refactor and unify rowtime timestamp extractor interface

2018-12-10 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang updated FLINK-11118:
-
Issue Type: Task  (was: Sub-task)
Parent: (was: FLINK-11117)

> Refactor and unify rowtime timestamp extractor interface
> 
>
> Key: FLINK-11118
> URL: https://issues.apache.org/jira/browse/FLINK-11118
> Project: Flink
>  Issue Type: Task
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>






[jira] [Closed] (FLINK-11117) Migrate connector classes to flink-table-spi

2018-12-10 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang closed FLINK-11117.

Resolution: Duplicate

> Migrate connector classes to flink-table-spi
> 
>
> Key: FLINK-11117
> URL: https://issues.apache.org/jira/browse/FLINK-11117
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> This issue covers the fifth step of the migration plan mentioned in 
> [FLIP-28|https://cwiki.apache.org/confluence/display/FLINK/FLIP-28%3A+Long-term+goal+of+making+flink-table+Scala-free].
> Once we implemented improvements to the [unified connector 
> interface|https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit#heading=h.41fd6rs7b3cf],
>  we can also migrate the classes. Among other things, this requires a refactoring of 
> the timestamp extractors, which are the biggest blockers because they 
> transitively depend on expressions.





[jira] [Commented] (FLINK-11117) Migrate connector classes to flink-table-spi

2018-12-10 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716333#comment-16716333
 ] 

vinoyang commented on FLINK-11117:
--

[~twalthr] oh, sorry, I will close it.

> Migrate connector classes to flink-table-spi
> 
>
> Key: FLINK-11117
> URL: https://issues.apache.org/jira/browse/FLINK-11117
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> This issue covers the fifth step of the migration plan mentioned in 
> [FLIP-28|https://cwiki.apache.org/confluence/display/FLINK/FLIP-28%3A+Long-term+goal+of+making+flink-table+Scala-free].
> Once we implemented improvements to the [unified connector 
> interface|https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit#heading=h.41fd6rs7b3cf],
>  we can also migrate the classes. Among other things, this requires a refactoring of 
> the timestamp extractors, which are the biggest blockers because they 
> transitively depend on expressions.





[jira] [Commented] (FLINK-11117) Migrate connector classes to flink-table-spi

2018-12-10 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716330#comment-16716330
 ] 

Timo Walther commented on FLINK-11117:
--

[~yanghua] This issue is a duplicate of FLINK-10688.

> Migrate connector classes to flink-table-spi
> 
>
> Key: FLINK-11117
> URL: https://issues.apache.org/jira/browse/FLINK-11117
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> This issue covers the fifth step of the migration plan mentioned in 
> [FLIP-28|https://cwiki.apache.org/confluence/display/FLINK/FLIP-28%3A+Long-term+goal+of+making+flink-table+Scala-free].
> Once we implemented improvements to the [unified connector 
> interface|https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit#heading=h.41fd6rs7b3cf],
>  we can also migrate the classes. Among other things, this requires a refactoring of 
> the timestamp extractors, which are the biggest blockers because they 
> transitively depend on expressions.





[jira] [Commented] (FLINK-10774) connection leak when partition discovery is disabled and open throws exception

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716328#comment-16716328
 ] 

ASF GitHub Bot commented on FLINK-10774:


stevenzwu commented on issue #7020: [FLINK-10774] [Kafka] connection leak when 
partition discovery is disabled an…
URL: https://github.com/apache/flink/pull/7020#issuecomment-446086480
 
 
   @tillrohrmann Thanks a lot for the feedback. I made the changes according 
to your comments. Please take a look and see if I missed anything.




> connection leak when partition discovery is disabled and open throws exception
> --
>
> Key: FLINK-10774
> URL: https://issues.apache.org/jira/browse/FLINK-10774
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.2, 1.5.5, 1.6.2
>Reporter: Steven Zhen Wu
>Assignee: Steven Zhen Wu
>Priority: Major
>  Labels: pull-request-available
>
> Here is the scenario to reproduce the issue
>  * partition discovery is disabled
>  * open method throws an exception (e.g. when broker SSL authorization denies 
> request)
> In this scenario, the run method won't be executed. As a result, 
> _partitionDiscoverer.close()_ won't be called. That causes the connection 
> leak, because the KafkaConsumer is initialized but never closed. It caused an 
> outage that brought down our Kafka cluster when a high-parallelism job got 
> into a restart/failure loop.
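A minimal sketch of the leak and the corresponding guard (the names are illustrative, not the actual FlinkKafkaConsumerBase code):

{code:scala}
class SourceSketch {
  private var partitionDiscoverer: AutoCloseable = _

  // If open() throws (e.g. broker SSL authorization denies the request),
  // run() is never invoked and nothing closes the discoverer's KafkaConsumer.
  def open(): Unit = {
    partitionDiscoverer = createDiscoverer()
    authorize() // may throw
  }

  // Guarded variant: close the discoverer on the failure path instead of
  // relying on run()'s cleanup, which never executes here.
  def openGuarded(): Unit =
    try {
      partitionDiscoverer = createDiscoverer()
      authorize()
    } catch {
      case t: Throwable =>
        if (partitionDiscoverer != null) {
          partitionDiscoverer.close()
        }
        throw t
    }

  private def createDiscoverer(): AutoCloseable =
    new AutoCloseable { override def close(): Unit = () }

  private def authorize(): Unit = ()
}
{code}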





[jira] [Commented] (FLINK-11081) Support binding port range for REST server

2018-12-10 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716331#comment-16716331
 ] 

vinoyang commented on FLINK-11081:
--

[~walterddr] Thank you for recognizing the value of this issue. In fact, its 
implementation emulates the traditional way Flink supports port ranges, such as 
"taskmanager.rpc.port". It allows specifying:
 * a single port (e.g. 8081; 0 means a port randomly picked by the JobManager)
 * a port list (12345, 1234), in which case the ports are tried in order
 * a port range (12345-12346), in which case the ports are also tried in order

The implementation verifies the validity of each port and detects port 
conflicts.
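To make that concrete, a sketch of the kind of parsing this implies (purely illustrative, not the actual Flink option-parsing code):

{code:scala}
def parsePortList(spec: String): Seq[Int] = {
  require(spec.trim.nonEmpty, "port specification must not be empty")
  spec.split(',').toSeq.map(_.trim).flatMap { token =>
    token.split('-') match {
      case Array(single)    => Seq(single.toInt)       // e.g. "8081", or "0" for random
      case Array(low, high) => low.toInt to high.toInt // e.g. "12345-12346"
      case _ => throw new IllegalArgumentException(s"Invalid port token: $token")
    }
  }.map { port =>
    require(port >= 0 && port <= 65535, s"Port out of range: $port")
    port // candidates are tried in this order until one binds without conflict
  }
}

// parsePortList("12345, 1234") => Seq(12345, 1234)
// parsePortList("12345-12346") => Seq(12345, 12346)
{code}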

> Support binding port range for REST server
> --
>
> Key: FLINK-11081
> URL: https://issues.apache.org/jira/browse/FLINK-11081
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.7.0, 1.8.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently the {{RestServerEndpoint}} binds to the port specified by 
> {{RestOptions#PORT}}. {{PORT}} is of type integer. Sometimes, it would be 
> useful to be able to specify not only a single port but a port range to 
> pick a port from. Therefore, I propose to add, similar to 
> {{RestOptions#BIND_ADDRESS}}, another option {{RestOptions#BIND_PORT}} which 
> allows specifying a port range for the {{RestServerEndpoint}} to pick a port 
> from. {{RestOptions#PORT}} would then only be used by the client to connect 
> to the started {{RestServerEndpoint}}.






[jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716320#comment-16716320
 ] 

ASF GitHub Bot commented on FLINK-11010:


lamber-ken commented on a change in pull request #7180: [FLINK-11010] [TABLE] 
Flink SQL timestamp is inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#discussion_r240484482
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
 ##
 @@ -164,4 +167,35 @@ public void testUnion() throws Exception {
 
StreamITCase.compareWithList(expected);
}
+
+	@Test
+	public void testProctime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataStream<Tuple3<Integer, Long, String>> ds = JavaStreamTestData.getSmall3TupleDataSet(env);
+		tableEnv.registerDataStream("MyTable", ds, "a, b, c, proctime.proctime");
+
+		String sqlQuery = "select proctime from MyTable";
+
+		Table result = tableEnv.sqlQuery(sqlQuery);
+
+		tableEnv
+			.toAppendStream(result, TypeInformation.of(Row.class))
+			.addSink(new SinkFunction<Row>() {
+				@Override
+				public void invoke(Row value, Context context) throws Exception {
+
+					Timestamp procTimestamp = (Timestamp) value.getField(0);
+
+					// validate the second here
+					long procSecondTime = procTimestamp.getTime() / 1000;
 
 Review comment:
   yeah, it's better to validate hour. I'll update.




> Flink SQL timestamp is inconsistent with currentProcessingTime()
> 
>
> Key: FLINK-11010
> URL: https://issues.apache.org/jira/browse/FLINK-11010
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.6.2, 1.7.0, 1.8.0, 1.7.1
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
>
> Flink SQL timestamp is inconsistent with currentProcessingTime().
>  
> The processing time is implemented by invoking System.currentTimeMillis(), 
> but the long value is automatically wrapped into a Timestamp with the 
> following statement: 
> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`
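To make the inconsistency concrete, a sketch of the arithmetic (the offset line mirrors the statement quoted above):

{code:scala}
import java.sql.Timestamp
import java.util.TimeZone

val time = System.currentTimeMillis()            // processing time: epoch millis (UTC)
val offset = TimeZone.getDefault.getOffset(time) // e.g. +28800000 ms for UTC+8

// What the SQL layer produces: the epoch value is shifted by the local offset,
// so the resulting Timestamp no longer agrees with currentProcessingTime().
val sqlTimestamp = new Timestamp(time - offset)

assert(sqlTimestamp.getTime != time || offset == 0)
{code}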





[jira] [Commented] (FLINK-9699) Add api to replace registered table

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716286#comment-16716286
 ] 

ASF GitHub Bot commented on FLINK-9699:
---

zjffdu commented on issue #6236: [FLINK-9699] [table] Add api to replace 
registered table
URL: https://github.com/apache/flink/pull/6236#issuecomment-446082836
 
 
   @hequn8128 @yanghua The PR is updated, could you help review it? Thanks




> Add api to replace registered table
> ---
>
> Key: FLINK-9699
> URL: https://issues.apache.org/jira/browse/FLINK-9699
> Project: Flink
>  Issue Type: Improvement
>Reporter: Jeff Zhang
>Assignee: Jeff Zhang
>Priority: Major
>  Labels: pull-request-available
>







[jira] [Commented] (FLINK-11125) Remove useless import

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716317#comment-16716317
 ] 

ASF GitHub Bot commented on FLINK-11125:


hequn8128 opened a new pull request #7272: [FLINK-11125] remove unused import
URL: https://github.com/apache/flink/pull/7272
 
 
   
   ## What is the purpose of the change
   
   This pull request removes some unused imports in tests and flink-table.
   
   
   ## Brief change log
   
 - Remove unused import
   
   
   ## Verifying this change
   
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   




> Remove useless import 
> --
>
> Key: FLINK-11125
> URL: https://issues.apache.org/jira/browse/FLINK-11125
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL, Tests
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>






[jira] [Updated] (FLINK-11125) Remove useless import

2018-12-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11125:
---
Labels: pull-request-available  (was: )

> Remove useless import 
> --
>
> Key: FLINK-11125
> URL: https://issues.apache.org/jira/browse/FLINK-11125
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL, Tests
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>








[jira] [Commented] (FLINK-11060) Unable to set number of task manager and slot per task manager in scala shell local mode

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716285#comment-16716285
 ] 

ASF GitHub Bot commented on FLINK-11060:


zjffdu commented on issue #7229: [FLINK-11060][scala-shell] Unable to set 
number of task manager and slot per task manager in scala shell local mode
URL: https://github.com/apache/flink/pull/7229#issuecomment-446082679
 
 
   @yanghua @tillrohrmann Could you help review it? Thanks




> Unable to set number of task manager and slot per task manager in scala shell 
> local mode
> 
>
> Key: FLINK-11060
> URL: https://issues.apache.org/jira/browse/FLINK-11060
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.7.0
>Reporter: Jeff Zhang
>Assignee: Jeff Zhang
>Priority: Major
>  Labels: pull-request-available
>
> In scala-shell, I cannot change the number of task managers or the number of 
> slots per task manager; they are hard-coded to 1. I cannot specify them in flink-conf.yaml.
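For reference, a sketch of the configuration this would enable (the option keys are real Flink options; the wiring into the shell's local environment is the part this issue addresses):

{code:scala}
import org.apache.flink.configuration.{ConfigConstants, Configuration, TaskManagerOptions}

val config = new Configuration()
// Slots per task manager.
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 4)
// Number of local task managers ("local.number-taskmanager").
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2)
{code}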






[jira] [Commented] (FLINK-11125) Remove useless import

2018-12-10 Thread TisonKun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716274#comment-16716274
 ] 

TisonKun commented on FLINK-11125:
--

[~hequn8128] If the change is not big, it might be handled as a hotfix. 
Otherwise it would be better to make the title more specific, e.g. "Remove 
useless import under XXXClass" or something similar.

> Remove useless import 
> --
>
> Key: FLINK-11125
> URL: https://issues.apache.org/jira/browse/FLINK-11125
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL, Tests
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>






[jira] [Closed] (FLINK-11039) LogicalExchange and HashPartitioner realization

2018-12-10 Thread ambition (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ambition closed FLINK-11039.

Resolution: Fixed

> LogicalExchange and HashPartitioner realization
> ---
>
> Key: FLINK-11039
> URL: https://issues.apache.org/jira/browse/FLINK-11039
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Affects Versions: 1.6.2, 1.7.0
>Reporter: ambition
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Implement the RelTimeIndicatorConverter.visit(exchange: LogicalExchange) 
> function.
> Implement FlinkLogicalExchange based on 
> org.apache.calcite.rel.logical.LogicalExchange.
> HashPartitioner is a Partitioner that implements hash-based partitioning using 
> Java's `Object.hashCode`; it supports org.apache.calcite.rel.RelDistribution.Type 
> HASH_DISTRIBUTED.






[jira] [Commented] (FLINK-11039) LogicalExchange and HashPartitioner realization

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716238#comment-16716238
 ] 

ASF GitHub Bot commented on FLINK-11039:


ambition119 closed pull request #7202: [FLINK-11039]  LogicalExchange and 
HashPartitioner realization
URL: https://github.com/apache/flink/pull/7202
 
 
   




> LogicalExchange and HashPartitioner realization
> ---
>
> Key: FLINK-11039
> URL: https://issues.apache.org/jira/browse/FLINK-11039
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Affects Versions: 1.6.2, 1.7.0
>Reporter: ambition
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Implement the RelTimeIndicatorConverter.visit(exchange: LogicalExchange) 
> function.
> Implement FlinkLogicalExchange based on 
> org.apache.calcite.rel.logical.LogicalExchange.
> HashPartitioner is a Partitioner that implements hash-based partitioning using 
> Java's `Object.hashCode`; it supports org.apache.calcite.rel.RelDistribution.Type 
> HASH_DISTRIBUTED.





[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716215#comment-16716215
 ] 

ASF GitHub Bot commented on FLINK-9083:
---

jparkie commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r240476864
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 ##
 @@ -43,70 +48,82 @@
 */
public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> implements CheckpointedFunction {
	protected final Logger log = LoggerFactory.getLogger(getClass());
-	protected transient Cluster cluster;
-	protected transient Session session;

-	protected transient volatile Throwable exception;
-	protected transient FutureCallback<V> callback;
+	// ------------------------------ Default Configurations ------------------------------
+
+	/**
+	 * The default maximum number of concurrent requests. By default, {@code Integer.MAX_VALUE}.
+	 */
+	public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = Integer.MAX_VALUE;
+
+	/**
+	 * The default timeout duration when acquiring a permit to execute. By default, {@code Long.MAX_VALUE}.
+	 */
+	public static final long DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = Long.MAX_VALUE;
+
+	/**
+	 * The default timeout unit when acquiring a permit to execute. By default, milliseconds.
+	 */
+	public static final TimeUnit DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT = TimeUnit.MILLISECONDS;
+
+	// ------------------------------ Configuration Fields ------------------------------
+
+	private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS;
+	private long maxConcurrentRequestsTimeout = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT;
+	private TimeUnit maxConcurrentRequestsTimeoutUnit = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT;
+
+	// ------------------------------ Cassandra Fields ------------------------------

	private final ClusterBuilder builder;

-	private final AtomicInteger updatesPending = new AtomicInteger();
+	protected transient Cluster cluster;
+	protected transient Session session;
+
+	// ------------------------------ Synchronization Fields ------------------------------
+
+	private AtomicReference<Throwable> throwable;
+	private Semaphore semaphore;
+	private Phaser phaser;

	CassandraSinkBase(ClusterBuilder builder) {
		this.builder = builder;
		ClosureCleaner.clean(builder, true);
	}

-	@Override
-	public void open(Configuration configuration) {
-		this.callback = new FutureCallback<V>() {
-			@Override
-			public void onSuccess(V ignored) {
-				int pending = updatesPending.decrementAndGet();
-				if (pending == 0) {
-					synchronized (updatesPending) {
-						updatesPending.notifyAll();
-					}
-				}
-			}
+	// ------------------------------ Sink Methods ------------------------------

+	@Override
+	public void open(Configuration parameters) {
+		cluster = createCluster();
+		session = createSession();
+
+		throwable = new AtomicReference<>();
+		semaphore = new Semaphore(maxConcurrentRequests);
+		/*
+		 * A Phaser is a flexible and reusable synchronization barrier similar to CyclicBarrier and CountDownLatch.
+		 *
+		 * This Phaser is configured to support "1 + N" parties.
+		 *   - "1" for the CassandraSinkBase to arrive and to await at the Phaser during a flush() call.
+		 *   - "N" for the varying number of invoke() calls that register and de-register with the Phaser.
+		 *
+		 * The Phaser awaits the completion of the advancement of a phase prior to returning from a register() call.
+		 * This behavior ensures that no backlogged invoke() calls register to execute while the Semaphore's permits
+		 * are being released during a flush() call.
+		 */
+		phaser = new Phaser(1) {
 
 Review comment:
   @azagrebin @zentol Okay. Yeah. Then we don't need a barrier. The semaphore 
would be sufficient.
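A sketch of the semaphore-only variant agreed on here (illustrative, not the final CassandraSinkBase code): one permit per in-flight request, released by the completion callback; flush() drains all permits, which can only succeed once no request is in flight.

{code:scala}
import java.util.concurrent.Semaphore

class BoundedAsyncSinkSketch(maxConcurrentRequests: Int) {
  private val semaphore = new Semaphore(maxConcurrentRequests)

  // `send` starts an async request and invokes the given callback on completion.
  def invoke(send: (() => Unit) => Unit): Unit = {
    semaphore.acquire()             // backpressure: blocks once the limit is hit
    send(() => semaphore.release()) // completion frees the permit
  }

  // Acquiring every permit implies zero in-flight requests; releasing them
  // reopens the sink. No Phaser is needed for this.
  def flush(): Unit = {
    semaphore.acquire(maxConcurrentRequests)
    semaphore.release(maxConcurrentRequests)
  }
}
{code}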




[jira] [Created] (FLINK-11126) Filter out AMRMToken in the TaskManager credentials

2018-12-10 Thread Paul Lin (JIRA)
Paul Lin created FLINK-11126:


 Summary: Filter out AMRMToken in the TaskManager credentials
 Key: FLINK-11126
 URL: https://issues.apache.org/jira/browse/FLINK-11126
 Project: Flink
  Issue Type: Improvement
  Components: Security, YARN
Affects Versions: 1.7.0, 1.6.2
Reporter: Paul Lin
Assignee: Paul Lin


Currently, the Flink JobManager propagates its storage tokens to the TaskManagers to 
meet the requirements of YARN log aggregation (see FLINK-6376). But in this way 
the AMRMToken is also included in the TaskManager credentials, which is 
potentially insecure. We should filter out the AMRMToken before setting the tokens 
on the TaskManager's container launch context.
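
A minimal sketch of the intended filtering, using standard Hadoop APIs (the helper name and placement are illustrative; secret keys are ignored for brevity):

{code:java}
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;

public final class TokenFilter {

	/** Copies all tokens except the AMRMToken into a fresh Credentials object. */
	public static Credentials withoutAmRmToken(Credentials source) {
		Credentials filtered = new Credentials();
		for (Token<? extends TokenIdentifier> token : source.getAllTokens()) {
			if (!token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
				// keep the token, aliased by its service
				filtered.addToken(token.getService(), token);
			}
		}
		return filtered;
	}
}
{code}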



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11081) Support binding port range for REST server

2018-12-10 Thread Rong Rong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716173#comment-16716173
 ] 

Rong Rong commented on FLINK-11081:
---

+1 to this feature. It is actually very helpful for lots of our use cases. Just two 
quick higher-level questions: 1) what is the strategy for binding a port from the 
range to the actual port number (smallest? random?); and 2) should we introduce a 
limit on the maximum range size?
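
For what it's worth, a minimal sketch of one possible answer to 1): sequential probing from the lower end of the range (the "50100-50200" range syntax is an assumption):

{code:java}
import java.io.IOException;
import java.net.ServerSocket;

public final class PortRangeBinder {

	/** Binds to the first free port of an inclusive range such as "50100-50200". */
	public static ServerSocket bindInRange(String range) throws IOException {
		String[] parts = range.split("-", 2);
		int start = Integer.parseInt(parts[0].trim());
		int end = parts.length == 2 ? Integer.parseInt(parts[1].trim()) : start;
		for (int port = start; port <= end; port++) {
			try {
				return new ServerSocket(port); // first free port wins
			} catch (IOException e) {
				// port already in use, probe the next one
			}
		}
		throw new IOException("No free port in range " + range);
	}
}
{code}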

> Support binding port range for REST server
> --
>
> Key: FLINK-11081
> URL: https://issues.apache.org/jira/browse/FLINK-11081
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.7.0, 1.8.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently the {{RestServerEndpoint}} binds to the port specified by 
> {{RestOptions#PORT}}. {{PORT}} is of type integer. Sometimes, it would be 
> useful to being able to specify not only a single port but a port range to 
> pick a port from. Therefore, I propose to add similar to 
> {{RestOptions#BIND_ADDRESS}} another option {{RestOptions#BIND_PORT}} which 
> allows to specify a port range for the {{RestServerEndpoint}} to pick a port 
> from. {{RestOptions#PORT}} would then only be used by the client to connect 
> to the started {{RestServerEndpoint}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-4810) Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful checkpoints

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716171#comment-16716171
 ] 

ASF GitHub Bot commented on FLINK-4810:
---

ramkrish86 commented on issue #3334: FLINK-4810 Checkpoint Coordinator should 
fail ExecutionGraph after "n" unsuccessful checkpoints
URL: https://github.com/apache/flink/pull/3334#issuecomment-446070175
 
 
   @azagrebin - Thanks for the ping. I am currently not working on this. Please feel 
free to work on this or the related JIRA FLINK-10074. I will add myself as a 
watcher to understand more about it. Thanks once again.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful 
> checkpoints
> 
>
> Key: FLINK-4810
> URL: https://issues.apache.org/jira/browse/FLINK-4810
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
>
> The Checkpoint coordinator should track the number of consecutive 
> unsuccessful checkpoints.
> If more than {{n}} (configured value) checkpoints fail in a row, it should 
> call {{fail()}} on the execution graph to trigger a recovery.
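
For illustration, the consecutive-failure tracking could look like this minimal sketch (names are illustrative, not the coordinator's actual fields):

{code:java}
import java.util.concurrent.atomic.AtomicInteger;

public final class CheckpointFailureTracker {

	private final int tolerableFailures; // the configured "n"
	private final AtomicInteger consecutiveFailures = new AtomicInteger();

	public CheckpointFailureTracker(int tolerableFailures) {
		this.tolerableFailures = tolerableFailures;
	}

	public void onCheckpointSuccess() {
		consecutiveFailures.set(0); // any success resets the streak
	}

	/** Returns true when the execution graph should be failed. */
	public boolean onCheckpointFailure() {
		return consecutiveFailures.incrementAndGet() > tolerableFailures;
	}
}
{code}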



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] ramkrish86 commented on issue #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful checkpoints

2018-12-10 Thread GitBox
ramkrish86 commented on issue #3334: FLINK-4810 Checkpoint Coordinator should 
fail ExecutionGraph after "n" unsuccessful checkpoints
URL: https://github.com/apache/flink/pull/3334#issuecomment-446070175
 
 
   @azagrebin - Thanks for the ping. I am currently not working on this. Please feel 
free to work on this or the related JIRA FLINK-10074. I will add myself as a 
watcher to understand more about it. Thanks once again.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-10943) Flink runtime test failed caused by curator dependency conflicts

2018-12-10 Thread Paul Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Paul Lin closed FLINK-10943.

Resolution: Not A Bug

> Flink runtime test failed caused by curator dependency conflicts
> 
>
> Key: FLINK-10943
> URL: https://issues.apache.org/jira/browse/FLINK-10943
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, Tests
>Affects Versions: 1.5.5, 1.6.2
>Reporter: Paul Lin
>Assignee: Paul Lin
>Priority: Minor
>  Labels: pull-request-available
> Attachments: 
> org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStoreTest.txt
>
>
> Hadoop-common 2.6+ includes curator dependencies, which conflict with the 
> curator version used by the Flink runtime and cause test failures 
> (the attachment is the surefire report). 
> The curator dependency tree of the Flink runtime is as below:
> ```
> flink-shaded-hadoop2 -> hadoop-common -> curator-client & curator-recipes
> flink-shaded-curator -> curator-recipes -> curator-framework -> curator-client
> ```
> According to Maven's dependency mediation, the curator-client from 
> flink-shaded-hadoop2 would be picked, while curator-framework and 
> curator-recipes come from flink-shaded-curator.
> To fix the problem I think we can simply exclude curator-client from the 
> flink-shaded-hadoop2 dependency in flink-runtime.
> I'd like to fix this, please let me know what you think. Thanks!
> [^org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStoreTest.txt]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716164#comment-16716164
 ] 

ASF GitHub Bot commented on FLINK-11010:


walterddr commented on a change in pull request #7180: [FLINK-11010] [TABLE] 
Flink SQL timestamp is inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#discussion_r240470904
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
 ##
 @@ -164,4 +167,35 @@ public void testUnion() throws Exception {
 
StreamITCase.compareWithList(expected);
}
+
+   @Test
+   public void testProctime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataStream<Tuple3<Integer, Long, String>> ds = JavaStreamTestData.getSmall3TupleDataSet(env);
+		tableEnv.registerDataStream("MyTable", ds, "a, b, c, proctime.proctime");
+
+		String sqlQuery = "select proctime from MyTable";
+
+		Table result = tableEnv.sqlQuery(sqlQuery);
+
+		tableEnv
+			.toAppendStream(result, TypeInformation.of(Row.class))
+			.addSink(new SinkFunction<Row>() {
+				@Override
+				public void invoke(Row value, Context context) throws Exception {
+
+					Timestamp procTimestamp = (Timestamp) value.getField(0);
+
+					// validate the second here
+					long procSecondTime = procTimestamp.getTime() / 1000;
 
 Review comment:
   This causes me some concern because failures can happen when a record crosses a 
second boundary; this is not a stable ITCase in my opinion.
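
   A range-based check could make it stable, e.g. (a sketch with illustrative names):

{code:java}
import java.sql.Timestamp;

import static org.junit.Assert.assertTrue;

public final class ProctimeAssertions {

	/** Asserts that the proctime lies between wall-clock bounds captured around the record. */
	public static void assertProctimeInRange(Timestamp procTimestamp, long beforeMs, long afterMs) {
		long procMs = procTimestamp.getTime();
		assertTrue(
			"proctime " + procMs + " outside [" + beforeMs + ", " + afterMs + "]",
			beforeMs <= procMs && procMs <= afterMs);
	}
}
{code}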


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink SQL timestamp is inconsistent with currentProcessingTime()
> 
>
> Key: FLINK-11010
> URL: https://issues.apache.org/jira/browse/FLINK-11010
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.6.2, 1.7.0, 1.8.0, 1.7.1
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
>
> Flink SQL timestamp is inconsistent with currentProcessingTime().
>  
> The processing time is implemented by invoking System.currentTimeMillis(), 
> but the long value is automatically wrapped into a Timestamp with the 
> following statement: 
> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] walterddr commented on a change in pull request #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-10 Thread GitBox
walterddr commented on a change in pull request #7180: [FLINK-11010] [TABLE] 
Flink SQL timestamp is inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#discussion_r240470904
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
 ##
 @@ -164,4 +167,35 @@ public void testUnion() throws Exception {
 
StreamITCase.compareWithList(expected);
}
+
+   @Test
+   public void testProctime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataStream<Tuple3<Integer, Long, String>> ds = JavaStreamTestData.getSmall3TupleDataSet(env);
+		tableEnv.registerDataStream("MyTable", ds, "a, b, c, proctime.proctime");
+
+		String sqlQuery = "select proctime from MyTable";
+
+		Table result = tableEnv.sqlQuery(sqlQuery);
+
+		tableEnv
+			.toAppendStream(result, TypeInformation.of(Row.class))
+			.addSink(new SinkFunction<Row>() {
+				@Override
+				public void invoke(Row value, Context context) throws Exception {
+
+					Timestamp procTimestamp = (Timestamp) value.getField(0);
+
+					// validate the second here
+					long procSecondTime = procTimestamp.getTime() / 1000;
 
 Review comment:
   This causes me some concern because failures can happen when a record crosses a 
second boundary; this is not a stable ITCase in my opinion.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716158#comment-16716158
 ] 

ASF GitHub Bot commented on FLINK-11048:


tweise commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r240470721
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+	protected List<URL> getJarFiles() throws ProgramInvocationException {
+   return jarFiles;
}
 
/**
 * Executes the remote job.
 *
-* @param streamGraph
-*Stream Graph to execute
+* @param streamExecutionEnvironment
+*Execution Environment with Stream Graph to execute
 * @param jarFiles
 *List of jar file URLs to ship to the cluster
 * @return The result of the job execution, containing elapsed time and 
accumulators.
 */
-	protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles) throws ProgramInvocationException {
+	public static JobExecutionResult executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
+			List<URL> jarFiles,
 
 Review comment:
   It seems to leave room for arbitrary review demands also!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Ability to programmatically execute streaming pipeline with savepoint restore 
>  
> ---
>
> Key: FLINK-11048
> URL: https://issues.apache.org/jira/browse/FLINK-11048
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.7.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Minor
>  Labels: pull-request-available
>
> RemoteStreamEnvironment.execute doesn't support restore from savepoint, 
> though the underlying ClusterClient does. Add an explicit "execute remotely" 
> that can be used by downstream projects.
> [https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E]
>  
>   
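
For context, a sketch of what the requested capability enables (the constructor overload shown here is illustrative; the exact signature may differ in the final PR):

{code:java}
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestoreFromSavepoint {
	public static void main(String[] args) throws Exception {
		SavepointRestoreSettings restore =
			SavepointRestoreSettings.forPath("hdfs:///savepoints/savepoint-1", true);
		// hypothetical overload that carries the restore settings to the ClusterClient
		StreamExecutionEnvironment env = new RemoteStreamEnvironment(
			"jobmanager-host", 8081, null, new String[] {"/path/to/job.jar"}, null, restore);
		// ... build the streaming pipeline on env ...
		env.execute("job-restored-from-savepoint");
	}
}
{code}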



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tweise commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-10 Thread GitBox
tweise commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r240470721
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+	protected List<URL> getJarFiles() throws ProgramInvocationException {
+   return jarFiles;
}
 
/**
 * Executes the remote job.
 *
-* @param streamGraph
-*Stream Graph to execute
+* @param streamExecutionEnvironment
+*Execution Environment with Stream Graph to execute
 * @param jarFiles
 *List of jar file URLs to ship to the cluster
 * @return The result of the job execution, containing elapsed time and 
accumulators.
 */
-	protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles) throws ProgramInvocationException {
+	public static JobExecutionResult executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
+			List<URL> jarFiles,
 
 Review comment:
   It seems to leave room for arbitrary review demands also!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-11123) Improve ml quick start doc

2018-12-10 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng closed FLINK-11123.
---
   Resolution: Fixed
Fix Version/s: 1.6.3

Fixed in master: 156c09ced9a61d20e2e5d4ce5cfedab8ac4d4ee4
Fixed in release-1.7: f045dfd501343bc9c72c665a6c599e42a221bd67
Fixed in release-1.6: d1a489f9eda733d914a885cd593b26f336a4d380

> Improve ml quick start doc
> --
>
> Key: FLINK-11123
> URL: https://issues.apache.org/jira/browse/FLINK-11123
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Machine Learning Library
>Affects Versions: 1.7.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.6.3, 1.7.1, 1.7.0
>
>
> Users cannot run the example from the ML quick start document because a 
> required class import is missing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716154#comment-16716154
 ] 

ASF GitHub Bot commented on FLINK-11048:


tweise commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r240470550
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+	protected List<URL> getJarFiles() throws ProgramInvocationException {
+   return jarFiles;
}
 
/**
 * Executes the remote job.
 *
-* @param streamGraph
-*Stream Graph to execute
+* @param streamExecutionEnvironment
+*Execution Environment with Stream Graph to execute
 * @param jarFiles
 *List of jar file URLs to ship to the cluster
 * @return The result of the job execution, containing elapsed time and 
accumulators.
 */
-	protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles) throws ProgramInvocationException {
+	public static JobExecutionResult executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
 
 Review comment:
   Agreed on the public method contract. I rearranged the code to keep the 
existing method as is.
   
   The setter is an alternative option for users that already have the remote 
environment instance (instead of having to deal with an 8-parameter static 
method). I would like to keep this as is.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Ability to programmatically execute streaming pipeline with savepoint restore 
>  
> ---
>
> Key: FLINK-11048
> URL: https://issues.apache.org/jira/browse/FLINK-11048
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.7.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Minor
>  Labels: pull-request-available
>
> RemoteStreamEnvironment.execute doesn't support restore from savepoint, 
> though the underlying ClusterClient does. Add an explicit "execute remotely" 
> that can be used by downstream projects.
> [https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E]
>  
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tweise commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-10 Thread GitBox
tweise commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r240470550
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+	protected List<URL> getJarFiles() throws ProgramInvocationException {
+   return jarFiles;
}
 
/**
 * Executes the remote job.
 *
-* @param streamGraph
-*Stream Graph to execute
+* @param streamExecutionEnvironment
+*Execution Environment with Stream Graph to execute
 * @param jarFiles
 *List of jar file URLs to ship to the cluster
 * @return The result of the job execution, containing elapsed time and 
accumulators.
 */
-	protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles) throws ProgramInvocationException {
+	public static JobExecutionResult executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
 
 Review comment:
   Agreed on the public method contract. I rearranged the code to keep the 
existing method as is.
   
   The setter is an alternative option for users that already have the remote 
environment instance (instead of having to deal with an 8-parameter static 
method). I would like to keep this as is.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11124) Add private[flink] to TemporalTableFunction.create()

2018-12-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11124:
---
Labels: pull-request-available  (was: )

> Add private[flink] to TemporalTableFunction.create()
> 
>
> Key: FLINK-11124
> URL: https://issues.apache.org/jira/browse/FLINK-11124
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>
> {{TemporalTableFunction}} is a user-oriented class. I think it would be 
> better to add {{private[flink]}} to the {{TemporalTableFunction.create()}} 
> method in order to make it invisible to users.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] hequn8128 opened a new pull request #7271: [FLINK-11124][table] Add private[flink] to TemporalTableFunction.create() method

2018-12-10 Thread GitBox
hequn8128 opened a new pull request #7271: [FLINK-11124][table] Add 
private[flink] to TemporalTableFunction.create() method
URL: https://github.com/apache/flink/pull/7271
 
 
   
   ## What is the purpose of the change
   
   This pull request adds `private[flink]` to the `TemporalTableFunction.create()` 
method. As `TemporalTableFunction` is a user-oriented class, it is better to 
make this factory method invisible to users.
   
   
   ## Brief change log
   
 - Add private[flink] to TemporalTableFunction.create()
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no )
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11124) Add private[flink] to TemporalTableFunction.create()

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716152#comment-16716152
 ] 

ASF GitHub Bot commented on FLINK-11124:


hequn8128 opened a new pull request #7271: [FLINK-11124][table] Add 
private[flink] to TemporalTableFunction.create() method
URL: https://github.com/apache/flink/pull/7271
 
 
   
   ## What is the purpose of the change
   
   This pull request adds `private[flink]` to the `TemporalTableFunction.create()` 
method. As `TemporalTableFunction` is a user-oriented class, it is better to 
make this factory method invisible to users.
   
   
   ## Brief change log
   
 - Add private[flink] to TemporalTableFunction.create()
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no )
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add private[flink] to TemporalTableFunction.create()
> 
>
> Key: FLINK-11124
> URL: https://issues.apache.org/jira/browse/FLINK-11124
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>
> {{TemporalTableFunction}} is a user-oriented class. I think it would be 
> better to add {{private[flink]}} to the {{TemporalTableFunction.create()}} 
> method in order to make it invisible to users.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] link3280 closed pull request #7148: [FLINK-10943][tests] Fix test failures in flink runtime caused by curator dependency conflicts

2018-12-10 Thread GitBox
link3280 closed pull request #7148: [FLINK-10943][tests] Fix test failures in 
flink runtime caused by curator dependency conflicts
URL: https://github.com/apache/flink/pull/7148
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index d216851c2d1..0cab4feabda 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -72,6 +72,12 @@ under the License.
 			<artifactId>flink-shaded-hadoop2</artifactId>
 			<version>${project.version}</version>
 			<optional>true</optional>
+			<exclusions>
+				<exclusion>
+					<groupId>org.apache.curator</groupId>
+					<artifactId>curator-client</artifactId>
+				</exclusion>
+			</exclusions>

 



 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10943) Flink runtime test failed caused by curator dependency conflicts

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716150#comment-16716150
 ] 

ASF GitHub Bot commented on FLINK-10943:


link3280 closed pull request #7148: [FLINK-10943][tests] Fix test failures in 
flink runtime caused by curator dependency conflicts
URL: https://github.com/apache/flink/pull/7148
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index d216851c2d1..0cab4feabda 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -72,6 +72,12 @@ under the License.
 			<artifactId>flink-shaded-hadoop2</artifactId>
 			<version>${project.version}</version>
 			<optional>true</optional>
+			<exclusions>
+				<exclusion>
+					<groupId>org.apache.curator</groupId>
+					<artifactId>curator-client</artifactId>
+				</exclusion>
+			</exclusions>

 



 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink runtime test failed caused by curator dependency conflicts
> 
>
> Key: FLINK-10943
> URL: https://issues.apache.org/jira/browse/FLINK-10943
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, Tests
>Affects Versions: 1.5.5, 1.6.2
>Reporter: Paul Lin
>Assignee: Paul Lin
>Priority: Minor
>  Labels: pull-request-available
> Attachments: 
> org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStoreTest.txt
>
>
> Hadoop-common 2.6+ includes curator dependencies, which conflict with the 
> curator version used by the Flink runtime and cause test failures 
> (the attachment is the surefire report). 
> The curator dependency tree of the Flink runtime is as below:
> ```
> flink-shaded-hadoop2 -> hadoop-common -> curator-client & curator-recipes
> flink-shaded-curator -> curator-recipes -> curator-framework -> curator-client
> ```
> According to Maven's dependency mediation, the curator-client from 
> flink-shaded-hadoop2 would be picked, while curator-framework and 
> curator-recipes come from flink-shaded-curator.
> To fix the problem I think we can simply exclude curator-client from the 
> flink-shaded-hadoop2 dependency in flink-runtime.
> I'd like to fix this, please let me know what you think. Thanks!
> [^org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStoreTest.txt]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10943) Flink runtime test failed caused by curator dependency conflicts

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716149#comment-16716149
 ] 

ASF GitHub Bot commented on FLINK-10943:


link3280 commented on issue #7148: [FLINK-10943][tests] Fix test failures in 
flink runtime caused by curator dependency conflicts
URL: https://github.com/apache/flink/pull/7148#issuecomment-446068042
 
 
   @tillrohrmann Thanks for your review. You're right, it's caused by IntelliJ's 
handling of relocated classes. I'll close the PR and the Jira ticket.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink runtime test failed caused by curator dependency conflicts
> 
>
> Key: FLINK-10943
> URL: https://issues.apache.org/jira/browse/FLINK-10943
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, Tests
>Affects Versions: 1.5.5, 1.6.2
>Reporter: Paul Lin
>Assignee: Paul Lin
>Priority: Minor
>  Labels: pull-request-available
> Attachments: 
> org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStoreTest.txt
>
>
> Hadoop-common 2.6+ includes curator dependencies, which conflict with the 
> curator version used by the Flink runtime and cause test failures 
> (the attachment is the surefire report). 
> The curator dependency tree of the Flink runtime is as below:
> ```
> flink-shaded-hadoop2 -> hadoop-common -> curator-client & curator-recipes
> flink-shaded-curator -> curator-recipes -> curator-framework -> curator-client
> ```
> According to Maven's dependency mediation, the curator-client from 
> flink-shaded-hadoop2 would be picked, while curator-framework and 
> curator-recipes come from flink-shaded-curator.
> To fix the problem I think we can simply exclude curator-client from the 
> flink-shaded-hadoop2 dependency in flink-runtime.
> I'd like to fix this, please let me know what you think. Thanks!
> [^org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStoreTest.txt]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] link3280 commented on issue #7148: [FLINK-10943][tests] Fix test failures in flink runtime caused by curator dependency conflicts

2018-12-10 Thread GitBox
link3280 commented on issue #7148: [FLINK-10943][tests] Fix test failures in 
flink runtime caused by curator dependency conflicts
URL: https://github.com/apache/flink/pull/7148#issuecomment-446068042
 
 
   @tillrohrmann Thanks for your review. You're right, it's caused by IntelliJ's 
handling of relocated classes. I'll close the PR and the Jira ticket.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11090) Unused parameter in WindowedStream.aggregate()

2018-12-10 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716146#comment-16716146
 ] 

Hequn Cheng commented on FLINK-11090:
-

Hi [~aljoscha], thanks for explaining the differences between {{PublicEvolving}} 
and {{Public}}. It would be nice to remove the useless parameter directly. I 
will open a PR soon. 

Thanks, Hequn
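
A sketch of the slimmed-down signature, in the context of {{WindowedStream<T, K, W>}} (hypothetical; the actual PR may differ):

{code:java}
@PublicEvolving
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
		AggregateFunction<T, ACC, V> aggregateFunction,
		ProcessWindowFunction<V, R, K, W> windowFunction,
		TypeInformation<ACC> accumulatorType,
		TypeInformation<R> resultType) {
	// bridge to the existing implementation until the old overload is removed;
	// the unused aggregateResultType argument is simply dropped
	return aggregate(aggregateFunction, windowFunction, accumulatorType, null, resultType);
}
{code}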

> Unused parameter in WindowedStream.aggregate()
> --
>
> Key: FLINK-11090
> URL: https://issues.apache.org/jira/browse/FLINK-11090
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> The {{aggregateResultType}} parameter in {{WindowedStream.aggregate()}} seems 
> useless. Or what have I missed?
> If it is useless, I prefer to remove the parameter by adding a new API and 
> deprecating the current one. We can't remove it directly, as it is 
> PublicEvolving.
> {code:java}
>   @PublicEvolving
>   public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
>   AggregateFunction<T, ACC, V> aggregateFunction,
>   ProcessWindowFunction<V, R, K, W> windowFunction,
>   TypeInformation<ACC> accumulatorType,
>   TypeInformation<V> aggregateResultType,
>   TypeInformation<R> resultType) {
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11125) Remove useless import

2018-12-10 Thread Hequn Cheng (JIRA)
Hequn Cheng created FLINK-11125:
---

 Summary: Remove useless import 
 Key: FLINK-11125
 URL: https://issues.apache.org/jira/browse/FLINK-11125
 Project: Flink
  Issue Type: Improvement
  Components: Table API  SQL, Tests
Reporter: Hequn Cheng
Assignee: Hequn Cheng






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11123) Improve ml quick start doc

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716125#comment-16716125
 ] 

ASF GitHub Bot commented on FLINK-11123:


asfgit closed pull request #7269: [FLINK-11123][docs] fix the import of the 
class is missing in ml quic…
URL: https://github.com/apache/flink/pull/7269
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/libs/ml/quickstart.md b/docs/dev/libs/ml/quickstart.md
index e056b28b505..2e9a7b9505c 100644
--- a/docs/dev/libs/ml/quickstart.md
+++ b/docs/dev/libs/ml/quickstart.md
@@ -153,6 +153,8 @@ A conversion can be done using a simple normalizer mapping 
function:
  
 {% highlight scala %}
 
+import org.apache.flink.ml.math.Vector
+
 def normalizer : LabeledVector => LabeledVector = { 
 lv => LabeledVector(if (lv.label > 0.0) 1.0 else -1.0, lv.vector)
 }


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve ml quick start doc
> --
>
> Key: FLINK-11123
> URL: https://issues.apache.org/jira/browse/FLINK-11123
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Machine Learning Library
>Affects Versions: 1.7.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.7.1
>
>
> Users cannot run the example from the ML quick start document because a 
> required class import is missing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] asfgit closed pull request #7269: [FLINK-11123][docs] fix the import of the class is missing in ml quic…

2018-12-10 Thread GitBox
asfgit closed pull request #7269: [FLINK-11123][docs] fix the import of the 
class is missing in ml quic…
URL: https://github.com/apache/flink/pull/7269
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/libs/ml/quickstart.md b/docs/dev/libs/ml/quickstart.md
index e056b28b505..2e9a7b9505c 100644
--- a/docs/dev/libs/ml/quickstart.md
+++ b/docs/dev/libs/ml/quickstart.md
@@ -153,6 +153,8 @@ A conversion can be done using a simple normalizer mapping 
function:
  
 {% highlight scala %}
 
+import org.apache.flink.ml.math.Vector
+
 def normalizer : LabeledVector => LabeledVector = { 
 lv => LabeledVector(if (lv.label > 0.0) 1.0 else -1.0, lv.vector)
 }


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10543) Leverage efficient timer deletion in relational operators

2018-12-10 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716116#comment-16716116
 ] 

Hequn Cheng commented on FLINK-10543:
-

[~fhueske] Hi, it would be nice if you could take a look at the PR.
The PR mainly includes the following changes:
 * Delete expired timers in {{ProcessFunctionWithCleanupState}}.
 * Add {{CoProcessFunctionWithCleanupState}} for joins. This aligns the 
cleanup logic of joins with the other operators.
 * Use one {{ValueState}} to control clean-up instead of two, i.e., treat the 
left and right state of a join as a whole.
 * Leverage the min and max retention time to clean up the join state. Before, 
the join registered a clean-up timer and cleaned the state at a fixed interval; 
in the new version we can remove all records at once, since new records 
refresh the timer (see the sketch below). 
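
A minimal sketch of the delete-and-re-register pattern from the last point (illustrative names; not the exact PR code):

{code:java}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class CleanupTimerFunction extends KeyedProcessFunction<String, String, String> {

	private static final long RETENTION_MS = 60_000L; // assumed retention time
	private transient ValueState<Long> cleanupTimeState;

	@Override
	public void open(Configuration parameters) {
		cleanupTimeState = getRuntimeContext().getState(
			new ValueStateDescriptor<>("cleanup-time", Long.class));
	}

	@Override
	public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
		long newCleanupTime = ctx.timerService().currentProcessingTime() + RETENTION_MS;
		Long oldCleanupTime = cleanupTimeState.value();
		if (oldCleanupTime != null) {
			// FLINK-9423: delete the previously registered timer instead of letting it fire
			ctx.timerService().deleteProcessingTimeTimer(oldCleanupTime);
		}
		ctx.timerService().registerProcessingTimeTimer(newCleanupTime);
		cleanupTimeState.update(newCleanupTime);
		out.collect(value);
	}

	@Override
	public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
		// no record arrived for RETENTION_MS: the per-key state can be dropped
		cleanupTimeState.clear();
	}
}
{code}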

Best, Hequn

> Leverage efficient timer deletion in relational operators
> -
>
> Key: FLINK-10543
> URL: https://issues.apache.org/jira/browse/FLINK-10543
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Fabian Hueske
>Assignee: Hequn Cheng
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.8.0, 1.7.1
>
>
> FLINK-9423 added support for efficient timer deletions. This feature is 
> available since Flink 1.6 and should be used by the relational operator of 
> SQL and Table API.
> Currently, we use a few workarounds to handle situations when deleting timers 
> would be the better solution.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10543) Leverage efficient timer deletion in relational operators

2018-12-10 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716101#comment-16716101
 ] 

sunjincheng commented on FLINK-10543:
-

Hi [~fhueske], from a compatibility point of view I agree this change should not 
be cherry-picked to release-1.7. I'll revert the change from release-1.7.

Thank you for the reminder that we only back-port bug fixes that do not break 
compatibility to release branches.

I think the improvement is necessary on master to reduce the storage and 
access pressure caused by useless state.

If you find any incorrect semantic changes, we can file a new JIRA to fix them.

Thanks, Jincheng

 

> Leverage efficient timer deletion in relational operators
> -
>
> Key: FLINK-10543
> URL: https://issues.apache.org/jira/browse/FLINK-10543
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Fabian Hueske
>Assignee: Hequn Cheng
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.8.0, 1.7.1
>
>
> FLINK-9423 added support for efficient timer deletions. This feature is 
> available since Flink 1.6 and should be used by the relational operator of 
> SQL and Table API.
> Currently, we use a few workarounds to handle situations when deleting timers 
> would be the better solution.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zzchun opened a new pull request #7270: [hotfix][docs] Fix typo in Joining documentation

2018-12-10 Thread GitBox
zzchun opened a new pull request #7270: [hotfix][docs] Fix typo in Joining 
documentation
URL: https://github.com/apache/flink/pull/7270
 
 
   ## What is the purpose of the change
   
   Fix typo in Joining documentation
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-11124) Add private[flink] to TemporalTableFunction.create()

2018-12-10 Thread Hequn Cheng (JIRA)
Hequn Cheng created FLINK-11124:
---

 Summary: Add private[flink] to TemporalTableFunction.create()
 Key: FLINK-11124
 URL: https://issues.apache.org/jira/browse/FLINK-11124
 Project: Flink
  Issue Type: Improvement
  Components: Table API  SQL
Reporter: Hequn Cheng
Assignee: Hequn Cheng


{{TemporalTableFunction}} is a user-oriented class. I think it would be better 
to add {{private[flink]}} to the {{TemporalTableFunction.create()}} method in 
order to make it invisible to users.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10543) Leverage efficient timer deletion in relational operators

2018-12-10 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716027#comment-16716027
 ] 

Fabian Hueske commented on FLINK-10543:
---

Hi [~sunjincheng121], [~hequn8128], I had a quick look at the PR and it seems 
that the clean up behavior of some operators was changed.

1. Some {{ValueState}} were removed from join operators, which breaks savepoint 
compatibility.
2. The clean-up behavior of joins was changed. It seems that all records are 
removed when a clean-up timer fires, while before, non-expired records 
were kept.

We only backport bug fixes to release branches that do not break compatibility 
(unless it is absolutely necessary).
Hence, the commit on the release-1.7 branch must be reverted.

I think we should also discuss the changes on the master branch. 
This issue was about leveraging the new timer deletion feature to improve the 
performance and not changing the clean up semantics of the operators. 
I'll have a closer look at the changes in the next days to figure out what 
exactly changed.

Thanks, Fabian

> Leverage efficient timer deletion in relational operators
> -
>
> Key: FLINK-10543
> URL: https://issues.apache.org/jira/browse/FLINK-10543
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Fabian Hueske
>Assignee: Hequn Cheng
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.8.0, 1.7.1
>
>
> FLINK-9423 added support for efficient timer deletions. This feature is 
> available since Flink 1.6 and should be used by the relational operator of 
> SQL and Table API.
> Currently, we use a few workarounds to handle situations when deleting timers 
> would be the better solution.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-10 Thread lamber-ken (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lamber-ken updated FLINK-11010:
---
Affects Version/s: 1.8.0

> Flink SQL timestamp is inconsistent with currentProcessingTime()
> 
>
> Key: FLINK-11010
> URL: https://issues.apache.org/jira/browse/FLINK-11010
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.6.2, 1.7.0, 1.8.0, 1.7.1
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
>
> Flink SQL timestamp is inconsistent with currentProcessingTime().
>  
> The processing time is implemented by invoking System.currentTimeMillis(), 
> but the long value is automatically wrapped into a Timestamp with the 
> following statement: 
> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`
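
A small standalone demonstration of the mismatch (assumes a non-UTC default timezone):

{code:java}
import java.sql.Timestamp;
import java.util.TimeZone;

public class OffsetDemo {
	public static void main(String[] args) {
		long time = System.currentTimeMillis();
		// what currentProcessingTime() is based on
		Timestamp fromClock = new Timestamp(time);
		// what the Table API produces, per the statement quoted above
		Timestamp shifted = new Timestamp(time - TimeZone.getDefault().getOffset(time));
		// the two differ by the local UTC offset, e.g. 8 hours in UTC+8
		System.out.println(fromClock + " vs " + shifted);
	}
}
{code}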



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-10 Thread lamber-ken (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lamber-ken updated FLINK-11010:
---
Affects Version/s: 1.7.1
   1.7.0

> Flink SQL timestamp is inconsistent with currentProcessingTime()
> 
>
> Key: FLINK-11010
> URL: https://issues.apache.org/jira/browse/FLINK-11010
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.6.2, 1.7.0, 1.8.0, 1.7.1
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
>
> Flink SQL timestamp is inconsistent with currentProcessingTime().
>  
> The processing time is implemented by invoking System.currentTimeMillis(), 
> but the long value is automatically wrapped into a Timestamp with the 
> following statement: 
> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716020#comment-16716020
 ] 

ASF GitHub Bot commented on FLINK-11010:


lamber-ken commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp 
is inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#issuecomment-446050898
 
 
   hi, @zentol cc


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink SQL timestamp is inconsistent with currentProcessingTime()
> 
>
> Key: FLINK-11010
> URL: https://issues.apache.org/jira/browse/FLINK-11010
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.6.2
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
>
> Flink SQL timestamp is inconsistent with currentProcessingTime().
>  
> The processing time is implemented by invoking System.currentTimeMillis(), 
> but the long value is automatically wrapped into a Timestamp with the 
> following statement: 
> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] lamber-ken commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-10 Thread GitBox
lamber-ken commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp 
is inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#issuecomment-446050898
 
 
   hi, @zentol cc


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11070) Add stream-stream non-window cross join

2018-12-10 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716015#comment-16716015
 ] 

Hequn Cheng commented on FLINK-11070:
-

[~fhueske] Thanks for your quick reply! 
Ok, I got your point. It also makes sense to notify users about the potential 
performance risk. Once we support join reordering, we can remove the switches.

Best, Hequn

> Add stream-stream non-window cross join
> ---
>
> Key: FLINK-11070
> URL: https://issues.apache.org/jira/browse/FLINK-11070
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> Currently, we don't reorder join and rely on the order provided by the user. 
> This is fine for most of the cases, however, it limits the set of supported 
> SQL queries.
> Example:
> {code:java}
> val streamUtil: StreamTableTestUtil = streamTestUtil()
> streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> val sqlQuery =
>   """
> |SELECT t1.a, t3.b
> |FROM MyTable3 t3, MyTable2 t2, MyTable t1
> |WHERE t1.a = t3.a AND t1.a = t2.a
> |""".stripMargin
> streamUtil.printSql(sqlQuery)
> {code}
> Given the current rule sets, this query produces a cross join which is not 
> supported and thus leads to:
> {code:java}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution 
> plan for the given query: 
> LogicalProject(a=[$8], b=[$1])
>   LogicalFilter(condition=[AND(=($8, $0), =($8, $4))])
> LogicalJoin(condition=[true], joinType=[inner])
>   LogicalJoin(condition=[true], joinType=[inner])
> LogicalTableScan(table=[[_DataStreamTable_2]])
> LogicalTableScan(table=[[_DataStreamTable_1]])
>   LogicalTableScan(table=[[_DataStreamTable_0]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
> {code}
> In order to support more queries, it would be nice to have cross joins on 
> streaming. We can start with a simple version, for example, calling 
> forceNonParallel() on the connect operator in `DataStreamJoin` when it is a 
> cross join. The performance may be bad, but it works fine if the two inputs 
> of the cross join are small. 
> We can do some optimizations later, such as broadcasting the smaller side, 
> etc.
> Any suggestions are greatly appreciated.
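
To make the idea concrete, a heap-state sketch of such a single-parallelism cross join (illustrative only; a real operator would use managed state and forceNonParallel() as described above):

{code:java}
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class CrossJoinSketch {

	public static DataStream<Tuple2<String, String>> crossJoin(
			DataStream<String> left, DataStream<String> right) {
		return left.connect(right)
			.process(new CoProcessFunction<String, String, Tuple2<String, String>>() {
				private final List<String> lefts = new ArrayList<>();
				private final List<String> rights = new ArrayList<>();

				@Override
				public void processElement1(String l, Context ctx, Collector<Tuple2<String, String>> out) {
					lefts.add(l);
					for (String r : rights) {
						out.collect(Tuple2.of(l, r));
					}
				}

				@Override
				public void processElement2(String r, Context ctx, Collector<Tuple2<String, String>> out) {
					rights.add(r);
					for (String l : lefts) {
						out.collect(Tuple2.of(l, r));
					}
				}
			})
			.setParallelism(1); // single instance sees all records of both inputs
	}
}
{code}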



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11070) Add stream-stream non-window cross join

2018-12-10 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715954#comment-16715954
 ] 

Fabian Hueske commented on FLINK-11070:
---

Hmmm, I'm hesitant to enable cross joins by default.
It is very easy to write a query that performs unnecessary cross joins. I would 
not trust users with that.
If we run cross joins by default, users will experience very bad performance 
even the query could be rewritten without cross joins. 
With a switch in the table config we can notify the user about a (potentially) 
inefficient query and users can manually enable these queries.

I don't really see a problem removing the switch in the future. We can change 
the default behavior to enable cross joins by default once we have support for 
join reordering. Even if the cardinality estimates are not perfect, equi joins 
should be much cheaper than cross joins.

> Add stream-stream non-window cross join
> ---
>
> Key: FLINK-11070
> URL: https://issues.apache.org/jira/browse/FLINK-11070
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> Currently, we don't reorder join and rely on the order provided by the user. 
> This is fine for most of the cases, however, it limits the set of supported 
> SQL queries.
> Example:
> {code:java}
> val streamUtil: StreamTableTestUtil = streamTestUtil()
> streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> val sqlQuery =
>   """
> |SELECT t1.a, t3.b
> |FROM MyTable3 t3, MyTable2 t2, MyTable t1
> |WHERE t1.a = t3.a AND t1.a = t2.a
> |""".stripMargin
> streamUtil.printSql(sqlQuery)
> {code}
> Given the current rule sets, this query produces a cross join which is not 
> supported and thus leads to:
> {code:java}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution 
> plan for the given query: 
> LogicalProject(a=[$8], b=[$1])
>   LogicalFilter(condition=[AND(=($8, $0), =($8, $4))])
> LogicalJoin(condition=[true], joinType=[inner])
>   LogicalJoin(condition=[true], joinType=[inner])
> LogicalTableScan(table=[[_DataStreamTable_2]])
> LogicalTableScan(table=[[_DataStreamTable_1]])
>   LogicalTableScan(table=[[_DataStreamTable_0]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
> {code}
> In order to support more queries, it would be nice to have cross join on 
> streaming. We can start from a simple version, for example, call 
> forceNonParallel() for connectOperator in `DataStreamJoin` when it is a cross 
> join. The performance may be bad. But it works fine if the two tables of 
> cross join are small ones. 
> We can do some optimizations later, such as broadcasting the smaller side, 
> etc.
> Any suggestions are greatly appreciated.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715949#comment-16715949
 ] 

ASF GitHub Bot commented on FLINK-11010:


samsai commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is 
inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#issuecomment-446041717
 
 
   I got the same problem too:
   val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ssZ")
   val origin: DataStream[TransactionEvent] = env.fromCollection(List(
 TransactionEvent("u1", format.parse("2018-01-02 01:13:30+0800"), 10)
   ))
   val source2 = origin
 .assignTimestampsAndWatermarks(
   new BoundedOutOfOrdernessTimestampExtractor[TransactionEvent](Time.minutes(1)) {
 override def extractTimestamp(element: TransactionEvent): Long = {
   val timestamp = element.time.getTime
   println(s"extractTimestamp:$timestamp")
   timestamp
 }
   })
   tEnv.fromDataStream(source2, 'user, 'eventTime.rowtime)
 .toAppendStream[Row].print()
   
   I got eventTime as 2018-01-01 17:13:30.0, which is 8 hours delayed


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink SQL timestamp is inconsistent with currentProcessingTime()
> 
>
> Key: FLINK-11010
> URL: https://issues.apache.org/jira/browse/FLINK-11010
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.2
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
>
> Flink SQL timestamp is inconsistent with currentProcessingTime().
>  
> The processing time is implemented by invoking System.currentTimeMillis(), 
> but the long value will be automatically wrapped into a Timestamp with the 
> following statement: 
> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`
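
A minimal sketch reproducing the shift described above, assuming the JVM default time zone is GMT+8 as in the report further up; the literal values are illustrative:

{code:scala}
import java.util.TimeZone

// 2018-01-02 01:13:30 +0800 as epoch milliseconds
val time = 1514826810000L
// 28800000 ms (8 hours) when the default zone is GMT+8
val offset = TimeZone.getDefault.getOffset(time)
// The subtraction shifts the instant before wrapping, so the timestamp
// prints as 2018-01-01 17:13:30.0 -- 8 hours behind the event time.
val shifted = new java.sql.Timestamp(time - offset)
{code}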



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10566) Flink Planning is exponential in the number of stages

2018-12-10 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715937#comment-16715937
 ] 

Fabian Hueske commented on FLINK-10566:
---

Hmmm, true. 
The stack trace was taken during a recursive sink-to-source plan traversal to 
register the serializers.
The plan resulting from the given program branches a lot.
I guess the problem is that the traversal does not check if a subplan was 
already traversed. 
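
A hedged sketch of the fix hinted at above -- memoizing visited subplans so the sink-to-source walk is linear in the size of the plan DAG; the types here are illustrative stand-ins, not Flink's optimizer classes:

{code:scala}
import scala.collection.mutable

// Illustrative plan node, not Flink's internal representation.
case class PlanNode(id: Int, inputs: Seq[PlanNode])

def traverse(node: PlanNode, visited: mutable.Set[Int])(visit: PlanNode => Unit): Unit = {
  // `add` returns false if the subplan was already traversed, cutting off
  // the exponential re-visits caused by heavy branching.
  if (visited.add(node.id)) {
    visit(node) // e.g. register serializers exactly once per node
    node.inputs.foreach(traverse(_, visited)(visit))
  }
}
{code}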

> Flink Planning is exponential in the number of stages
> -
>
> Key: FLINK-10566
> URL: https://issues.apache.org/jira/browse/FLINK-10566
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 1.5.4, 1.6.1, 1.7.0
>Reporter: Robert Bradshaw
>Priority: Major
> Attachments: chart.png
>
>
> This makes it nearly impossible to run graphs with 100 or more stages. (The 
> execution itself is still sub-second, but the job submission takes 
> increasingly long.)
> I can reproduce this with the following pipeline, which resembles my 
> real-world workloads (with depth up to 10 and width up to, and past, 50). On 
> Flink, it seems getting beyond width 10 is problematic (times out after 
> hours). Note the log scale on the chart for time. 
>  
> {code:java}
>   public static void runPipeline(int depth, int width) throws Exception {
>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     DataSet<String> input = env.fromElements("a", "b", "c");
>     DataSet<String> stats = null;
>     for (int i = 0; i < depth; i++) {
>       stats = analyze(input, stats, width / (i + 1) + 1);
>     }
>     stats.writeAsText("out.txt");
>     env.execute("depth " + depth + " width " + width);
>   }
>   public static DataSet<String> analyze(DataSet<String> input, DataSet<String> stats, int branches) {
>     System.out.println("analyze " + branches);
>     for (int i = 0; i < branches; i++) {
>       final int ii = i;
>       if (stats != null) {
>         input = input.map(new RichMapFunction<String, String>() {
>             @Override
>             public void open(Configuration parameters) throws Exception {
>               Collection<String> broadcastSet = getRuntimeContext().getBroadcastVariable("stats");
>             }
>             @Override
>             public String map(String value) throws Exception {
>               return value;
>             }
>           }).withBroadcastSet(stats.map(s -> "(" + s + ").map"), "stats");
>       }
>       DataSet<String> branch = input
>            .map(s -> new Tuple2<Integer, String>(0, s + ii))
>            .groupBy(0)
>            .minBy(1)
>            .map(kv -> kv.f1);
>       if (stats == null) {
>         stats = branch;
>       } else {
>         stats = stats.union(branch);
>       }
>     }
>     return stats.map(s -> "(" + s + ").stats");
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11074) Improve the harness test to make it possible test with state backend

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715934#comment-16715934
 ] 

ASF GitHub Bot commented on FLINK-11074:


dianfu commented on a change in pull request #7253: [FLINK-11074] 
[table][tests] Enable harness tests with RocksdbStateBackend and add harness 
tests for CollectAggFunction
URL: https://github.com/apache/flink/pull/7253#discussion_r240445604
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
 ##
 @@ -491,13 +489,75 @@ class HarnessTestBase {
 distinctCountFuncName,
 distinctCountAggCode)
 
+  def createHarnessTester[KEY, IN, OUT](
+  dataStream: DataStream[_],
+  prefixOperatorName: String)
+  : KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT] = {
+
+val transformation = extractExpectedTransformation(
+  dataStream.javaStream.getTransformation,
+  prefixOperatorName).asInstanceOf[OneInputTransformation[_, _]]
+if (transformation == null) {
+  throw new Exception("Can not find the expected transformation")
+}
+
+val processOperator = 
transformation.getOperator.asInstanceOf[OneInputStreamOperator[IN, OUT]]
+val keySelector = 
transformation.getStateKeySelector.asInstanceOf[KeySelector[IN, KEY]]
+val keyType = 
transformation.getStateKeyType.asInstanceOf[TypeInformation[KEY]]
+
+createHarnessTester(processOperator, keySelector, keyType)
+  .asInstanceOf[KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT]]
+  }
+
+  private def extractExpectedTransformation(
+  transformation: StreamTransformation[_],
+  prefixOperatorName: String): StreamTransformation[_] = {
+def extractFromInputs(inputs: StreamTransformation[_]*): 
StreamTransformation[_] = {
+  for (input <- inputs) {
+val t = extractExpectedTransformation(input, prefixOperatorName)
+if (t != null) {
+  return t
+}
+  }
+  null
+}
+
+transformation match {
+  case one: OneInputTransformation[_, _] =>
+if (one.getName.startsWith(prefixOperatorName)) {
+  one
+} else {
+  extractExpectedTransformation(one.getInput, prefixOperatorName)
+}
+  case two: TwoInputTransformation[_, _, _] =>
 
 Review comment:
   Done


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve the harness test to make it possible test with state backend
> 
>
> Key: FLINK-11074
> URL: https://issues.apache.org/jira/browse/FLINK-11074
> Project: Flink
>  Issue Type: Test
>  Components: Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the harness test can only test without a state backend. If you use a 
> DataView in the accumulator of the aggregate function, the DataView is a Java 
> object held on the heap, not replaced with a StateMapView/StateListView whose 
> values are actually held in the state backend. We should improve the harness 
> test to make it possible to test with a state backend. Otherwise, issues such 
> as FLINK-10674 could never have been found. With this harness test available, 
> we could test the built-in aggregate functions which use the DataView in a 
> more fine-grained way.
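
A small sketch of the DataView situation described above, using the Table API's MapView as in the test code elsewhere in this thread; the accumulator class itself is hypothetical:

{code:scala}
import java.lang.{Integer => JInt}
import org.apache.flink.table.api.Types
import org.apache.flink.table.api.dataview.MapView

// Hypothetical accumulator: without state-backend support in the harness,
// this MapView stays a plain heap object; with it, the runtime substitutes
// a StateMapView whose entries live in the state backend.
class CountAccumulator {
  val counts: MapView[JInt, JInt] = new MapView[JInt, JInt](Types.INT, Types.INT)
}
{code}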



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11074) Improve the harness test to make it possible test with state backend

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715933#comment-16715933
 ] 

ASF GitHub Bot commented on FLINK-11074:


dianfu commented on a change in pull request #7253: [FLINK-11074] 
[table][tests] Enable harness tests with RocksdbStateBackend and add harness 
tests for CollectAggFunction
URL: https://github.com/apache/flink/pull/7253#discussion_r240445308
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
 ##
 @@ -87,7 +85,7 @@ class HarnessTestBase {
 new 
RowTypeInfo(distinctCountAggregates.map(getAccumulatorTypeOfAggregateFunction(_)):
 _*)
 
   protected val distinctCountDescriptor: String = 
EncodingUtils.encodeObjectToString(
-new MapStateDescriptor("distinctAgg0", distinctCountAggregationStateType, 
Types.LONG))
 
 Review comment:
   Done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve the harness test to make it possible test with state backend
> 
>
> Key: FLINK-11074
> URL: https://issues.apache.org/jira/browse/FLINK-11074
> Project: Flink
>  Issue Type: Test
>  Components: Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the harness test can only test without a state backend. If you use a 
> DataView in the accumulator of the aggregate function, the DataView is a Java 
> object held on the heap, not replaced with a StateMapView/StateListView whose 
> values are actually held in the state backend. We should improve the harness 
> test to make it possible to test with a state backend. Otherwise, issues such 
> as FLINK-10674 could never have been found. With this harness test available, 
> we could test the built-in aggregate functions which use the DataView in a 
> more fine-grained way.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)




[GitHub] dianfu commented on a change in pull request #7253: [FLINK-11074] [table][tests] Enable harness tests with RocksdbStateBackend and add harness tests for CollectAggFunction

2018-12-10 Thread GitBox
dianfu commented on a change in pull request #7253: [FLINK-11074] 
[table][tests] Enable harness tests with RocksdbStateBackend and add harness 
tests for CollectAggFunction
URL: https://github.com/apache/flink/pull/7253#discussion_r240445274
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AggFunctionHarnessTest.scala
 ##
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Integer => JInt}
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.scala._
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.dataview.{DataView, MapView}
+import org.apache.flink.table.dataview.StateMapView
+import org.apache.flink.table.runtime.aggregate.{GeneratedAggregations, 
GroupAggProcessFunction}
+import 
org.apache.flink.table.runtime.harness.HarnessTestBase.TestStreamQueryConfig
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.junit.Assert.assertTrue
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+class AggFunctionHarnessTest extends HarnessTestBase {
+  private val queryConfig = new TestStreamQueryConfig(Time.seconds(0), 
Time.seconds(0))
+
+  @Test
+  def testCollectAggregate(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+
+val data = new mutable.MutableList[(JInt, String)]
+val t = env.fromCollection(data).toTable(tEnv, 'a, 'b)
+tEnv.registerTable("T", t)
+val sqlQuery = tEnv.sqlQuery(
+  s"""
+ |SELECT
+ |  b, collect(a)
+ |FROM (
+ |  SELECT a, b
+ |  FROM T
+ |  GROUP BY a, b
+ |) GROUP BY b
+ |""".stripMargin)
+
+val testHarness = createHarnessTester[String, CRow, CRow](
+  sqlQuery.toRetractStream[Row](queryConfig), "groupBy")
+
+testHarness.setStateBackend(getStateBackend)
+testHarness.open()
+
+val operator = getOperator(testHarness)
+val state = getState(operator, 
"acc0_map_dataview").asInstanceOf[MapView[JInt, JInt]]
+assertTrue(state.isInstanceOf[StateMapView[_, _]])
+
assertTrue(operator.getKeyedStateBackend.isInstanceOf[RocksDBKeyedStateBackend[_]])
+
+val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+testHarness.processElement(new StreamRecord(CRow(1: JInt, "aaa"), 1))
+expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 1).asJava), 1))
+
+testHarness.processElement(new StreamRecord(CRow(1: JInt, "bbb"), 1))
+expectedOutput.add(new StreamRecord(CRow("bbb", Map(1 -> 1).asJava), 1))
+
+testHarness.processElement(new StreamRecord(CRow(1: JInt, "aaa"), 1))
+expectedOutput.add(new StreamRecord(CRow(false, "aaa", Map(1 -> 
1).asJava), 1))
+expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 2).asJava), 1))
+
+testHarness.processElement(new StreamRecord(CRow(2: JInt, "aaa"), 1))
+expectedOutput.add(new StreamRecord(CRow(false, "aaa", Map(1 -> 
2).asJava), 1))
+expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 2, 2 -> 
1).asJava), 1))
+
+// remove some state: state may be cleaned up by the state backend if not 
accessed more than ttl
+operator.setCurrentKey(Row.of("aaa"))
+state.remove(2)
+
+// retract after state has been cleaned up
+testHarness.processElement(new StreamRecord(CRow(false, 2: JInt, "aaa"), 
1))
+
+val result = testHarness.getOutput
+
+verify(expectedOutput, result)
+
+testHarness.close()
+  }
+
+  private def getState(
 
 Review comment:
   Make sense. Done.


This is an automated 



[jira] [Commented] (FLINK-11089) Log filecache directory removed messages

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715910#comment-16715910
 ] 

ASF GitHub Bot commented on FLINK-11089:


liu-zhaokun commented on issue #7257: [FLINK-11089]Log filecache directory 
removed messages
URL: https://github.com/apache/flink/pull/7257#issuecomment-446037623
 
 
   @tillrohrmann Hi, are you available to help review this PR? Thanks! 
Looking forward to your reply.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Log filecache directory removed messages 
> -
>
> Key: FLINK-11089
> URL: https://issues.apache.org/jira/browse/FLINK-11089
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Affects Versions: 1.7.0
>Reporter: liuzhaokun
>Priority: Minor
>  Labels: pull-request-available
>
> When the taskmanager exits or shuts down, the filecache directory named 
> "flink-dist-cache*" will be removed, but there is no log about this 
> action. So I think we should log it so that users can easily check it when 
> there are bugs.
> You can see that IOManager.java logs the removal messages when the taskmanager 
> shuts down; the filecache can do the same thing.  
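
A hedged sketch of the kind of log line meant above, mirroring the IOManager behaviour; the surrounding object and method names are assumptions for illustration, not the actual FileCache code:

{code:scala}
import java.io.File
import org.slf4j.{Logger, LoggerFactory}

object FileCacheLogging {
  private val LOG: Logger = LoggerFactory.getLogger("FileCache")

  // Hypothetical shutdown path: log the directory before deleting it,
  // so users can trace the "flink-dist-cache*" removal in the logs.
  def deleteCacheDirectory(dir: File): Unit = {
    LOG.info("FileCache removing spill file directory {}", dir.getAbsolutePath)
    // ... actual recursive deletion happens here ...
  }
}
{code}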



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)






[jira] [Comment Edited] (FLINK-11070) Add stream-stream non-window cross join

2018-12-10 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715884#comment-16715884
 ] 

Hequn Cheng edited comment on FLINK-11070 at 12/11/18 1:06 AM:
---

[~fhueske] Hi, thanks for the feedback and sharing the thoughts.

As for the example, I think I should add more details.
 * Table2 and Table3 are very small tables, e.g., tables with only one row, 
while Table1 is a very big table, e.g., one with 1M rows.
 * All these rows contain the same join keys.

So the cost for cross join vs. non-cross join would be 1*1+1*1M (cross join) VS 
1*1M+1M*1 (non-cross join). This means the query may be executed as a cross 
join with better performance.

Adding a switch is a good idea. This forces our users to pay more attention to 
the performance of cross joins. But it may also bring some inconvenience: we 
can't remove the switch even after Flink supports join reordering, because there 
is a chance the cardinality estimates have not been provided by the user. So once 
we can't get the cardinality estimates, the user has to configure the switch to 
enable a cross join if he does want to use one. From this point of view, 
I think we should not have the switch. 
 I would propose that:
 * Don't enable join reordering in general because reordering without 
cardinality estimates is gambling
 * Trust the query written by the user as we don't have cardinality estimates. 
And we don't need to add a switch to bring inconvenience to the user. 

What do you think?

To get better performance, I think making the join parallel is a good idea. I 
will investigate it. Thanks a lot for your suggestions. 

Best, Hequn


was (Author: hequn8128):
[~fhueske] Hi, thanks for the feedback and sharing the thoughts.

As for the example, I think I should add more details.
 * Table2 and table3 are very small tables, such as a table with only one row. 
And table1 is a very big table, such as with 1M rows.
 * All these rows contain same join keys.

So the cost for cross join and non-cross join would be 1*1*1M(cross join) VS 
1*1M*1M(non-cross join). This means the query may be executed with a cross join 
with a much better performance.

Adding a switch is a good idea. This force our users to pay more attention to 
the performance of cross join. But it may also bring some inconvenience. We 
can't remove the switch even Flink support join reordering. Because there is a 
chance the cardinality estimates have not been passed by the user. So once we 
can't get the cardinality estimates, the user has to configure the switch to 
enable a cross join if he does want to use cross join. From this point of view, 
I think we should not have the switch. 
I would propose that:
 * Don't enable join reordering in general because reordering without 
cardinality estimates is gambling
 * Trust the query written by the user as we don't have cardinality estimates. 
And we don't need to add a switch to bring inconvenience to the user. 

What do you think?

To get a better performance, I think making the join parallel is a good idea. I 
will take an investigate on it. Thanks a lot for your suggestions. 

Best, Hequn

> Add stream-stream non-window cross join
> ---
>
> Key: FLINK-11070
> URL: https://issues.apache.org/jira/browse/FLINK-11070
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> Currently, we don't reorder join and rely on the order provided by the user. 
> This is fine for most of the cases, however, it limits the set of supported 
> SQL queries.
> Example:
> {code:java}
> val streamUtil: StreamTableTestUtil = streamTestUtil()
> streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> val sqlQuery =
>   """
> |SELECT t1.a, t3.b
> |FROM MyTable3 t3, MyTable2 t2, MyTable t1
> |WHERE t1.a = t3.a AND t1.a = t2.a
> |""".stripMargin
> streamUtil.printSql(sqlQuery)
> {code}
> Given the current rule sets, this query produces a cross join which is not 
> supported and thus leads to:
> {code:java}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution 
> plan for the given query: 
> LogicalProject(a=[$8], b=[$1])
>   LogicalFilter(condition=[AND(=($8, $0), =($8, $4))])
> LogicalJoin(condition=[true], joinType=[inner])
>   LogicalJoin(condition=[true], joinType=[inner])
> LogicalTableScan(table=[[_DataStreamTable_2]])
> LogicalTableScan(table=[[_DataStreamTable_1]])
>   

[jira] [Updated] (FLINK-11123) Improve ml quick start doc

2018-12-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11123:
---
Labels: pull-request-available  (was: )

> Improve ml quick start doc
> --
>
> Key: FLINK-11123
> URL: https://issues.apache.org/jira/browse/FLINK-11123
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Machine Learning Library
>Affects Versions: 1.7.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.7.1
>
>
> The user cannot run the sample in the ML quick start document because 
> the import statement for the class is missing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11123) Improve ml quick start doc

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715871#comment-16715871
 ] 

ASF GitHub Bot commented on FLINK-11123:


sunjincheng121 opened a new pull request #7269: [FLINK-11123][docs] fix the 
import of the class is missing in ml quic…
URL: https://github.com/apache/flink/pull/7269
 
 
   ## What is the purpose of the change
   
   *This PR fixes the missing class import in the ML quick start document.*
   
   
   ## Brief change log
 - *Add `import org.apache.flink.ml.math.Vector` to the ML quick start doc.*
   
   ## Verifying this change
   This change is a documentation improvement without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): ( no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
 - The serializers: (no )
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no )
 - The S3 file system connector: (no)
   
   ## Documentation
 - Does this pull request introduce a new feature? (no)
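
The one-line change from the change log above, shown in context; the DenseVector usage is an illustrative fragment, not taken from the guide:

{code:scala}
// The missing import the PR adds -- without it, the quick start snippets
// that reference Vector do not compile.
import org.apache.flink.ml.math.Vector

// Illustrative: Vector is the supertype of FlinkML's dense/sparse vectors.
val v: Vector = org.apache.flink.ml.math.DenseVector(1.0, 2.0)
{code}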
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve ml quick start doc
> --
>
> Key: FLINK-11123
> URL: https://issues.apache.org/jira/browse/FLINK-11123
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Machine Learning Library
>Affects Versions: 1.7.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.7.1
>
>
> The user cannot run the sample in the ML quick start document because 
> the import statement for the class is missing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)




[jira] [Created] (FLINK-11123) Improve ml quick start doc

2018-12-10 Thread sunjincheng (JIRA)
sunjincheng created FLINK-11123:
---

 Summary: Improve ml quick start doc
 Key: FLINK-11123
 URL: https://issues.apache.org/jira/browse/FLINK-11123
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Machine Learning Library
Affects Versions: 1.7.0
Reporter: sunjincheng
Assignee: sunjincheng
 Fix For: 1.7.1, 1.7.0


The user cannot run the sample in the ML quick start document because the 
import statement for the class is missing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)




[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715703#comment-16715703
 ] 

ASF GitHub Bot commented on FLINK-10974:


sunjincheng121 commented on issue #7196: [FLINK-10974] [table] Add support for 
flatMap to table API
URL: https://github.com/apache/flink/pull/7196#issuecomment-446002606
 
 
   Thanks for the update! @dianfu 
   LGTM. +1 to merged


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add FlatMap to TableAPI
> ---
>
> Key: FLINK-10974
> URL: https://issues.apache.org/jira/browse/FLINK-10974
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Add FlatMap operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].
> The usage:
> {code:java}
> val res = tab
>   .flatMap(fun: TableFunction)  // output has columns 'a, 'b, 'c
>   .select('a, 'c)
> {code}
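
A hedged sketch of a TableFunction that would fit the flatMap call quoted above; the class name and its output columns are assumptions for illustration:

{code:scala}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

// Illustrative: emits one row per whitespace-separated token of the input.
class SplitFunc extends TableFunction[Row] {
  def eval(line: String): Unit =
    line.split("\\s+").foreach(t => collect(Row.of(t, Int.box(t.length), line)))

  override def getResultType: TypeInformation[Row] =
    Types.ROW(Types.STRING, Types.INT, Types.STRING)
}
{code}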



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11039) LogicalExchange and HashPartitioner realization

2018-12-10 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715635#comment-16715635
 ] 

Fabian Hueske commented on FLINK-11039:
---

Hi [~ambition],

I still don't understand the purpose of this issue. 
Can you share a query that fails due to missing support of {{LogicalExchange}}?

Thank you,
Fabian

> LogicalExchange and HashPartitioner realization
> ---
>
> Key: FLINK-11039
> URL: https://issues.apache.org/jira/browse/FLINK-11039
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.6.2, 1.7.0
>Reporter: ambition
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Implement the RelTimeIndicatorConverter.visit(exchange: LogicalExchange) 
> function.
> Implement FlinkLogicalExchange based on 
> org.apache.calcite.rel.logical.LogicalExchange.
> HashPartitioner is a Partitioner that implements hash-based partitioning using 
> Java's `Object.hashCode`, supporting org.apache.calcite.rel.RelDistribution.Type 
> HASH_DISTRIBUTED
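
A minimal sketch of hash-based partitioning as described, using Flink's generic Partitioner interface; this is illustrative, not the class proposed in the issue:

{code:scala}
import org.apache.flink.api.common.functions.Partitioner

// Hash partitioning via Object.hashCode, kept non-negative for the modulo.
class HashPartitioner[K] extends Partitioner[K] {
  override def partition(key: K, numPartitions: Int): Int = {
    val h = key.hashCode % numPartitions
    if (h < 0) h + numPartitions else h
  }
}
{code}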



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10945) Avoid resource deadlocks for finite stream jobs when resources are limited

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715279#comment-16715279
 ] 

ASF GitHub Bot commented on FLINK-10945:


azagrebin commented on a change in pull request #7255: [FLINK-10945] Use 
InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7255#discussion_r240313527
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -726,6 +728,56 @@ void sendPartitionInfos() {
}
}
 
+   /**
+* Check whether the InputDependencyConstraint is satisfied for this 
vertex.
+*
+* @return whether the input constraint is satisfied
+*/
+   public boolean checkInputDependencyConstraints() {
 
 Review comment:
   This might be a bit more concise:
   ```
  boolean checkInputDependencyConstraints() {
if (getExecutionGraph().getInputDependencyConstraint() == 
InputDependencyConstraint.ANY) {
// InputDependencyConstraint == ANY
return 
jobVertex.getInputs().stream().anyMatch(this::isInputConsumable);
} else {
// InputDependencyConstraint == ALL
return 
jobVertex.getInputs().stream().allMatch(this::isInputConsumable);
}
}
   
boolean isInputConsumable(IntermediateResult result) {
if (result.getResultType().isPipelined()) {
// For PIPELINED result, the input is consumable if any 
result partition has produced records or is finished
return 
Arrays.stream(result.getPartitions()).anyMatch(IntermediateResultPartition::hasDataProduced);
} else {
// For BLOCKING result, the input is consumable if all 
the partitions in the result are finished
return result.areAllPartitionsFinished();
}
}
   ```
   I am not sure we need to check the `ANY` case at all; just checking it 
theoretically changes the current behaviour. On the other hand, at the moment, I 
think it is always true for `ANY`, where we check it under 
`ScheduleMode.LAZY_FROM_SOURCES`. 
   
   I am also not sure that `ALL` config makes sense together with 
`ScheduleMode.EAGER`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Avoid resource deadlocks for finite stream jobs when resources are limited
> --
>
> Key: FLINK-10945
> URL: https://issues.apache.org/jira/browse/FLINK-10945
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.7.1
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
>
> Currently *resource deadlocks* can happen to finite stream jobs (or batch 
> jobs) when resources are limited, in the 2 cases below:
>  # Task Y is a pipelined downstream task of task X. When X takes all 
> resources (slots), Y cannot acquire slots to start, so the back pressure 
> will block X from finishing
>  # Task Y is an upstream task of task X. When X takes all resources (slots) and 
> Y cannot start, X cannot finish as some of its inputs are not finished
>  
> We can avoid case 1 by setting all edges to be BLOCKING to avoid pipelined 
> back pressure. However, case 2 cannot be avoided, as X (the downstream task) will 
> be launched when any of its input results is ready.
> To be specific, say task X has BLOCKING upstream tasks Y and Z; X can be 
> launched when Z finishes, even though task Y is not launched yet. This pre-launch 
> behaviour can be beneficial when there are plenty of resources, as X can 
> process data from Z before Y finishes its data processing. However, 
> resource deadlocks may happen when resources are limited, e.g. in small 
> sessions.
>  
> I’d propose introducing a constraint named as *InputDependencyConstraint* to 
> control the scheduling of vertices. It has 2 values:
>  # *ANY*. The vertex can be scheduled when any of its inputs is consumable.
>  # *ALL*. The vertex can be scheduled when all of its inputs are consumable.
>  
> The design doc is here. 
> [https://docs.google.com/document/d/1jpqC7OW_nLOSVOg06_QCWelicVtV6Au0Krg5m_S4kjY/edit?usp=sharing]
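
A minimal model of the proposed ANY/ALL semantics, using illustrative types rather than the runtime classes from the PR discussion above:

{code:scala}
// Illustrative only -- models the constraint from the proposal.
sealed trait InputDependencyConstraint
case object ANY extends InputDependencyConstraint // schedulable once any input is consumable
case object ALL extends InputDependencyConstraint // schedulable once all inputs are consumable

def isSchedulable(inputsConsumable: Seq[Boolean], c: InputDependencyConstraint): Boolean =
  c match {
    case ANY => inputsConsumable.exists(identity)
    case ALL => inputsConsumable.forall(identity)
  }
{code}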



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11074) Improve the harness test to make it possible test with state backend

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715298#comment-16715298
 ] 

ASF GitHub Bot commented on FLINK-11074:


walterddr commented on a change in pull request #7253: [FLINK-11074] 
[table][tests] Enable harness tests with RocksdbStateBackend and add harness 
tests for CollectAggFunction
URL: https://github.com/apache/flink/pull/7253#discussion_r240322027
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AggFunctionHarnessTest.scala
 ##
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Integer => JInt}
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.scala._
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.dataview.{DataView, MapView}
+import org.apache.flink.table.dataview.StateMapView
+import org.apache.flink.table.runtime.aggregate.{GeneratedAggregations, 
GroupAggProcessFunction}
+import 
org.apache.flink.table.runtime.harness.HarnessTestBase.TestStreamQueryConfig
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.junit.Assert.assertTrue
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+class AggFunctionHarnessTest extends HarnessTestBase {
+  private val queryConfig = new TestStreamQueryConfig(Time.seconds(0), 
Time.seconds(0))
+
+  @Test
+  def testCollectAggregate(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+
+val data = new mutable.MutableList[(JInt, String)]
+val t = env.fromCollection(data).toTable(tEnv, 'a, 'b)
+tEnv.registerTable("T", t)
+val sqlQuery = tEnv.sqlQuery(
+  s"""
+ |SELECT
+ |  b, collect(a)
+ |FROM (
+ |  SELECT a, b
+ |  FROM T
+ |  GROUP BY a, b
+ |) GROUP BY b
+ |""".stripMargin)
+
+val testHarness = createHarnessTester[String, CRow, CRow](
+  sqlQuery.toRetractStream[Row](queryConfig), "groupBy")
+
+testHarness.setStateBackend(getStateBackend)
+testHarness.open()
+
+val operator = getOperator(testHarness)
+val state = getState(operator, 
"acc0_map_dataview").asInstanceOf[MapView[JInt, JInt]]
+assertTrue(state.isInstanceOf[StateMapView[_, _]])
+
assertTrue(operator.getKeyedStateBackend.isInstanceOf[RocksDBKeyedStateBackend[_]])
+
+val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+testHarness.processElement(new StreamRecord(CRow(1: JInt, "aaa"), 1))
+expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 1).asJava), 1))
+
+testHarness.processElement(new StreamRecord(CRow(1: JInt, "bbb"), 1))
+expectedOutput.add(new StreamRecord(CRow("bbb", Map(1 -> 1).asJava), 1))
+
+testHarness.processElement(new StreamRecord(CRow(1: JInt, "aaa"), 1))
+expectedOutput.add(new StreamRecord(CRow(false, "aaa", Map(1 -> 
1).asJava), 1))
+expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 2).asJava), 1))
+
+testHarness.processElement(new StreamRecord(CRow(2: JInt, "aaa"), 1))
+expectedOutput.add(new StreamRecord(CRow(false, "aaa", Map(1 -> 
2).asJava), 1))
+expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 2, 2 -> 
1).asJava), 1))
+
+// remove some state: state may be cleaned up by the state backend if not 
accessed more than ttl
+operator.setCurrentKey(Row.of("aaa"))
+state.remove(2)
+
+// retract after state has been cleaned up
+testHarness.processElement(new StreamRecord(CRow(false, 2: JInt, "aaa"), 
1))
+
+val result = 

[GitHub] walterddr commented on a change in pull request #7253: [FLINK-11074] [table][tests] Enable harness tests with RocksdbStateBackend and add harness tests for CollectAggFunction

2018-12-10 Thread GitBox
walterddr commented on a change in pull request #7253: [FLINK-11074] 
[table][tests] Enable harness tests with RocksdbStateBackend and add harness 
tests for CollectAggFunction
URL: https://github.com/apache/flink/pull/7253#discussion_r240321103
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
 ##
 @@ -491,13 +489,75 @@ class HarnessTestBase {
 distinctCountFuncName,
 distinctCountAggCode)
 
+  def createHarnessTester[KEY, IN, OUT](
+  dataStream: DataStream[_],
+  prefixOperatorName: String)
+  : KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT] = {
+
+val transformation = extractExpectedTransformation(
+  dataStream.javaStream.getTransformation,
+  prefixOperatorName).asInstanceOf[OneInputTransformation[_, _]]
+if (transformation == null) {
+  throw new Exception("Can not find the expected transformation")
+}
+
+val processOperator = 
transformation.getOperator.asInstanceOf[OneInputStreamOperator[IN, OUT]]
+val keySelector = 
transformation.getStateKeySelector.asInstanceOf[KeySelector[IN, KEY]]
+val keyType = 
transformation.getStateKeyType.asInstanceOf[TypeInformation[KEY]]
+
+createHarnessTester(processOperator, keySelector, keyType)
+  .asInstanceOf[KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT]]
+  }
+
+  private def extractExpectedTransformation(
+  transformation: StreamTransformation[_],
+  prefixOperatorName: String): StreamTransformation[_] = {
+def extractFromInputs(inputs: StreamTransformation[_]*): 
StreamTransformation[_] = {
+  for (input <- inputs) {
+val t = extractExpectedTransformation(input, prefixOperatorName)
+if (t != null) {
+  return t
+}
+  }
+  null
+}
+
+transformation match {
+  case one: OneInputTransformation[_, _] =>
+if (one.getName.startsWith(prefixOperatorName)) {
+  one
+} else {
+  extractExpectedTransformation(one.getInput, prefixOperatorName)
+}
+  case two: TwoInputTransformation[_, _, _] =>
 
 Review comment:
   Let's throw an UnsupportedOperationException for now, since there is no code path that executes a two-input transformation yet. We can always add this logic later when necessary.
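
   For reference, a minimal sketch of that suggestion (the exception message is illustrative, not taken from the PR):

   case two: TwoInputTransformation[_, _, _] =>
     throw new UnsupportedOperationException(
       "TwoInputTransformation is not supported by createHarnessTester yet")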


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11074) Improve the harness test to make it possible test with state backend

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715299#comment-16715299
 ] 

ASF GitHub Bot commented on FLINK-11074:


walterddr commented on a change in pull request #7253: [FLINK-11074] 
[table][tests] Enable harness tests with RocksdbStateBackend and add harness 
tests for CollectAggFunction
URL: https://github.com/apache/flink/pull/7253#discussion_r240321103
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
 ##
 @@ -491,13 +489,75 @@ class HarnessTestBase {
 distinctCountFuncName,
 distinctCountAggCode)
 
+  def createHarnessTester[KEY, IN, OUT](
+  dataStream: DataStream[_],
+  prefixOperatorName: String)
+  : KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT] = {
+
+val transformation = extractExpectedTransformation(
+  dataStream.javaStream.getTransformation,
+  prefixOperatorName).asInstanceOf[OneInputTransformation[_, _]]
+if (transformation == null) {
+  throw new Exception("Cannot find the expected transformation")
+}
+
+val processOperator = transformation.getOperator.asInstanceOf[OneInputStreamOperator[IN, OUT]]
+val keySelector = transformation.getStateKeySelector.asInstanceOf[KeySelector[IN, KEY]]
+val keyType = transformation.getStateKeyType.asInstanceOf[TypeInformation[KEY]]
+
+createHarnessTester(processOperator, keySelector, keyType)
+  .asInstanceOf[KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT]]
+  }
+
+  private def extractExpectedTransformation(
+  transformation: StreamTransformation[_],
+  prefixOperatorName: String): StreamTransformation[_] = {
+def extractFromInputs(inputs: StreamTransformation[_]*): StreamTransformation[_] = {
+  for (input <- inputs) {
+val t = extractExpectedTransformation(input, prefixOperatorName)
+if (t != null) {
+  return t
+}
+  }
+  null
+}
+
+transformation match {
+  case one: OneInputTransformation[_, _] =>
+if (one.getName.startsWith(prefixOperatorName)) {
+  one
+} else {
+  extractExpectedTransformation(one.getInput, prefixOperatorName)
+}
+  case two: TwoInputTransformation[_, _, _] =>
 
 Review comment:
   Let's throw an UnsupportedOperationException for now, since there is no code path that executes a two-input transformation yet. We can always add this logic later when necessary.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve the harness test to make it possible test with state backend
> 
>
> Key: FLINK-11074
> URL: https://issues.apache.org/jira/browse/FLINK-11074
> Project: Flink
>  Issue Type: Test
>  Components: Table API  SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the harness tests can only run without a state backend. If you use a
> DataView in the accumulator of an aggregate function, the DataView is a plain
> Java object held on the heap; it is not replaced with a
> StateMapView/StateListView whose values are actually held in the state backend.
> We should improve the harness tests to make it possible to test with a state
> backend; otherwise, issues such as FLINK-10674 could never have been found.
> With such a harness test available, we could test the built-in aggregate
> functions that use a DataView in a more fine-grained way.
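>
> As an illustration, a minimal sketch of an aggregate function whose accumulator
> uses a MapView (names are illustrative; assumes the Flink 1.7 Table API):
>
>   import java.lang.{Integer => JInt, Long => JLong}
>   import org.apache.flink.table.api.dataview.MapView
>   import org.apache.flink.table.functions.AggregateFunction
>
>   // Accumulator with a DataView field. Without a state backend the MapView
>   // lives on the heap; with one, it should be replaced by a StateMapView.
>   class CountDistinctAcc {
>     var map: MapView[JInt, JInt] = new MapView[JInt, JInt]()
>     var count: Long = 0L
>   }
>
>   class CountDistinct extends AggregateFunction[JLong, CountDistinctAcc] {
>     override def createAccumulator(): CountDistinctAcc = new CountDistinctAcc
>
>     def accumulate(acc: CountDistinctAcc, value: JInt): Unit = {
>       if (value != null && !acc.map.contains(value)) {
>         acc.map.put(value, 1)
>         acc.count += 1
>       }
>     }
>
>     override def getValue(acc: CountDistinctAcc): JLong = acc.count
>   }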



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] walterddr commented on a change in pull request #7253: [FLINK-11074] [table][tests] Enable harness tests with RocksdbStateBackend and add harness tests for CollectAggFunction

2018-12-10 Thread GitBox
walterddr commented on a change in pull request #7253: [FLINK-11074] 
[table][tests] Enable harness tests with RocksdbStateBackend and add harness 
tests for CollectAggFunction
URL: https://github.com/apache/flink/pull/7253#discussion_r240322027
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AggFunctionHarnessTest.scala
 ##
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Integer => JInt}
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.scala._
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.dataview.{DataView, MapView}
+import org.apache.flink.table.dataview.StateMapView
+import org.apache.flink.table.runtime.aggregate.{GeneratedAggregations, GroupAggProcessFunction}
+import org.apache.flink.table.runtime.harness.HarnessTestBase.TestStreamQueryConfig
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.junit.Assert.assertTrue
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+class AggFunctionHarnessTest extends HarnessTestBase {
+  private val queryConfig = new TestStreamQueryConfig(Time.seconds(0), Time.seconds(0))
+
+  @Test
+  def testCollectAggregate(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+
+val data = new mutable.MutableList[(JInt, String)]
+val t = env.fromCollection(data).toTable(tEnv, 'a, 'b)
+tEnv.registerTable("T", t)
+val sqlQuery = tEnv.sqlQuery(
+  s"""
+ |SELECT
+ |  b, collect(a)
+ |FROM (
+ |  SELECT a, b
+ |  FROM T
+ |  GROUP BY a, b
+ |) GROUP BY b
+ |""".stripMargin)
+
+val testHarness = createHarnessTester[String, CRow, CRow](
+  sqlQuery.toRetractStream[Row](queryConfig), "groupBy")
+
+testHarness.setStateBackend(getStateBackend)
+testHarness.open()
+
+val operator = getOperator(testHarness)
+val state = getState(operator, "acc0_map_dataview").asInstanceOf[MapView[JInt, JInt]]
+assertTrue(state.isInstanceOf[StateMapView[_, _]])
+assertTrue(operator.getKeyedStateBackend.isInstanceOf[RocksDBKeyedStateBackend[_]])
+
+val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+testHarness.processElement(new StreamRecord(CRow(1: JInt, "aaa"), 1))
+expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 1).asJava), 1))
+
+testHarness.processElement(new StreamRecord(CRow(1: JInt, "bbb"), 1))
+expectedOutput.add(new StreamRecord(CRow("bbb", Map(1 -> 1).asJava), 1))
+
+testHarness.processElement(new StreamRecord(CRow(1: JInt, "aaa"), 1))
+expectedOutput.add(new StreamRecord(CRow(false, "aaa", Map(1 -> 1).asJava), 1))
+expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 2).asJava), 1))
+
+testHarness.processElement(new StreamRecord(CRow(2: JInt, "aaa"), 1))
+expectedOutput.add(new StreamRecord(CRow(false, "aaa", Map(1 -> 2).asJava), 1))
+expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 2, 2 -> 1).asJava), 1))
+
+// remove some state: the state backend may clean up state that has not been accessed for longer than the TTL
+operator.setCurrentKey(Row.of("aaa"))
+state.remove(2)
+
+// retract after state has been cleaned up
+testHarness.processElement(new StreamRecord(CRow(false, 2: JInt, "aaa"), 1))
+
+val result = testHarness.getOutput
+
+verify(expectedOutput, result)
+
+testHarness.close()
+  }
+
+  private def getState(
 
 Review comment:
   This can probably be put into `HarnessTestBase` as well. As of now I can only imagine the Operator to 
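
   A generic helper along those lines might look like the rough sketch below; the field name "function" and the reflective lookup are assumptions for illustration, not taken from the PR:

   private def getState(
       operator: AbstractUdfStreamOperator[_, _],
       stateFieldName: String): DataView = {
     // Reach into the user function wrapped by the operator ...
     val udf = operator.getUserFunction
     // ... fetch its generated-aggregations instance via reflection ...
     val funcField = udf.getClass.getDeclaredField("function")
     funcField.setAccessible(true)
     val generatedAggregations = funcField.get(udf)
     // ... and pull out the named DataView field, e.g. "acc0_map_dataview".
     val stateField = generatedAggregations.getClass.getDeclaredField(stateFieldName)
     stateField.setAccessible(true)
     stateField.get(generatedAggregations).asInstanceOf[DataView]
   }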
