[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880059#comment-15880059
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102654252
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -211,6 +283,23 @@ public void invoke(T value) throws Exception {
}
 
@Override
+   public void initializeState(FunctionInitializationContext context) 
throws Exception {
+   // no initialization needed
+   }
+
+   @Override
+   public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+   checkErrorAndRethrow();
+
+   if (flushOnCheckpoint) {
+   do {
+   bulkProcessor.flush();
--- End diff --

Waiting on `numPendingRequests` makes sense, I'll try and see if it works 
out.


> Proper at-least-once support for ElasticsearchSink
> --
>
> Key: FLINK-5487
> URL: https://issues.apache.org/jira/browse/FLINK-5487
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>
> Discussion in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fault-tolerance-guarantees-of-Elasticsearch-sink-in-flink-elasticsearch2-td10982.html
> Currently, the Elasticsearch Sink actually doesn't offer any guarantees for 
> message delivery.
> For proper support of at-least-once, the sink will need to participate in 
> Flink's checkpointing: when snapshotting is triggered at the 
> {{ElasticsearchSink}}, we need to synchronize on the pending ES requests by 
> flushing the internal bulk processor. For temporary ES failures (see 
> FLINK-5122) that may happen on the flush, we should retry them before 
> returning from snapshotting and acking the checkpoint. If there are 
> non-temporary ES failures on the flush, the current snapshot should fail.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3358: [FLINK-5487] [elasticsearch] At-least-once Elastic...

2017-02-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102654252
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -211,6 +283,23 @@ public void invoke(T value) throws Exception {
}
 
@Override
+   public void initializeState(FunctionInitializationContext context) 
throws Exception {
+   // no initialization needed
+   }
+
+   @Override
+   public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+   checkErrorAndRethrow();
+
+   if (flushOnCheckpoint) {
+   do {
+   bulkProcessor.flush();
--- End diff --

Waiting on `numPendingRequests` makes sense, I'll try and see if it works 
out.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880057#comment-15880057
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102654126
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -211,6 +283,23 @@ public void invoke(T value) throws Exception {
}
 
@Override
+   public void initializeState(FunctionInitializationContext context) 
throws Exception {
+   // no initialization needed
+   }
+
+   @Override
+   public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+   checkErrorAndRethrow();
+
+   if (flushOnCheckpoint) {
+   do {
+   bulkProcessor.flush();
--- End diff --

Ah, I see the problem here ...
The bulk processor's internal `bulkRequest.numberOfActions() == 0` will 
become `true` as soon as it starts executing the flush, and not after 
`afterBulk` is invoked.

So, since our `numPendingRequests` implementation relies on the `afterBulk` 
callback, we might have busy loops on `bulkProcessor.flush()` while we wait for 
`numPendingRequests` to become 0.

This is quite a nice catch actually! So no worries on bringing it up now.


> Proper at-least-once support for ElasticsearchSink
> --
>
> Key: FLINK-5487
> URL: https://issues.apache.org/jira/browse/FLINK-5487
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>
> Discussion in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fault-tolerance-guarantees-of-Elasticsearch-sink-in-flink-elasticsearch2-td10982.html
> Currently, the Elasticsearch Sink actually doesn't offer any guarantees for 
> message delivery.
> For proper support of at-least-once, the sink will need to participate in 
> Flink's checkpointing: when snapshotting is triggered at the 
> {{ElasticsearchSink}}, we need to synchronize on the pending ES requests by 
> flushing the internal bulk processor. For temporary ES failures (see 
> FLINK-5122) that may happen on the flush, we should retry them before 
> returning from snapshotting and acking the checkpoint. If there are 
> non-temporary ES failures on the flush, the current snapshot should fail.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3358: [FLINK-5487] [elasticsearch] At-least-once Elastic...

2017-02-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102654126
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -211,6 +283,23 @@ public void invoke(T value) throws Exception {
}
 
@Override
+   public void initializeState(FunctionInitializationContext context) 
throws Exception {
+   // no initialization needed
+   }
+
+   @Override
+   public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+   checkErrorAndRethrow();
+
+   if (flushOnCheckpoint) {
+   do {
+   bulkProcessor.flush();
--- End diff --

Ah, I see the problem here ...
The bulk processor's internal `bulkRequest.numberOfActions() == 0` will 
become `true` as soon as it starts executing the flush, and not after 
`afterBulk` is invoked.

So, since our `numPendingRequests` implementation relies on the `afterBulk` 
callback, we might have busy loops on `bulkProcessor.flush()` while we wait for 
`numPendingRequests` to become 0.

This is quite a nice catch actually! So no worries on bringing it up now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-5893) Race condition in removing previous JobManagerRegistration in ResourceManager

2017-02-22 Thread zhijiang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-5893:

Description: 
The map of {{JobManagerRegistration}} in ResourceManager is not thread-safe, 
and currently there may be two threads to operate the map concurrently to bring 
unexpected results.

The scenario is like this :

 - {{registerJobManager}}: When the job leader changes and the new JobManager 
leader registers to ResourceManager, the new {{JobManagerRegistration}} will 
replace the old one in the map with the same key {{JobID}}. This process is 
triggered by rpc thread.

 - Meanwhile, the {{JobLeaderIdService}} in ResourceManager could be aware of 
job leader change and trigger the action {{jobLeaderLostLeadership}} in another 
thread. In this action, it will remove the previous {{JobManagerRegistration}} 
from the map by {{JobID}}, but the old {{JobManagerRegistration}} may be 
already replaced by the new one from {{registerJobManager}}.

In summary, this race condition may cause the new {{JobManagerRegistration}} 
removed from ResourceManager, resulting in exception when request slot from 
ResourceManager. It can occur in small probability when running JobManager 
failure ITCase.

Consider the solution of this issue, the {{jobLeaderLostLeadership}} can be 
scheduled by {{runAsync}} in rpc thread and no need to bring extra lock for the 
map.

  was:
The map of {{JobManagerRegistration}} in ResourceManager is not thread-safe, 
and currently there may be two threads to operate the map concurrently to bring 
unexpected results.

The scenario is like this :

 - {{registerJobManager}}: When the job leader changes and the new JobManager 
leader registers to ResourceManager, the new {{JobManagerRegistration}} will 
replace the old one in the map with the same key {{JobID}}. This process is 
triggered by rpc thread.

 - Meanwhile, the {{JobLeaderIdService}} in ResourceManager could be aware of 
job leader change and trigger the action {{jobLeaderLostLeadership}} in another 
thread. In this action, it will remove the previous {{JobManagerRegistration}} 
from the map by {{JobID}}, but the old {{JobManagerRegistration}} may be 
already replaced by the new one from {{registerJobManager}}.

In summary, this race condition may cause the new {{JobManagerRegistration}} 
removed from ResourceManager, resulting in exception when request slot from 
ResourceManager.

Consider the solution of this issue, the {{jobLeaderLostLeadership}} can be 
scheduled by {{runAsync}} in rpc thread and no need to bring extra lock for the 
map.


> Race condition in removing previous JobManagerRegistration in ResourceManager
> -
>
> Key: FLINK-5893
> URL: https://issues.apache.org/jira/browse/FLINK-5893
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager
>Reporter: zhijiang
>Assignee: zhijiang
>
> The map of {{JobManagerRegistration}} in ResourceManager is not thread-safe, 
> and currently there may be two threads to operate the map concurrently to 
> bring unexpected results.
> The scenario is like this :
>  - {{registerJobManager}}: When the job leader changes and the new JobManager 
> leader registers to ResourceManager, the new {{JobManagerRegistration}} will 
> replace the old one in the map with the same key {{JobID}}. This process is 
> triggered by rpc thread.
>  - Meanwhile, the {{JobLeaderIdService}} in ResourceManager could be aware of 
> job leader change and trigger the action {{jobLeaderLostLeadership}} in 
> another thread. In this action, it will remove the previous 
> {{JobManagerRegistration}} from the map by {{JobID}}, but the old 
> {{JobManagerRegistration}} may be already replaced by the new one from 
> {{registerJobManager}}.
> In summary, this race condition may cause the new {{JobManagerRegistration}} 
> removed from ResourceManager, resulting in exception when request slot from 
> ResourceManager. It can occur in small probability when running JobManager 
> failure ITCase.
> Consider the solution of this issue, the {{jobLeaderLostLeadership}} can be 
> scheduled by {{runAsync}} in rpc thread and no need to bring extra lock for 
> the map.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-02-22 Thread hongyuhong
Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102652877
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -52,9 +53,14 @@ class IncrementalAggregateReduceFunction(
 //   and directly merge value1 and value2.
 val accumulatorRow = new Row(intermediateRowArity)
 
-// copy all fields of value1 into accumulatorRow
-(0 until intermediateRowArity)
-.foreach(i => accumulatorRow.setField(i, value1.getField(i)))
+// copy non agg fields of value2 into accumulatorRow
+(0 until aggOffset)
+  .foreach(i => accumulatorRow.setField(i, value2.getField(i)))
+
+// copy agg fields of value1 into accumulatorRow
--- End diff --

IMO, groupWindow's non-agg fields is group fields, and whether copy from 
value1 or value2 is the same, what do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880052#comment-15880052
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102652936
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalEventTimeRowWindowAssigner.java
 ---
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that assigns all elements to the same global 
row window.
+ *
+ */
+@PublicEvolving
+public class GlobalEventTimeRowWindowAssigner extends 
WindowAssigner {
--- End diff --

This windowassigner is aim to keep the currentelement eventtime and current 
max timestamp, if new element timestamp is before the current max timestamp, i 
will just discard it.


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - bounded PRECEDING is not supported (see FLINK-5655)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880051#comment-15880051
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102652926
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
 ---
@@ -171,4 +175,98 @@ class SqlITCase extends 
StreamingMultipleProgramsTestBase {
 val expected = mutable.MutableList("Hello", "Hello world")
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  /** test sliding event-time unbounded window with partition by **/
+  @Test
+  def testSlideEventTimeUnboundWindowWithGroup(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT a, b, SUM(a) over (partition by b order by 
rowtime() range between " +
+  "unbounded preceding and current row) from T1"
+
+val t1 = StreamTestData.getSmall3TupleDataStream(env)
+  .assignTimestampsAndWatermarks(new 
AssignerWithPeriodicWatermarks[(Int, Long, String)] {
+
+def getCurrentWatermark: Watermark = new 
Watermark(System.currentTimeMillis)
+
+def extractTimestamp(element: (Int, Long, String), 
previousElementTimestamp: Long): Long =
+  140
+  }).toTable(tEnv).as('a, 'b, 'c)
+tEnv.registerTable("T1", t1)
+
+val result = tEnv.sql(sqlQuery).toDataStream[Row]
+result.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected1 = mutable.MutableList(
+  "1,1,1", "2,2,2", "3,2,5")
+val expected2 = mutable.MutableList(
+  "1,1,1", "2,2,5", "3,2,3")
+assertTrue(expected1.equals(StreamITCase.testResults.sorted) ||
+  expected2.equals(StreamITCase.testResults.sorted))
+  }
+
+  /** test sliding event-time unbounded window without partitiion by **/
+  @Test
+  def testSlideEventTimeUnboundWindowWithoutGroup(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT SUM(a) " +
+  "over (order by rowtime() range between unbounded preceding and 
current row) from T1"
+
+val t1 = StreamTestData.getSmall3TupleDataStream(env)
+  .assignTimestampsAndWatermarks(new 
AssignerWithPeriodicWatermarks[(Int, Long, String)] {
+
+def getCurrentWatermark: Watermark = new 
Watermark(System.currentTimeMillis)
+
+def extractTimestamp(element: (Int, Long, String), 
previousElementTimestamp: Long): Long =
+  140
+  }).toTable(tEnv).as('a, 'b, 'c)
+tEnv.registerTable("T1", t1)
+
+val result = tEnv.sql(sqlQuery).toDataStream[Row]
+result.addSink(new StreamITCase.StringSink)
+env.execute()
+
+assertEquals(Some("6"), StreamITCase.testResults.sorted.get(2))
+  }
+
+  /** test sliding event-time unbounded window with later record **/
+  @Test
+  def testSlideEventTimeUnboundWindowWithLater(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+env.setParallelism(1)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT SUM(a) " +
--- End diff --

The testSlideEventTimeUnboundWindowWithGroup has include non-agg field.


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND

[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-02-22 Thread hongyuhong
Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102652936
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalEventTimeRowWindowAssigner.java
 ---
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that assigns all elements to the same global 
row window.
+ *
+ */
+@PublicEvolving
+public class GlobalEventTimeRowWindowAssigner extends 
WindowAssigner {
--- End diff --

This windowassigner is aim to keep the currentelement eventtime and current 
max timestamp, if new element timestamp is before the current max timestamp, i 
will just discard it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880050#comment-15880050
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102652914
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
 ---
@@ -171,4 +175,98 @@ class SqlITCase extends 
StreamingMultipleProgramsTestBase {
 val expected = mutable.MutableList("Hello", "Hello world")
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  /** test sliding event-time unbounded window with partition by **/
+  @Test
+  def testSlideEventTimeUnboundWindowWithGroup(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT a, b, SUM(a) over (partition by b order by 
rowtime() range between " +
+  "unbounded preceding and current row) from T1"
+
+val t1 = StreamTestData.getSmall3TupleDataStream(env)
+  .assignTimestampsAndWatermarks(new 
AssignerWithPeriodicWatermarks[(Int, Long, String)] {
+
+def getCurrentWatermark: Watermark = new 
Watermark(System.currentTimeMillis)
+
+def extractTimestamp(element: (Int, Long, String), 
previousElementTimestamp: Long): Long =
+  140
+  }).toTable(tEnv).as('a, 'b, 'c)
+tEnv.registerTable("T1", t1)
+
+val result = tEnv.sql(sqlQuery).toDataStream[Row]
+result.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected1 = mutable.MutableList(
+  "1,1,1", "2,2,2", "3,2,5")
+val expected2 = mutable.MutableList(
+  "1,1,1", "2,2,5", "3,2,3")
+assertTrue(expected1.equals(StreamITCase.testResults.sorted) ||
+  expected2.equals(StreamITCase.testResults.sorted))
+  }
+
+  /** test sliding event-time unbounded window without partitiion by **/
+  @Test
+  def testSlideEventTimeUnboundWindowWithoutGroup(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT SUM(a) " +
+  "over (order by rowtime() range between unbounded preceding and 
current row) from T1"
+
+val t1 = StreamTestData.getSmall3TupleDataStream(env)
+  .assignTimestampsAndWatermarks(new 
AssignerWithPeriodicWatermarks[(Int, Long, String)] {
+
+def getCurrentWatermark: Watermark = new 
Watermark(System.currentTimeMillis)
+
+def extractTimestamp(element: (Int, Long, String), 
previousElementTimestamp: Long): Long =
+  140
+  }).toTable(tEnv).as('a, 'b, 'c)
+tEnv.registerTable("T1", t1)
+
+val result = tEnv.sql(sqlQuery).toDataStream[Row]
+result.addSink(new StreamITCase.StringSink)
+env.execute()
+
+assertEquals(Some("6"), StreamITCase.testResults.sorted.get(2))
+  }
+
+  /** test sliding event-time unbounded window with later record **/
+  @Test
+  def testSlideEventTimeUnboundWindowWithLater(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+env.setParallelism(1)
--- End diff --

Oh yes, i will remove it.


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must b

[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880049#comment-15880049
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102652877
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -52,9 +53,14 @@ class IncrementalAggregateReduceFunction(
 //   and directly merge value1 and value2.
 val accumulatorRow = new Row(intermediateRowArity)
 
-// copy all fields of value1 into accumulatorRow
-(0 until intermediateRowArity)
-.foreach(i => accumulatorRow.setField(i, value1.getField(i)))
+// copy non agg fields of value2 into accumulatorRow
+(0 until aggOffset)
+  .foreach(i => accumulatorRow.setField(i, value2.getField(i)))
+
+// copy agg fields of value1 into accumulatorRow
--- End diff --

IMO, groupWindow's non-agg fields is group fields, and whether copy from 
value1 or value2 is the same, what do you think?


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - bounded PRECEDING is not supported (see FLINK-5655)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-02-22 Thread hongyuhong
Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102652926
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
 ---
@@ -171,4 +175,98 @@ class SqlITCase extends 
StreamingMultipleProgramsTestBase {
 val expected = mutable.MutableList("Hello", "Hello world")
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  /** test sliding event-time unbounded window with partition by **/
+  @Test
+  def testSlideEventTimeUnboundWindowWithGroup(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT a, b, SUM(a) over (partition by b order by 
rowtime() range between " +
+  "unbounded preceding and current row) from T1"
+
+val t1 = StreamTestData.getSmall3TupleDataStream(env)
+  .assignTimestampsAndWatermarks(new 
AssignerWithPeriodicWatermarks[(Int, Long, String)] {
+
+def getCurrentWatermark: Watermark = new 
Watermark(System.currentTimeMillis)
+
+def extractTimestamp(element: (Int, Long, String), 
previousElementTimestamp: Long): Long =
+  140
+  }).toTable(tEnv).as('a, 'b, 'c)
+tEnv.registerTable("T1", t1)
+
+val result = tEnv.sql(sqlQuery).toDataStream[Row]
+result.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected1 = mutable.MutableList(
+  "1,1,1", "2,2,2", "3,2,5")
+val expected2 = mutable.MutableList(
+  "1,1,1", "2,2,5", "3,2,3")
+assertTrue(expected1.equals(StreamITCase.testResults.sorted) ||
+  expected2.equals(StreamITCase.testResults.sorted))
+  }
+
+  /** test sliding event-time unbounded window without partitiion by **/
+  @Test
+  def testSlideEventTimeUnboundWindowWithoutGroup(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT SUM(a) " +
+  "over (order by rowtime() range between unbounded preceding and 
current row) from T1"
+
+val t1 = StreamTestData.getSmall3TupleDataStream(env)
+  .assignTimestampsAndWatermarks(new 
AssignerWithPeriodicWatermarks[(Int, Long, String)] {
+
+def getCurrentWatermark: Watermark = new 
Watermark(System.currentTimeMillis)
+
+def extractTimestamp(element: (Int, Long, String), 
previousElementTimestamp: Long): Long =
+  140
+  }).toTable(tEnv).as('a, 'b, 'c)
+tEnv.registerTable("T1", t1)
+
+val result = tEnv.sql(sqlQuery).toDataStream[Row]
+result.addSink(new StreamITCase.StringSink)
+env.execute()
+
+assertEquals(Some("6"), StreamITCase.testResults.sorted.get(2))
+  }
+
+  /** test sliding event-time unbounded window with later record **/
+  @Test
+  def testSlideEventTimeUnboundWindowWithLater(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+env.setParallelism(1)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT SUM(a) " +
--- End diff --

The testSlideEventTimeUnboundWindowWithGroup has include non-agg field.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-02-22 Thread hongyuhong
Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102652914
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
 ---
@@ -171,4 +175,98 @@ class SqlITCase extends 
StreamingMultipleProgramsTestBase {
 val expected = mutable.MutableList("Hello", "Hello world")
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  /** test sliding event-time unbounded window with partition by **/
+  @Test
+  def testSlideEventTimeUnboundWindowWithGroup(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT a, b, SUM(a) over (partition by b order by 
rowtime() range between " +
+  "unbounded preceding and current row) from T1"
+
+val t1 = StreamTestData.getSmall3TupleDataStream(env)
+  .assignTimestampsAndWatermarks(new 
AssignerWithPeriodicWatermarks[(Int, Long, String)] {
+
+def getCurrentWatermark: Watermark = new 
Watermark(System.currentTimeMillis)
+
+def extractTimestamp(element: (Int, Long, String), 
previousElementTimestamp: Long): Long =
+  140
+  }).toTable(tEnv).as('a, 'b, 'c)
+tEnv.registerTable("T1", t1)
+
+val result = tEnv.sql(sqlQuery).toDataStream[Row]
+result.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected1 = mutable.MutableList(
+  "1,1,1", "2,2,2", "3,2,5")
+val expected2 = mutable.MutableList(
+  "1,1,1", "2,2,5", "3,2,3")
+assertTrue(expected1.equals(StreamITCase.testResults.sorted) ||
+  expected2.equals(StreamITCase.testResults.sorted))
+  }
+
+  /** test sliding event-time unbounded window without partitiion by **/
+  @Test
+  def testSlideEventTimeUnboundWindowWithoutGroup(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT SUM(a) " +
+  "over (order by rowtime() range between unbounded preceding and 
current row) from T1"
+
+val t1 = StreamTestData.getSmall3TupleDataStream(env)
+  .assignTimestampsAndWatermarks(new 
AssignerWithPeriodicWatermarks[(Int, Long, String)] {
+
+def getCurrentWatermark: Watermark = new 
Watermark(System.currentTimeMillis)
+
+def extractTimestamp(element: (Int, Long, String), 
previousElementTimestamp: Long): Long =
+  140
+  }).toTable(tEnv).as('a, 'b, 'c)
+tEnv.registerTable("T1", t1)
+
+val result = tEnv.sql(sqlQuery).toDataStream[Row]
+result.addSink(new StreamITCase.StringSink)
+env.execute()
+
+assertEquals(Some("6"), StreamITCase.testResults.sorted.get(2))
+  }
+
+  /** test sliding event-time unbounded window with later record **/
+  @Test
+  def testSlideEventTimeUnboundWindowWithLater(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+env.setParallelism(1)
--- End diff --

Oh yes, i will remove it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-5893) Race condition in removing previous JobManagerRegistration in ResourceManager

2017-02-22 Thread zhijiang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang reassigned FLINK-5893:
---

Assignee: zhijiang

> Race condition in removing previous JobManagerRegistration in ResourceManager
> -
>
> Key: FLINK-5893
> URL: https://issues.apache.org/jira/browse/FLINK-5893
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager
>Reporter: zhijiang
>Assignee: zhijiang
>
> The map of {{JobManagerRegistration}} in ResourceManager is not thread-safe, 
> and currently there may be two threads to operate the map concurrently to 
> bring unexpected results.
> The scenario is like this :
>  - {{registerJobManager}}: When the job leader changes and the new JobManager 
> leader registers to ResourceManager, the new {{JobManagerRegistration}} will 
> replace the old one in the map with the same key {{JobID}}. This process is 
> triggered by rpc thread.
>  - Meanwhile, the {{JobLeaderIdService}} in ResourceManager could be aware of 
> job leader change and trigger the action {{jobLeaderLostLeadership}} in 
> another thread. In this action, it will remove the previous 
> {{JobManagerRegistration}} from the map by {{JobID}}, but the old 
> {{JobManagerRegistration}} may be already replaced by the new one from 
> {{registerJobManager}}.
> In summary, this race condition may cause the new {{JobManagerRegistration}} 
> removed from ResourceManager, resulting in exception when request slot from 
> ResourceManager.
> Consider the solution of this issue, the {{jobLeaderLostLeadership}} can be 
> scheduled by {{runAsync}} in rpc thread and no need to bring extra lock for 
> the map.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5893) Race condition in removing previous JobManagerRegistration in ResourceManager

2017-02-22 Thread zhijiang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-5893:

Description: 
The map of {{JobManagerRegistration}} in ResourceManager is not thread-safe, 
and currently there may be two threads to operate the map concurrently to bring 
unexpected results.

The scenario is like this :

 - {{registerJobManager}}: When the job leader changes and the new JobManager 
leader registers to ResourceManager, the new {{JobManagerRegistration}} will 
replace the old one in the map with the same key {{JobID}}. This process is 
triggered by rpc thread.

 - Meanwhile, the {{JobLeaderIdService}} in ResourceManager could be aware of 
job leader change and trigger the action {{jobLeaderLostLeadership}} in another 
thread. In this action, it will remove the previous {{JobManagerRegistration}} 
from the map by {{JobID}}, but the old {{JobManagerRegistration}} may be 
already replaced by the new one from {{registerJobManager}}.

In summary, this race condition may cause the new {{JobManagerRegistration}} 
removed from ResourceManager, resulting in exception when request slot from 
ResourceManager.

Consider the solution of this issue, the {{jobLeaderLostLeadership}} can be 
scheduled by {{runAsync}} in rpc thread and no need to bring extra lock for the 
map.

  was:
The map of {{JobManagerRegistration}} in {{ResourceManager}} is not 
thread-safe, and currently there may be two threads to operate the map 
concurrently to bring unexpected results.

The scenario is like this :

 - {{registerJobManager}}: When the job leader changes and the new JobManager 
leader registers to ResourceManager, the new {{JobManagerRegistration}} will 
replace the old one in the map with the same key {{JobID}}. This process is 
triggered by rpc thread.

 - Meanwhile, the {{JobLeaderIdService}} in ResourceManager could be aware of 
job leader change and trigger the action {{jobLeaderLostLeadership}} in another 
thread. In this action, it will remove the previous {{JobManagerRegistration}} 
from the map by {{JobID}}, but the old {{JobManagerRegistration}} may be 
already replaced by the new one from {{registerJobManager}}.

In summary, this race condition may cause the new {{JobManagerRegistration}} 
removed from ResourceManager, resulting in exception when request slot from 
ResourceManager.

Consider the solution of this issue, the {{jobLeaderLostLeadership}} can be 
scheduled by {{runAsync}} in rpc thread and no need to bring extra lock for the 
map.


> Race condition in removing previous JobManagerRegistration in ResourceManager
> -
>
> Key: FLINK-5893
> URL: https://issues.apache.org/jira/browse/FLINK-5893
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager
>Reporter: zhijiang
>
> The map of {{JobManagerRegistration}} in ResourceManager is not thread-safe, 
> and currently there may be two threads to operate the map concurrently to 
> bring unexpected results.
> The scenario is like this :
>  - {{registerJobManager}}: When the job leader changes and the new JobManager 
> leader registers to ResourceManager, the new {{JobManagerRegistration}} will 
> replace the old one in the map with the same key {{JobID}}. This process is 
> triggered by rpc thread.
>  - Meanwhile, the {{JobLeaderIdService}} in ResourceManager could be aware of 
> job leader change and trigger the action {{jobLeaderLostLeadership}} in 
> another thread. In this action, it will remove the previous 
> {{JobManagerRegistration}} from the map by {{JobID}}, but the old 
> {{JobManagerRegistration}} may be already replaced by the new one from 
> {{registerJobManager}}.
> In summary, this race condition may cause the new {{JobManagerRegistration}} 
> removed from ResourceManager, resulting in exception when request slot from 
> ResourceManager.
> Consider the solution of this issue, the {{jobLeaderLostLeadership}} can be 
> scheduled by {{runAsync}} in rpc thread and no need to bring extra lock for 
> the map.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5893) Race condition in removing previous JobManagerRegistration in ResourceManager

2017-02-22 Thread zhijiang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-5893:

Description: 
The map of {{JobManagerRegistration}} in {{ResourceManager}} is not 
thread-safe, and currently there may be two threads to operate the map 
concurrently to bring unexpected results.

The scenario is like this :

 - {{registerJobManager}}: When the job leader changes and the new JobManager 
leader registers to ResourceManager, the new {{JobManagerRegistration}} will 
replace the old one in the map with the same key {{JobID}}. This process is 
triggered by rpc thread.

 - Meanwhile, the {{JobLeaderIdService}} in ResourceManager could be aware of 
job leader change and trigger the action {{jobLeaderLostLeadership}} in another 
thread. In this action, it will remove the previous {{JobManagerRegistration}} 
from the map by {{JobID}}, but the old {{JobManagerRegistration}} may be 
already replaced by the new one from {{registerJobManager}}.

In summary, this race condition may cause the new {{JobManagerRegistration}} 
removed from ResourceManager, resulting in exception when request slot from 
ResourceManager.

Consider the solution of this issue, the {{jobLeaderLostLeadership}} can be 
scheduled by {{runAsync}} in rpc thread and no need to bring extra lock for the 
map.

  was:
The map of {{JobManagerRegistration}} in {{ResourceManager}} is not 
thread-safe, and currently there may be two threads to operate the map 
concurrently to bring unexpected results.

The scenario is like this :

{{registerJobManager}}: When the job leader changes and the new JobManager 
leader registers to ResourceManager, the new {{JobManagerRegistration}} will 
replace the old one in the map with the same key {{JobID}}. This process is 
triggered by rpc thread.

Meanwhile, the {{JobLeaderIdService}} in ResourceManager could be aware of job 
leader change and trigger the action {{jobLeaderLostLeadership}} in another 
thread. In this action, it will remove the previous {{JobManagerRegistration}} 
from the map by {{JobID}}, but the old {{JobManagerRegistration}} may be 
already replaced by the new one from {{registerJobManager}}.

In summary, this race condition may cause the new {{JobManagerRegistration}} 
removed from ResourceManager, resulting in exception when request slot from 
ResourceManager.

Consider the solution of this issue, the {{jobLeaderLostLeadership}} can be 
scheduled by {{runAsync}} in rpc thread and no need to bring extra lock for the 
map.


> Race condition in removing previous JobManagerRegistration in ResourceManager
> -
>
> Key: FLINK-5893
> URL: https://issues.apache.org/jira/browse/FLINK-5893
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager
>Reporter: zhijiang
>
> The map of {{JobManagerRegistration}} in {{ResourceManager}} is not 
> thread-safe, and currently there may be two threads to operate the map 
> concurrently to bring unexpected results.
> The scenario is like this :
>  - {{registerJobManager}}: When the job leader changes and the new JobManager 
> leader registers to ResourceManager, the new {{JobManagerRegistration}} will 
> replace the old one in the map with the same key {{JobID}}. This process is 
> triggered by rpc thread.
>  - Meanwhile, the {{JobLeaderIdService}} in ResourceManager could be aware of 
> job leader change and trigger the action {{jobLeaderLostLeadership}} in 
> another thread. In this action, it will remove the previous 
> {{JobManagerRegistration}} from the map by {{JobID}}, but the old 
> {{JobManagerRegistration}} may be already replaced by the new one from 
> {{registerJobManager}}.
> In summary, this race condition may cause the new {{JobManagerRegistration}} 
> removed from ResourceManager, resulting in exception when request slot from 
> ResourceManager.
> Consider the solution of this issue, the {{jobLeaderLostLeadership}} can be 
> scheduled by {{runAsync}} in rpc thread and no need to bring extra lock for 
> the map.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5893) Race condition in removing previous JobManagerRegistration in ResourceManager

2017-02-22 Thread zhijiang (JIRA)
zhijiang created FLINK-5893:
---

 Summary: Race condition in removing previous 
JobManagerRegistration in ResourceManager
 Key: FLINK-5893
 URL: https://issues.apache.org/jira/browse/FLINK-5893
 Project: Flink
  Issue Type: Bug
  Components: ResourceManager
Reporter: zhijiang


The map of {{JobManagerRegistration}} in {{ResourceManager}} is not 
thread-safe, and currently there may be two threads to operate the map 
concurrently to bring unexpected results.

The scenario is like this :

{{registerJobManager}}: When the job leader changes and the new JobManager 
leader registers to ResourceManager, the new {{JobManagerRegistration}} will 
replace the old one in the map with the same key {{JobID}}. This process is 
triggered by rpc thread.

Meanwhile, the {{JobLeaderIdService}} in ResourceManager could be aware of job 
leader change and trigger the action {{jobLeaderLostLeadership}} in another 
thread. In this action, it will remove the previous {{JobManagerRegistration}} 
from the map by {{JobID}}, but the old {{JobManagerRegistration}} may be 
already replaced by the new one from {{registerJobManager}}.

In summary, this race condition may cause the new {{JobManagerRegistration}} 
removed from ResourceManager, resulting in exception when request slot from 
ResourceManager.

Consider the solution of this issue, the {{jobLeaderLostLeadership}} can be 
scheduled by {{runAsync}} in rpc thread and no need to bring extra lock for the 
map.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5892) Recover job state at the granularity of operator

2017-02-22 Thread MaGuowei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

MaGuowei updated FLINK-5892:

Description: 
JobGraph has no `Operator` info so `ExecutionGraph` can only recovery at the 
granularity of task.
This leads to the result that the operator of the job may not recover the state 
from a save point even if the save point has the state of operator. 


 
https://docs.google.com/document/d/19suTRF0nh7pRgeMnIEIdJ2Fq-CcNVt5_hR3cAoICf7Q/edit#.

  was:
JobGraph has no `Operator` info so `ExecutionGraph` can only recovery at the 
granularity of task.
This leads to the result that the operator of the job may not recover the state 
from a save point even if the save point has the state of operator. 



> Recover job state at the granularity of operator
> 
>
> Key: FLINK-5892
> URL: https://issues.apache.org/jira/browse/FLINK-5892
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: MaGuowei
>Assignee: MaGuowei
>
> JobGraph has no `Operator` info so `ExecutionGraph` can only recovery at the 
> granularity of task.
> This leads to the result that the operator of the job may not recover the 
> state from a save point even if the save point has the state of operator. 
>  
> https://docs.google.com/document/d/19suTRF0nh7pRgeMnIEIdJ2Fq-CcNVt5_hR3cAoICf7Q/edit#.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5892) Recover job state at the granularity of operator

2017-02-22 Thread MaGuowei (JIRA)
MaGuowei created FLINK-5892:
---

 Summary: Recover job state at the granularity of operator
 Key: FLINK-5892
 URL: https://issues.apache.org/jira/browse/FLINK-5892
 Project: Flink
  Issue Type: New Feature
  Components: State Backends, Checkpointing
Reporter: MaGuowei
Assignee: MaGuowei


JobGraph has no `Operator` info so `ExecutionGraph` can only recovery at the 
granularity of task.
This leads to the result that the operator of the job may not recover the state 
from a save point even if the save point has the state of operator. 




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880012#comment-15880012
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102650085
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSlideEventTimeRowAgg.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream, 
DataStream, WindowedStream}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.windows.{Window => 
DataStreamWindow}
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.plan.nodes.CommonAggregate
+
+class DataStreamSlideEventTimeRowAgg(
+namedProperties: Seq[NamedWindowProperty],
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inputNode: RelNode,
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+rowRelDataType: RelDataType,
+inputType: RelDataType,
+grouping: Array[Int])
+  extends SingleRel(cluster, traitSet, inputNode)
+  with CommonAggregate
+  with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamSlideEventTimeRowAgg(
+  namedProperties,
+  cluster,
+  traitSet,
+  inputs.get(0),
+  namedAggregates,
+  getRowType,
+  inputType,
+  grouping)
+  }
+
+  override def toString: String = {
+val inputFields = new Array[Int](inputType.getFieldCount)
+for (i <- 0 until inputType.getFieldCount)
+  inputFields(i) = i
+s"Aggregate(${
+  if (!grouping.isEmpty) {
+s"groupBy: (${groupingToString(inputType, grouping)}), "
+  } else {
+""
+  }
+} orderBy: (eventtime), window: (unbounded), " +
+  s"select: (${
+aggregationToString(
+  inputType,
+  inputFields,
+  getRowType,
+  namedAggregates,
+  namedProperties)
+  }))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+val inputFields = new Array[Int](inputType.getFieldCount)
+for (i <- 0 until inputType.getFieldCount)
+  inputFields(i) = i
+super.explainTerms(pw)
+  .itemIf("groupBy", groupingToString(inputType, grouping), 
!grouping.isEmpty)
+  .item("orderBy", "eventtime")
+  .item("window", "unbounded")
+  .item("select", aggregationToString(
+inputType,
+inputFields,
+getRowType,
+namedAggregates,
+namedProperties))
+  }
+
+  override def translateToPlan(tableEnv: StreamTableEnvironment): 
DataStream[Row] = {
+
+val config = tableEnv.getConfig
+val groupingKeys = grouping.toArray
+val inputDS = 
input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+
+val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getR

[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-02-22 Thread hongyuhong
Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102650085
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSlideEventTimeRowAgg.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream, 
DataStream, WindowedStream}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.windows.{Window => 
DataStreamWindow}
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.plan.nodes.CommonAggregate
+
+class DataStreamSlideEventTimeRowAgg(
+namedProperties: Seq[NamedWindowProperty],
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inputNode: RelNode,
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+rowRelDataType: RelDataType,
+inputType: RelDataType,
+grouping: Array[Int])
+  extends SingleRel(cluster, traitSet, inputNode)
+  with CommonAggregate
+  with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamSlideEventTimeRowAgg(
+  namedProperties,
+  cluster,
+  traitSet,
+  inputs.get(0),
+  namedAggregates,
+  getRowType,
+  inputType,
+  grouping)
+  }
+
+  override def toString: String = {
+val inputFields = new Array[Int](inputType.getFieldCount)
+for (i <- 0 until inputType.getFieldCount)
+  inputFields(i) = i
+s"Aggregate(${
+  if (!grouping.isEmpty) {
+s"groupBy: (${groupingToString(inputType, grouping)}), "
+  } else {
+""
+  }
+} orderBy: (eventtime), window: (unbounded), " +
+  s"select: (${
+aggregationToString(
+  inputType,
+  inputFields,
+  getRowType,
+  namedAggregates,
+  namedProperties)
+  }))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+val inputFields = new Array[Int](inputType.getFieldCount)
+for (i <- 0 until inputType.getFieldCount)
+  inputFields(i) = i
+super.explainTerms(pw)
+  .itemIf("groupBy", groupingToString(inputType, grouping), 
!grouping.isEmpty)
+  .item("orderBy", "eventtime")
+  .item("window", "unbounded")
+  .item("select", aggregationToString(
+inputType,
+inputFields,
+getRowType,
+namedAggregates,
+namedProperties))
+  }
+
+  override def translateToPlan(tableEnv: StreamTableEnvironment): 
DataStream[Row] = {
+
+val config = tableEnv.getConfig
+val groupingKeys = grouping.toArray
+val inputDS = 
input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+
+val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+val inputFields = new Array[Int](inputType.getFieldCount)
--- End diff --

Yes, it's much better, i will update it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as wel

[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880004#comment-15880004
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102649307
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSlideEventTimeRowAgg.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream, 
DataStream, WindowedStream}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.windows.{Window => 
DataStreamWindow}
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.plan.nodes.CommonAggregate
+
+class DataStreamSlideEventTimeRowAgg(
+namedProperties: Seq[NamedWindowProperty],
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inputNode: RelNode,
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+rowRelDataType: RelDataType,
+inputType: RelDataType,
+grouping: Array[Int])
+  extends SingleRel(cluster, traitSet, inputNode)
+  with CommonAggregate
+  with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamSlideEventTimeRowAgg(
+  namedProperties,
+  cluster,
+  traitSet,
+  inputs.get(0),
+  namedAggregates,
+  getRowType,
+  inputType,
+  grouping)
--- End diff --

Thanks very much for the review.
1. The design of check whether eventtime is base on 
https://issues.apache.org/jira/browse/FLINK-5624 which support rowtime() as a 
built-in function like proctime() in FLINK-570, i think we can do in this way 
to support overwindow development current period, and modify once the issue 
done, what do you think?
2. I check whether the current data is out of order in WindowOperator 
isLate function, and now just discard if islate.
3. Yes, we should distinguish partitionkey and groupkey, i will fix it.


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (n

[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-02-22 Thread hongyuhong
Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102649307
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSlideEventTimeRowAgg.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream, 
DataStream, WindowedStream}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.windows.{Window => 
DataStreamWindow}
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.plan.nodes.CommonAggregate
+
+class DataStreamSlideEventTimeRowAgg(
+namedProperties: Seq[NamedWindowProperty],
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inputNode: RelNode,
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+rowRelDataType: RelDataType,
+inputType: RelDataType,
+grouping: Array[Int])
+  extends SingleRel(cluster, traitSet, inputNode)
+  with CommonAggregate
+  with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamSlideEventTimeRowAgg(
+  namedProperties,
+  cluster,
+  traitSet,
+  inputs.get(0),
+  namedAggregates,
+  getRowType,
+  inputType,
+  grouping)
--- End diff --

Thanks very much for the review.
1. The design of check whether eventtime is base on 
https://issues.apache.org/jira/browse/FLINK-5624 which support rowtime() as a 
built-in function like proctime() in FLINK-570, i think we can do in this way 
to support overwindow development current period, and modify once the issue 
done, what do you think?
2. I check whether the current data is out of order in WindowOperator 
isLate function, and now just discard if islate.
3. Yes, we should distinguish partitionkey and groupkey, i will fix it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5546) java.io.tmpdir setted as project build directory in surefire plugin

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879996#comment-15879996
 ] 

ASF GitHub Bot commented on FLINK-5546:
---

Github user wenlong88 commented on the issue:

https://github.com/apache/flink/pull/3190
  
Can we just use the `${project.build.directory}` as `java.io.tmpdir` ?


> java.io.tmpdir setted as project build directory in surefire plugin
> ---
>
> Key: FLINK-5546
> URL: https://issues.apache.org/jira/browse/FLINK-5546
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
> Environment: CentOS 7.2
>Reporter: Syinchwun Leo
>Assignee: shijinkui
> Fix For: 1.2.1
>
>
> When multiple Linux users run test at the same time, flink-runtime module may 
> fail. User A creates /tmp/cacheFile, and User B will have no permission to 
> visit the fold.  
> Failed tests: 
> FileCacheDeleteValidationTest.setup:79 Error initializing the test: 
> /tmp/cacheFile (Permission denied)
> Tests in error: 
> IOManagerTest.channelEnumerator:54 » Runtime Could not create storage 
> director...
> Tests run: 1385, Failures: 1, Errors: 1, Skipped: 8



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3190: [FLINK-5546][build] java.io.tmpdir setted as project buil...

2017-02-22 Thread wenlong88
Github user wenlong88 commented on the issue:

https://github.com/apache/flink/pull/3190
  
Can we just use the `${project.build.directory}` as `java.io.tmpdir` ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-02-22 Thread Kurt Young (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879991#comment-15879991
 ] 

Kurt Young commented on FLINK-5859:
---

Hi [~fhueske], 

You raised a very good question, which is essentially "what's the difference 
between the filter pushdown and partition pruning".

You are right about partition pruning is actually a coarse-grained filter 
push-down, but more importantly, we can view it as a more "static" or more 
"predictable" filter. Here is an example to describe more explicitly. Suppose 
Flink supports parquet table source,  and since parquet files contains some 
RowGroup level statistics such as max/min value, we can use these information 
to reduce the data we need to read. But before we do anything, we need to make 
sure whether the source files contain such information or not. So we need to 
read all the metas from these files to do some check work. If we are facing 
thousands of the files, it will be really costly. 

However, the partition is something more static and predictable. Like if all 
your source files are organized by some time based directory like 
/mm/dd/1.file and we actually have some partition fields to describe the 
time information. It will be more efficient and easy to do the partition level 
filter first. 

But this doesn't mean we should have another trait like 
{{PartitionableTableSource}}, either extending the under reviewing 
{{FilterableTableSource}} or provide another explicitly 
{{PartitionableTableSource}} is fine with me. But we should at least make 
"partition pruning" seeable from the users who may write their own 
{{TableSource}}, instead of let all the magics happen under one method right 
now, which will be 
{code}def setPredicate(predicate: Array[Expression]): Array[Expression]{code} 
in the current under reviewing version of {{FilterableTableSource}}. 

Let me know if you have some thoughts about this. 

> support partition pruning on Table API & SQL
> 
>
> Key: FLINK-5859
> URL: https://issues.apache.org/jira/browse/FLINK-5859
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: godfrey he
>
> Many data sources are partitionable storage, e.g. HDFS, Druid. And many 
> queries just need to read a small subset of the total data. We can use 
> partition information to prune or skip over files irrelevant to the user’s 
> queries. Both query optimization time and execution time can be reduced 
> obviously, especially for a large partitioned table.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879990#comment-15879990
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102648005
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.util;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.util.ExceptionUtils;
+import org.elasticsearch.action.ActionRequest;
+import 
org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+
+/**
+ * An {@link ActionRequestFailureHandler} that re-adds requests that 
failed due to temporary
+ * {@link EsRejectedExecutionException}s (which means that Elasticsearch 
node queues are currently full),
+ * and fails for all other failures.
+ */
+public class RetryRejectedExecutionFailureHandler implements 
ActionRequestFailureHandler {
+
+   private static final long serialVersionUID = -7423562912824511906L;
+
+   @Override
+   public void onFailure(ActionRequest action, Throwable failure, int 
restStatusCode, RequestIndexer indexer) throws Throwable {
+   if (ExceptionUtils.containsThrowable(failure, 
EsRejectedExecutionException.class)) {
+   indexer.add(action);
--- End diff --

Metricifing the ES connectors seems like a good idea, especially with its 
growing popularity. I'll think about it and file a JIRA with some initial 
proposals.


> Proper at-least-once support for ElasticsearchSink
> --
>
> Key: FLINK-5487
> URL: https://issues.apache.org/jira/browse/FLINK-5487
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>
> Discussion in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fault-tolerance-guarantees-of-Elasticsearch-sink-in-flink-elasticsearch2-td10982.html
> Currently, the Elasticsearch Sink actually doesn't offer any guarantees for 
> message delivery.
> For proper support of at-least-once, the sink will need to participate in 
> Flink's checkpointing: when snapshotting is triggered at the 
> {{ElasticsearchSink}}, we need to synchronize on the pending ES requests by 
> flushing the internal bulk processor. For temporary ES failures (see 
> FLINK-5122) that may happen on the flush, we should retry them before 
> returning from snapshotting and acking the checkpoint. If there are 
> non-temporary ES failures on the flush, the current snapshot should fail.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3358: [FLINK-5487] [elasticsearch] At-least-once Elastic...

2017-02-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102648005
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.util;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.util.ExceptionUtils;
+import org.elasticsearch.action.ActionRequest;
+import 
org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+
+/**
+ * An {@link ActionRequestFailureHandler} that re-adds requests that 
failed due to temporary
+ * {@link EsRejectedExecutionException}s (which means that Elasticsearch 
node queues are currently full),
+ * and fails for all other failures.
+ */
+public class RetryRejectedExecutionFailureHandler implements 
ActionRequestFailureHandler {
+
+   private static final long serialVersionUID = -7423562912824511906L;
+
+   @Override
+   public void onFailure(ActionRequest action, Throwable failure, int 
restStatusCode, RequestIndexer indexer) throws Throwable {
+   if (ExceptionUtils.containsThrowable(failure, 
EsRejectedExecutionException.class)) {
+   indexer.add(action);
--- End diff --

Metricifing the ES connectors seems like a good idea, especially with its 
growing popularity. I'll think about it and file a JIRA with some initial 
proposals.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879988#comment-15879988
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102647903
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.util;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.util.ExceptionUtils;
+import org.elasticsearch.action.ActionRequest;
+import 
org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+
+/**
+ * An {@link ActionRequestFailureHandler} that re-adds requests that 
failed due to temporary
+ * {@link EsRejectedExecutionException}s (which means that Elasticsearch 
node queues are currently full),
+ * and fails for all other failures.
+ */
+public class RetryRejectedExecutionFailureHandler implements 
ActionRequestFailureHandler {
+
+   private static final long serialVersionUID = -7423562912824511906L;
+
+   @Override
+   public void onFailure(ActionRequest action, Throwable failure, int 
restStatusCode, RequestIndexer indexer) throws Throwable {
+   if (ExceptionUtils.containsThrowable(failure, 
EsRejectedExecutionException.class)) {
+   indexer.add(action);
--- End diff --

Regarding the frequency of `EsRejectedExecutionException`, from my 
experience with ES before, they pop up a lot with under-resourced / configured 
ES clusters.

It can flood logs if it isn't treated accordingly, but not logging them can 
be bad too because you'll know nothing about it, unless the sink eventually 
fails with it.

We could also remove the failure logging from the `ElasticsearchSinkBase` 
and let the user be responsible for that, but I'm a bit undecided here. Open to 
suggestions for this!


> Proper at-least-once support for ElasticsearchSink
> --
>
> Key: FLINK-5487
> URL: https://issues.apache.org/jira/browse/FLINK-5487
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>
> Discussion in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fault-tolerance-guarantees-of-Elasticsearch-sink-in-flink-elasticsearch2-td10982.html
> Currently, the Elasticsearch Sink actually doesn't offer any guarantees for 
> message delivery.
> For proper support of at-least-once, the sink will need to participate in 
> Flink's checkpointing: when snapshotting is triggered at the 
> {{ElasticsearchSink}}, we need to synchronize on the pending ES requests by 
> flushing the internal bulk processor. For temporary ES failures (see 
> FLINK-5122) that may happen on the flush, we should retry them before 
> returning from snapshotting and acking the checkpoint. If there are 
> non-temporary ES failures on the flush, the current snapshot should fail.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3358: [FLINK-5487] [elasticsearch] At-least-once Elastic...

2017-02-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102647903
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.util;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.util.ExceptionUtils;
+import org.elasticsearch.action.ActionRequest;
+import 
org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+
+/**
+ * An {@link ActionRequestFailureHandler} that re-adds requests that 
failed due to temporary
+ * {@link EsRejectedExecutionException}s (which means that Elasticsearch 
node queues are currently full),
+ * and fails for all other failures.
+ */
+public class RetryRejectedExecutionFailureHandler implements 
ActionRequestFailureHandler {
+
+   private static final long serialVersionUID = -7423562912824511906L;
+
+   @Override
+   public void onFailure(ActionRequest action, Throwable failure, int 
restStatusCode, RequestIndexer indexer) throws Throwable {
+   if (ExceptionUtils.containsThrowable(failure, 
EsRejectedExecutionException.class)) {
+   indexer.add(action);
--- End diff --

Regarding the frequency of `EsRejectedExecutionException`, from my 
experience with ES before, they pop up a lot with under-resourced / configured 
ES clusters.

It can flood logs if it isn't treated accordingly, but not logging them can 
be bad too because you'll know nothing about it, unless the sink eventually 
fails with it.

We could also remove the failure logging from the `ElasticsearchSinkBase` 
and let the user be responsible for that, but I'm a bit undecided here. Open to 
suggestions for this!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3358: [FLINK-5487] [elasticsearch] At-least-once Elastic...

2017-02-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102647565
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.util;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.util.ExceptionUtils;
+import org.elasticsearch.action.ActionRequest;
+import 
org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+
+/**
+ * An {@link ActionRequestFailureHandler} that re-adds requests that 
failed due to temporary
+ * {@link EsRejectedExecutionException}s (which means that Elasticsearch 
node queues are currently full),
+ * and fails for all other failures.
+ */
+public class RetryRejectedExecutionFailureHandler implements 
ActionRequestFailureHandler {
+
+   private static final long serialVersionUID = -7423562912824511906L;
+
+   @Override
+   public void onFailure(ActionRequest action, Throwable failure, int 
restStatusCode, RequestIndexer indexer) throws Throwable {
+   if (ExceptionUtils.containsThrowable(failure, 
EsRejectedExecutionException.class)) {
+   indexer.add(action);
--- End diff --

The `BulkProcessor` listener actually logs them as LOG.error before they 
are processed by the failure handler (line 171 and line 180). So, these 
failures are always logged regardless of whether the failure handler chooses to 
log them. Do you think that's ok?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879974#comment-15879974
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102647565
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.util;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.util.ExceptionUtils;
+import org.elasticsearch.action.ActionRequest;
+import 
org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+
+/**
+ * An {@link ActionRequestFailureHandler} that re-adds requests that 
failed due to temporary
+ * {@link EsRejectedExecutionException}s (which means that Elasticsearch 
node queues are currently full),
+ * and fails for all other failures.
+ */
+public class RetryRejectedExecutionFailureHandler implements 
ActionRequestFailureHandler {
+
+   private static final long serialVersionUID = -7423562912824511906L;
+
+   @Override
+   public void onFailure(ActionRequest action, Throwable failure, int 
restStatusCode, RequestIndexer indexer) throws Throwable {
+   if (ExceptionUtils.containsThrowable(failure, 
EsRejectedExecutionException.class)) {
+   indexer.add(action);
--- End diff --

The `BulkProcessor` listener actually logs them as LOG.error before they 
are processed by the failure handler (line 171 and line 180). So, these 
failures are always logged regardless of whether the failure handler chooses to 
log them. Do you think that's ok?


> Proper at-least-once support for ElasticsearchSink
> --
>
> Key: FLINK-5487
> URL: https://issues.apache.org/jira/browse/FLINK-5487
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>
> Discussion in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fault-tolerance-guarantees-of-Elasticsearch-sink-in-flink-elasticsearch2-td10982.html
> Currently, the Elasticsearch Sink actually doesn't offer any guarantees for 
> message delivery.
> For proper support of at-least-once, the sink will need to participate in 
> Flink's checkpointing: when snapshotting is triggered at the 
> {{ElasticsearchSink}}, we need to synchronize on the pending ES requests by 
> flushing the internal bulk processor. For temporary ES failures (see 
> FLINK-5122) that may happen on the flush, we should retry them before 
> returning from snapshotting and acking the checkpoint. If there are 
> non-temporary ES failures on the flush, the current snapshot should fail.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5728) FlinkKafkaProducer should flush on checkpoint by default

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879965#comment-15879965
 ] 

ASF GitHub Bot commented on FLINK-5728:
---

Github user tzulitai commented on the pull request:


https://github.com/apache/flink/commit/646490c4e93eca315e4bf41704f149390f8639cc#commitcomment-21009882
  
@StephanEwen Yes. We discussed that in 
https://issues.apache.org/jira/browse/FLINK-5728 with some other aspects 
regarding the `setFlushOnCheckpoint` and `setLogFailuresOnly` methods, so it 
was kept as a separate issue.


> FlinkKafkaProducer should flush on checkpoint by default
> 
>
> Key: FLINK-5728
> URL: https://issues.apache.org/jira/browse/FLINK-5728
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>
> As discussed in FLINK-5702, it might be a good idea to let the 
> FlinkKafkaProducer flush on checkpoints by default. Currently, it is disabled 
> by default.
> It's a very simple change, but we should think about whether or not we want 
> to break user behaviour, or have proper usage migration.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #:

2017-02-22 Thread tzulitai
Github user tzulitai commented on the pull request:


https://github.com/apache/flink/commit/646490c4e93eca315e4bf41704f149390f8639cc#commitcomment-21009882
  
@StephanEwen Yes. We discussed that in 
https://issues.apache.org/jira/browse/FLINK-5728 with some other aspects 
regarding the `setFlushOnCheckpoint` and `setLogFailuresOnly` methods, so it 
was kept as a separate issue.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5568) Introduce interface for catalog, and provide an in-memory implementation, and integrate with calcite schema

2017-02-22 Thread Kurt Young (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879942#comment-15879942
 ] 

Kurt Young commented on FLINK-5568:
---

Hi [~fhueske],

I want to emphasize there exists a very big difference between this catalog 
design and existing catalog like HCatalog or Hive. The difference is how we 
answer the following question: "Does the table type which flink understands 
bind with the catalog?". Why we need to answer this is because some catalog's 
implementation has hidden the table details for us. Like HCatalog, it provides 
a unified loader and storer layer to let you read and write all kinds of files, 
like Parquet, ORC and so on. So, if there exists a table from HCatalog which 
consists of parquet files, should we treat it as a "HCatalogTableSource", or 
just "ParquetTableSource". 

We think it's not a very good idea to have Flink's table bound with each 
catalog, so we introduce another converting layer to transfer the table type. 
This is actually a more flexible way to do since we can easily add 
"HCatalogTableSource" to handle all the tables from HCatalog. And it's easier 
for advanced users to implement their own catalog, the just need to tell Flink 
what's type of the table, some necessary informations as well as a pluggable 
"TableSource" implementation. 

Let us know if you have some other thoughts. 

> Introduce interface for catalog, and provide an in-memory implementation, and 
> integrate with calcite schema
> ---
>
> Key: FLINK-5568
> URL: https://issues.apache.org/jira/browse/FLINK-5568
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: jingzhang
>
> The {{TableEnvironment}} now provides a mechanism to register temporary 
> table. It registers the temp table to calcite catalog, so SQL and TableAPI 
> queries can access to those temp tables. Now DatasetTable,  DataStreamTable 
> and TableSourceTable can be registered to  {{TableEnvironment}} as temporary 
> tables.
> This issue wants to provides a mechanism to connect external catalogs such as 
> HCatalog to the {{TableEnvironment}}, so SQL and TableAPI queries could 
> access to tables in the external catalogs without register those tables to 
> {{TableEnvironment}} beforehand.
> First, we should point out that there are two kinds of catalog in Flink 
> actually. 
> The first one is external catalog as we mentioned before, it provides CRUD 
> operations to databases/tables.
> The second one is calcite catalog, it defines namespace that can be accessed 
> in Calcite queries. It depends on Calcite Schema/Table abstraction. 
> SqlValidator and SqlConverter depends on the calcite catalog to fetch the 
> tables in SQL or TableAPI.
> So we need to do the following things:
> 1. introduce interface for external catalog, maybe provide an in-memory 
> implementation first for test and develop environment.
> 2. introduce a mechanism to connect external catalog with Calcite catalog so 
> the tables/databases in external catalog can be accessed in Calcite catalog. 
> Including convert databases of externalCatalog to Calcite sub-schemas, 
> convert tables in a database of externalCatalog to  Calcite tables (only 
> support {{TableSourceTable}}).
> 3. register external catalog to {{TableEnvironment}}.
> Here is the design mode of ExternalCatalogTable.
> |  identifier  | TableIdentifier | dbName and tableName 
> of table |
> |  tableType | String | type of external catalog table, 
> e.g csv, hbase, kafka
> |  schema| DataSchema|  schema of table data, 
> including column names and column types
> | partitionColumnNames | List | names of partition column
> | properties  | Map |properties of 
> external catalog table
> | stats   | TableStats | statistics of external 
> catalog table 
> | comment | String | 
> | create time | long
> There is still a detail problem need to be take into consideration, that is , 
> how to convert  {{ExternalCatalogTable}} to {{TableSourceTable}}. The 
> question is equals to  convert  {{ExternalCatalogTable}} to {{TableSource}} 
> because we could  easily get {{TableSourceTable}} from {{TableSource}}.
> Because different {{TableSource}} often contains different fields to initiate 
> an instance. E.g. {{CsvTableSource}}  needs path, fieldName, fieldTypes, 
> fieldDelim, rowDelim and so on to create a new instance , 
> {{KafkaTableSource}} needs configuration and tableName to create a new 
> instance. So it's not a good idea to let Flink framework be responsible for 
> translate  {{ExternalCatalogTable}} to different kind of 
> {{TableSourceTable}}. 
> Here is on

[jira] [Commented] (FLINK-5879) ExecutionAttemptID should invoke super() in constructor

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879940#comment-15879940
 ] 

ASF GitHub Bot commented on FLINK-5879:
---

GitHub user ifndef-SleePy opened a pull request:

https://github.com/apache/flink/pull/3396

[FLINK-5879] Fix bug about ExecutionAttemptID, add super() in constructor.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-5879

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3396.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3396






> ExecutionAttemptID should invoke super() in constructor
> ---
>
> Key: FLINK-5879
> URL: https://issues.apache.org/jira/browse/FLINK-5879
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Minor
>
> Currently we forget invoking super() in constructor of ExecutionAttemptID. 
> This may cause NullPointerException.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3396: [FLINK-5879] Fix bug about ExecutionAttemptID, add...

2017-02-22 Thread ifndef-SleePy
GitHub user ifndef-SleePy opened a pull request:

https://github.com/apache/flink/pull/3396

[FLINK-5879] Fix bug about ExecutionAttemptID, add super() in constructor.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-5879

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3396.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3396






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5861) TaskManager's components support updating JobManagerConnection

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879923#comment-15879923
 ] 

ASF GitHub Bot commented on FLINK-5861:
---

GitHub user ifndef-SleePy opened a pull request:

https://github.com/apache/flink/pull/3395

[FLINK-5861] Components of TaskManager support updating JobManagerConnection



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-5861

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3395.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3395






> TaskManager's components support updating JobManagerConnection
> --
>
> Key: FLINK-5861
> URL: https://issues.apache.org/jira/browse/FLINK-5861
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, JobManager
>Reporter: Biao Liu
>Assignee: Biao Liu
>
> Some components in TaskManager, such as TaskManagerActions, 
> CheckpointResponder, ResultPartitionConsumableNotifier, 
> PartitionProducerStateChecker, need to support updating JobManagerConnection. 
> So when JobManager fails and recovers, the tasks who keep old 
> JobManagerConnection can be notified to update JobManagerConnection. The 
> tasks can continue doing their jobs without failure.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3395: [FLINK-5861] Components of TaskManager support upd...

2017-02-22 Thread ifndef-SleePy
GitHub user ifndef-SleePy opened a pull request:

https://github.com/apache/flink/pull/3395

[FLINK-5861] Components of TaskManager support updating JobManagerConnection



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-5861

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3395.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3395






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (FLINK-5891) ConnectedComponents is broken when object reuse enabled

2017-02-22 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879742#comment-15879742
 ] 

Xingcan Cui edited comment on FLINK-5891 at 2/23/17 5:38 AM:
-

Hi Greg, may I ask what does "when object reuse enabled" mean here? Is it 
necessary to make a copy of the attribute min before storing it?

I got it here. :) 
[https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/index.html#operating-on-data-objects-in-functions]
 Is it safe to store the value wrapped in a message?


was (Author: xccui):
Hi Greg, may I ask what does "when object reuse enabled" mean here? Is it 
necessary to make a copy of the attribute min before storing it?

I got it here. :) 
[https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/index.html#operating-on-data-objects-in-functions]

> ConnectedComponents is broken when object reuse enabled
> ---
>
> Key: FLINK-5891
> URL: https://issues.apache.org/jira/browse/FLINK-5891
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.3.0
>Reporter: Greg Hogan
>
> {{org.apache.flink.graph.library.ConnectedComponents.CCUpdater#updateVertex}} 
> is storing a value from its iterator.
> {{GSAConnectedComponents}} does not have this limitation.
> {code}
>   public static final class CCUpdater>
>   extends GatherFunction {
>   @Override
>   public void updateVertex(Vertex vertex, 
> MessageIterator messages) throws Exception {
>   VV current = vertex.getValue();
>   VV min = current;
>   for (VV msg : messages) {
>   if (msg.compareTo(min) < 0) {
>   min = msg;
>   }
>   }
>   if (!min.equals(current)) {
>   setNewVertexValue(min);
>   }
>   }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-5891) ConnectedComponents is broken when object reuse enabled

2017-02-22 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879742#comment-15879742
 ] 

Xingcan Cui edited comment on FLINK-5891 at 2/23/17 5:25 AM:
-

Hi Greg, may I ask what does "when object reuse enabled" mean here? Is it 
necessary to make a copy of the attribute min before storing it?

I got it here. :) 
[https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/index.html#operating-on-data-objects-in-functions]


was (Author: xccui):
Hi Greg, may I ask what does "when object reuse enabled" mean here? Is it 
necessary to make a copy of the attribute min before storing it?

> ConnectedComponents is broken when object reuse enabled
> ---
>
> Key: FLINK-5891
> URL: https://issues.apache.org/jira/browse/FLINK-5891
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.3.0
>Reporter: Greg Hogan
>
> {{org.apache.flink.graph.library.ConnectedComponents.CCUpdater#updateVertex}} 
> is storing a value from its iterator.
> {{GSAConnectedComponents}} does not have this limitation.
> {code}
>   public static final class CCUpdater>
>   extends GatherFunction {
>   @Override
>   public void updateVertex(Vertex vertex, 
> MessageIterator messages) throws Exception {
>   VV current = vertex.getValue();
>   VV min = current;
>   for (VV msg : messages) {
>   if (msg.compareTo(min) < 0) {
>   min = msg;
>   }
>   }
>   if (!min.equals(current)) {
>   setNewVertexValue(min);
>   }
>   }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879871#comment-15879871
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102634122
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
 ---
@@ -171,4 +175,98 @@ class SqlITCase extends 
StreamingMultipleProgramsTestBase {
 val expected = mutable.MutableList("Hello", "Hello world")
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  /** test sliding event-time unbounded window with partition by **/
+  @Test
+  def testSlideEventTimeUnboundWindowWithGroup(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT a, b, SUM(a) over (partition by b order by 
rowtime() range between " +
+  "unbounded preceding and current row) from T1"
+
+val t1 = StreamTestData.getSmall3TupleDataStream(env)
+  .assignTimestampsAndWatermarks(new 
AssignerWithPeriodicWatermarks[(Int, Long, String)] {
+
+def getCurrentWatermark: Watermark = new 
Watermark(System.currentTimeMillis)
+
+def extractTimestamp(element: (Int, Long, String), 
previousElementTimestamp: Long): Long =
+  140
+  }).toTable(tEnv).as('a, 'b, 'c)
+tEnv.registerTable("T1", t1)
+
+val result = tEnv.sql(sqlQuery).toDataStream[Row]
+result.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected1 = mutable.MutableList(
+  "1,1,1", "2,2,2", "3,2,5")
+val expected2 = mutable.MutableList(
+  "1,1,1", "2,2,5", "3,2,3")
+assertTrue(expected1.equals(StreamITCase.testResults.sorted) ||
+  expected2.equals(StreamITCase.testResults.sorted))
+  }
+
+  /** test sliding event-time unbounded window without partitiion by **/
+  @Test
+  def testSlideEventTimeUnboundWindowWithoutGroup(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT SUM(a) " +
+  "over (order by rowtime() range between unbounded preceding and 
current row) from T1"
+
+val t1 = StreamTestData.getSmall3TupleDataStream(env)
+  .assignTimestampsAndWatermarks(new 
AssignerWithPeriodicWatermarks[(Int, Long, String)] {
+
+def getCurrentWatermark: Watermark = new 
Watermark(System.currentTimeMillis)
+
+def extractTimestamp(element: (Int, Long, String), 
previousElementTimestamp: Long): Long =
+  140
+  }).toTable(tEnv).as('a, 'b, 'c)
+tEnv.registerTable("T1", t1)
+
+val result = tEnv.sql(sqlQuery).toDataStream[Row]
+result.addSink(new StreamITCase.StringSink)
+env.execute()
+
+assertEquals(Some("6"), StreamITCase.testResults.sorted.get(2))
+  }
+
+  /** test sliding event-time unbounded window with later record **/
+  @Test
+  def testSlideEventTimeUnboundWindowWithLater(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+env.setParallelism(1)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT SUM(a) " +
--- End diff --

Can you add some non-agg fields? That can show the different with 
groupWindow.


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> P

[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879873#comment-15879873
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102632917
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowRule.scala
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.logical.{LogicalProject, LogicalWindow}
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.flink.table.functions.EventTimeExtractor
+import org.apache.flink.table.plan.logical.rel.LogicalOverWindow
+
+import scala.collection.JavaConversions._
+
+/**
+  * Rule to check whether rowtime or proctime.
+  */
+class LogicalWindowRule
--- End diff --

Have you refer to FLINK-5884 and reference FLINK-5884'design to 
consider?(https://docs.google.com/document/d/1JRXm09x_wKst6z6UXdCGF9tgF1ueOAsFiQwahR72vbc/edit)
  Or we can wait for FLINK-5884 implementation, then rebase the code?


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - bounded PRECEDING is not supported (see FLINK-5655)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879879#comment-15879879
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102634209
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
 ---
@@ -171,4 +175,98 @@ class SqlITCase extends 
StreamingMultipleProgramsTestBase {
 val expected = mutable.MutableList("Hello", "Hello world")
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  /** test sliding event-time unbounded window with partition by **/
+  @Test
+  def testSlideEventTimeUnboundWindowWithGroup(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT a, b, SUM(a) over (partition by b order by 
rowtime() range between " +
+  "unbounded preceding and current row) from T1"
+
+val t1 = StreamTestData.getSmall3TupleDataStream(env)
+  .assignTimestampsAndWatermarks(new 
AssignerWithPeriodicWatermarks[(Int, Long, String)] {
+
+def getCurrentWatermark: Watermark = new 
Watermark(System.currentTimeMillis)
+
+def extractTimestamp(element: (Int, Long, String), 
previousElementTimestamp: Long): Long =
+  140
+  }).toTable(tEnv).as('a, 'b, 'c)
+tEnv.registerTable("T1", t1)
+
+val result = tEnv.sql(sqlQuery).toDataStream[Row]
+result.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected1 = mutable.MutableList(
+  "1,1,1", "2,2,2", "3,2,5")
+val expected2 = mutable.MutableList(
+  "1,1,1", "2,2,5", "3,2,3")
+assertTrue(expected1.equals(StreamITCase.testResults.sorted) ||
+  expected2.equals(StreamITCase.testResults.sorted))
+  }
+
+  /** test sliding event-time unbounded window without partitiion by **/
+  @Test
+  def testSlideEventTimeUnboundWindowWithoutGroup(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT SUM(a) " +
+  "over (order by rowtime() range between unbounded preceding and 
current row) from T1"
+
+val t1 = StreamTestData.getSmall3TupleDataStream(env)
+  .assignTimestampsAndWatermarks(new 
AssignerWithPeriodicWatermarks[(Int, Long, String)] {
+
+def getCurrentWatermark: Watermark = new 
Watermark(System.currentTimeMillis)
+
+def extractTimestamp(element: (Int, Long, String), 
previousElementTimestamp: Long): Long =
+  140
+  }).toTable(tEnv).as('a, 'b, 'c)
+tEnv.registerTable("T1", t1)
+
+val result = tEnv.sql(sqlQuery).toDataStream[Row]
+result.addSink(new StreamITCase.StringSink)
+env.execute()
+
+assertEquals(Some("6"), StreamITCase.testResults.sorted.get(2))
+  }
+
+  /** test sliding event-time unbounded window with later record **/
+  @Test
+  def testSlideEventTimeUnboundWindowWithLater(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+env.setParallelism(1)
--- End diff --

Can you remove this config? Event-time should handle this situation very 
well, because the data carries the time. What do you think?


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {

[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879870#comment-15879870
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102635310
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowRule.scala
 ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamConvention, 
DataStreamSlideEventTimeRowAgg}
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.plan.logical.rel.LogicalOverWindow
+
+import scala.collection.JavaConversions._
+
+/**
+  * Rule to convert a LogicalOverWindow into a 
DataStreamSlideEventTimeRowAgg.
+  */
+class DataStreamWindowRule
+  extends ConverterRule(
+classOf[LogicalOverWindow],
+Convention.NONE,
+DataStreamConvention.INSTANCE,
+"DataStreamWindowRule")
+{
+
+  override def convert(rel: RelNode): RelNode = {
+val agg: LogicalOverWindow = rel.asInstanceOf[LogicalOverWindow]
+val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+val convInput: RelNode = RelOptRule.convert(agg.getInput, 
DataStreamConvention.INSTANCE)
+val inputRowType = 
convInput.asInstanceOf[RelSubset].getOriginal.getRowType
+
+if (agg.groups.size > 1) {
+  for (i <- 0 until agg.groups.size - 1)
+if (agg.groups(i).toString != agg.groups(i + 1).toString) {
+  throw new UnsupportedOperationException(
+"Unsupport different window in the same projection")
+}
+}
+
+val win = agg.groups(0)
+val rowtime = agg.isEventTime
+
+// map agg func with project name
+var aggIdx = inputRowType.getFieldCount - 1
+val outputNames = agg.getRowType.getFieldNames
+val namedAgg =
+  for (i <- 0 until agg.groups.size; aggCalls = 
agg.groups(i).getAggregateCalls(agg);
--- End diff --

IMHO. Maybe we can use a layer of iteration, because we only have one of 
the same window.


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - bounded PRECEDING is not suppor

[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-02-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102634209
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
 ---
@@ -171,4 +175,98 @@ class SqlITCase extends 
StreamingMultipleProgramsTestBase {
 val expected = mutable.MutableList("Hello", "Hello world")
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  /** test sliding event-time unbounded window with partition by **/
+  @Test
+  def testSlideEventTimeUnboundWindowWithGroup(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT a, b, SUM(a) over (partition by b order by 
rowtime() range between " +
+  "unbounded preceding and current row) from T1"
+
+val t1 = StreamTestData.getSmall3TupleDataStream(env)
+  .assignTimestampsAndWatermarks(new 
AssignerWithPeriodicWatermarks[(Int, Long, String)] {
+
+def getCurrentWatermark: Watermark = new 
Watermark(System.currentTimeMillis)
+
+def extractTimestamp(element: (Int, Long, String), 
previousElementTimestamp: Long): Long =
+  140
+  }).toTable(tEnv).as('a, 'b, 'c)
+tEnv.registerTable("T1", t1)
+
+val result = tEnv.sql(sqlQuery).toDataStream[Row]
+result.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected1 = mutable.MutableList(
+  "1,1,1", "2,2,2", "3,2,5")
+val expected2 = mutable.MutableList(
+  "1,1,1", "2,2,5", "3,2,3")
+assertTrue(expected1.equals(StreamITCase.testResults.sorted) ||
+  expected2.equals(StreamITCase.testResults.sorted))
+  }
+
+  /** test sliding event-time unbounded window without partitiion by **/
+  @Test
+  def testSlideEventTimeUnboundWindowWithoutGroup(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT SUM(a) " +
+  "over (order by rowtime() range between unbounded preceding and 
current row) from T1"
+
+val t1 = StreamTestData.getSmall3TupleDataStream(env)
+  .assignTimestampsAndWatermarks(new 
AssignerWithPeriodicWatermarks[(Int, Long, String)] {
+
+def getCurrentWatermark: Watermark = new 
Watermark(System.currentTimeMillis)
+
+def extractTimestamp(element: (Int, Long, String), 
previousElementTimestamp: Long): Long =
+  140
+  }).toTable(tEnv).as('a, 'b, 'c)
+tEnv.registerTable("T1", t1)
+
+val result = tEnv.sql(sqlQuery).toDataStream[Row]
+result.addSink(new StreamITCase.StringSink)
+env.execute()
+
+assertEquals(Some("6"), StreamITCase.testResults.sorted.get(2))
+  }
+
+  /** test sliding event-time unbounded window with later record **/
+  @Test
+  def testSlideEventTimeUnboundWindowWithLater(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+env.setParallelism(1)
--- End diff --

Can you remove this config? Event-time should handle this situation very 
well, because the data carries the time. What do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879874#comment-15879874
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102634927
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalEventTimeRowWindowAssigner.java
 ---
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that assigns all elements to the same global 
row window.
+ *
+ */
+@PublicEvolving
+public class GlobalEventTimeRowWindowAssigner extends 
WindowAssigner {
--- End diff --

In this way we can not handle the order of data. just like your test case 
`testSlideEventTimeUnboundWindowWithLater`


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - bounded PRECEDING is not supported (see FLINK-5655)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879876#comment-15879876
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102631831
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSlideEventTimeRowAgg.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream, 
DataStream, WindowedStream}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.windows.{Window => 
DataStreamWindow}
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.plan.nodes.CommonAggregate
+
+class DataStreamSlideEventTimeRowAgg(
+namedProperties: Seq[NamedWindowProperty],
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inputNode: RelNode,
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+rowRelDataType: RelDataType,
+inputType: RelDataType,
+grouping: Array[Int])
+  extends SingleRel(cluster, traitSet, inputNode)
+  with CommonAggregate
+  with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamSlideEventTimeRowAgg(
+  namedProperties,
+  cluster,
+  traitSet,
+  inputs.get(0),
+  namedAggregates,
+  getRowType,
+  inputType,
+  grouping)
+  }
+
+  override def toString: String = {
+val inputFields = new Array[Int](inputType.getFieldCount)
+for (i <- 0 until inputType.getFieldCount)
+  inputFields(i) = i
+s"Aggregate(${
+  if (!grouping.isEmpty) {
+s"groupBy: (${groupingToString(inputType, grouping)}), "
+  } else {
+""
+  }
+} orderBy: (eventtime), window: (unbounded), " +
+  s"select: (${
+aggregationToString(
+  inputType,
+  inputFields,
+  getRowType,
+  namedAggregates,
+  namedProperties)
+  }))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+val inputFields = new Array[Int](inputType.getFieldCount)
+for (i <- 0 until inputType.getFieldCount)
+  inputFields(i) = i
+super.explainTerms(pw)
+  .itemIf("groupBy", groupingToString(inputType, grouping), 
!grouping.isEmpty)
+  .item("orderBy", "eventtime")
+  .item("window", "unbounded")
+  .item("select", aggregationToString(
+inputType,
+inputFields,
+getRowType,
+namedAggregates,
+namedProperties))
+  }
+
+  override def translateToPlan(tableEnv: StreamTableEnvironment): 
DataStream[Row] = {
+
+val config = tableEnv.getConfig
+val groupingKeys = grouping.toArray
+val inputDS = 
input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+
+val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(

[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879878#comment-15879878
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102631646
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSlideEventTimeRowAgg.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream, 
DataStream, WindowedStream}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.windows.{Window => 
DataStreamWindow}
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.plan.nodes.CommonAggregate
+
+class DataStreamSlideEventTimeRowAgg(
+namedProperties: Seq[NamedWindowProperty],
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inputNode: RelNode,
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+rowRelDataType: RelDataType,
+inputType: RelDataType,
+grouping: Array[Int])
+  extends SingleRel(cluster, traitSet, inputNode)
+  with CommonAggregate
+  with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamSlideEventTimeRowAgg(
+  namedProperties,
+  cluster,
+  traitSet,
+  inputs.get(0),
+  namedAggregates,
+  getRowType,
+  inputType,
+  grouping)
+  }
+
+  override def toString: String = {
+val inputFields = new Array[Int](inputType.getFieldCount)
+for (i <- 0 until inputType.getFieldCount)
+  inputFields(i) = i
+s"Aggregate(${
+  if (!grouping.isEmpty) {
+s"groupBy: (${groupingToString(inputType, grouping)}), "
+  } else {
+""
+  }
+} orderBy: (eventtime), window: (unbounded), " +
+  s"select: (${
+aggregationToString(
+  inputType,
+  inputFields,
+  getRowType,
+  namedAggregates,
+  namedProperties)
+  }))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+val inputFields = new Array[Int](inputType.getFieldCount)
+for (i <- 0 until inputType.getFieldCount)
+  inputFields(i) = i
+super.explainTerms(pw)
+  .itemIf("groupBy", groupingToString(inputType, grouping), 
!grouping.isEmpty)
+  .item("orderBy", "eventtime")
+  .item("window", "unbounded")
+  .item("select", aggregationToString(
+inputType,
+inputFields,
+getRowType,
+namedAggregates,
+namedProperties))
+  }
+
+  override def translateToPlan(tableEnv: StreamTableEnvironment): 
DataStream[Row] = {
+
+val config = tableEnv.getConfig
+val groupingKeys = grouping.toArray
+val inputDS = 
input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+
+val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(

[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879880#comment-15879880
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102631440
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSlideEventTimeRowAgg.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream, 
DataStream, WindowedStream}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.windows.{Window => 
DataStreamWindow}
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.plan.nodes.CommonAggregate
+
+class DataStreamSlideEventTimeRowAgg(
+namedProperties: Seq[NamedWindowProperty],
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inputNode: RelNode,
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+rowRelDataType: RelDataType,
+inputType: RelDataType,
+grouping: Array[Int])
+  extends SingleRel(cluster, traitSet, inputNode)
+  with CommonAggregate
+  with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamSlideEventTimeRowAgg(
+  namedProperties,
+  cluster,
+  traitSet,
+  inputs.get(0),
+  namedAggregates,
+  getRowType,
+  inputType,
+  grouping)
--- End diff --

IMO. Over Window should partitionKeys better.


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - bounded PRECEDING is not supported (see FLINK-5655)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:

[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879877#comment-15879877
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102633283
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -395,7 +395,8 @@ object AggregateUtil {
 val reduceFunction = new IncrementalAggregateReduceFunction(
   aggregates,
   groupingOffsetMapping,
-  intermediateRowArity)
+  intermediateRowArity,
+  groupings.length)
--- End diff --

Can we distinguish between groupwWindow'grouping and overWindow'partition?


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - bounded PRECEDING is not supported (see FLINK-5655)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879872#comment-15879872
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102632122
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowRule.scala
 ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamConvention, 
DataStreamSlideEventTimeRowAgg}
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.plan.logical.rel.LogicalOverWindow
+
+import scala.collection.JavaConversions._
+
+/**
+  * Rule to convert a LogicalOverWindow into a 
DataStreamSlideEventTimeRowAgg.
+  */
+class DataStreamWindowRule
+  extends ConverterRule(
+classOf[LogicalOverWindow],
+Convention.NONE,
+DataStreamConvention.INSTANCE,
+"DataStreamWindowRule")
+{
+
+  override def convert(rel: RelNode): RelNode = {
+val agg: LogicalOverWindow = rel.asInstanceOf[LogicalOverWindow]
+val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+val convInput: RelNode = RelOptRule.convert(agg.getInput, 
DataStreamConvention.INSTANCE)
+val inputRowType = 
convInput.asInstanceOf[RelSubset].getOriginal.getRowType
+
+if (agg.groups.size > 1) {
+  for (i <- 0 until agg.groups.size - 1)
--- End diff --

`agg.groups.size > 1 enough to determine whether there are many different 
windows, what do you think?


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - bounded PRECEDING is not supported (see FLINK-5655)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879875#comment-15879875
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102631105
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -52,9 +53,14 @@ class IncrementalAggregateReduceFunction(
 //   and directly merge value1 and value2.
 val accumulatorRow = new Row(intermediateRowArity)
 
-// copy all fields of value1 into accumulatorRow
-(0 until intermediateRowArity)
-.foreach(i => accumulatorRow.setField(i, value1.getField(i)))
+// copy non agg fields of value2 into accumulatorRow
+(0 until aggOffset)
+  .foreach(i => accumulatorRow.setField(i, value2.getField(i)))
+
+// copy agg fields of value1 into accumulatorRow
--- End diff --

This class will be shared groupwindow and overwindow, when groupwindow do 
not need to set the value of the non-agg, What do you think?


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - bounded PRECEDING is not supported (see FLINK-5655)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879869#comment-15879869
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102630342
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSlideEventTimeRowAgg.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream, 
DataStream, WindowedStream}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.windows.{Window => 
DataStreamWindow}
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.plan.nodes.CommonAggregate
+
+class DataStreamSlideEventTimeRowAgg(
+namedProperties: Seq[NamedWindowProperty],
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inputNode: RelNode,
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+rowRelDataType: RelDataType,
+inputType: RelDataType,
+grouping: Array[Int])
+  extends SingleRel(cluster, traitSet, inputNode)
+  with CommonAggregate
+  with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamSlideEventTimeRowAgg(
+  namedProperties,
+  cluster,
+  traitSet,
+  inputs.get(0),
+  namedAggregates,
+  getRowType,
+  inputType,
+  grouping)
+  }
+
+  override def toString: String = {
+val inputFields = new Array[Int](inputType.getFieldCount)
+for (i <- 0 until inputType.getFieldCount)
+  inputFields(i) = i
+s"Aggregate(${
+  if (!grouping.isEmpty) {
+s"groupBy: (${groupingToString(inputType, grouping)}), "
+  } else {
+""
+  }
+} orderBy: (eventtime), window: (unbounded), " +
+  s"select: (${
+aggregationToString(
+  inputType,
+  inputFields,
+  getRowType,
+  namedAggregates,
+  namedProperties)
+  }))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+val inputFields = new Array[Int](inputType.getFieldCount)
+for (i <- 0 until inputType.getFieldCount)
+  inputFields(i) = i
+super.explainTerms(pw)
+  .itemIf("groupBy", groupingToString(inputType, grouping), 
!grouping.isEmpty)
+  .item("orderBy", "eventtime")
+  .item("window", "unbounded")
+  .item("select", aggregationToString(
+inputType,
+inputFields,
+getRowType,
+namedAggregates,
+namedProperties))
+  }
+
+  override def translateToPlan(tableEnv: StreamTableEnvironment): 
DataStream[Row] = {
+
+val config = tableEnv.getConfig
+val groupingKeys = grouping.toArray
--- End diff --

Over Window should partitionKeys better.


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> ---

[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-02-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102635310
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowRule.scala
 ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamConvention, 
DataStreamSlideEventTimeRowAgg}
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.plan.logical.rel.LogicalOverWindow
+
+import scala.collection.JavaConversions._
+
+/**
+  * Rule to convert a LogicalOverWindow into a 
DataStreamSlideEventTimeRowAgg.
+  */
+class DataStreamWindowRule
+  extends ConverterRule(
+classOf[LogicalOverWindow],
+Convention.NONE,
+DataStreamConvention.INSTANCE,
+"DataStreamWindowRule")
+{
+
+  override def convert(rel: RelNode): RelNode = {
+val agg: LogicalOverWindow = rel.asInstanceOf[LogicalOverWindow]
+val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+val convInput: RelNode = RelOptRule.convert(agg.getInput, 
DataStreamConvention.INSTANCE)
+val inputRowType = 
convInput.asInstanceOf[RelSubset].getOriginal.getRowType
+
+if (agg.groups.size > 1) {
+  for (i <- 0 until agg.groups.size - 1)
+if (agg.groups(i).toString != agg.groups(i + 1).toString) {
+  throw new UnsupportedOperationException(
+"Unsupport different window in the same projection")
+}
+}
+
+val win = agg.groups(0)
+val rowtime = agg.isEventTime
+
+// map agg func with project name
+var aggIdx = inputRowType.getFieldCount - 1
+val outputNames = agg.getRowType.getFieldNames
+val namedAgg =
+  for (i <- 0 until agg.groups.size; aggCalls = 
agg.groups(i).getAggregateCalls(agg);
--- End diff --

IMHO. Maybe we can use a layer of iteration, because we only have one of 
the same window.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-02-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102631646
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSlideEventTimeRowAgg.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream, 
DataStream, WindowedStream}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.windows.{Window => 
DataStreamWindow}
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.plan.nodes.CommonAggregate
+
+class DataStreamSlideEventTimeRowAgg(
+namedProperties: Seq[NamedWindowProperty],
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inputNode: RelNode,
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+rowRelDataType: RelDataType,
+inputType: RelDataType,
+grouping: Array[Int])
+  extends SingleRel(cluster, traitSet, inputNode)
+  with CommonAggregate
+  with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamSlideEventTimeRowAgg(
+  namedProperties,
+  cluster,
+  traitSet,
+  inputs.get(0),
+  namedAggregates,
+  getRowType,
+  inputType,
+  grouping)
+  }
+
+  override def toString: String = {
+val inputFields = new Array[Int](inputType.getFieldCount)
+for (i <- 0 until inputType.getFieldCount)
+  inputFields(i) = i
+s"Aggregate(${
+  if (!grouping.isEmpty) {
+s"groupBy: (${groupingToString(inputType, grouping)}), "
+  } else {
+""
+  }
+} orderBy: (eventtime), window: (unbounded), " +
+  s"select: (${
+aggregationToString(
+  inputType,
+  inputFields,
+  getRowType,
+  namedAggregates,
+  namedProperties)
+  }))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+val inputFields = new Array[Int](inputType.getFieldCount)
+for (i <- 0 until inputType.getFieldCount)
+  inputFields(i) = i
+super.explainTerms(pw)
+  .itemIf("groupBy", groupingToString(inputType, grouping), 
!grouping.isEmpty)
+  .item("orderBy", "eventtime")
+  .item("window", "unbounded")
+  .item("select", aggregationToString(
+inputType,
+inputFields,
+getRowType,
+namedAggregates,
+namedProperties))
+  }
+
+  override def translateToPlan(tableEnv: StreamTableEnvironment): 
DataStream[Row] = {
+
+val config = tableEnv.getConfig
+val groupingKeys = grouping.toArray
+val inputDS = 
input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+
+val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+val inputFields = new Array[Int](inputType.getFieldCount)
--- End diff --

  Can you consider this way?
` val inputFields =  (for(i <- 0 until inputType.getFieldCount) yield 
i).toArray`


---
If your project is set up 

[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-02-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102632122
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowRule.scala
 ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamConvention, 
DataStreamSlideEventTimeRowAgg}
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.plan.logical.rel.LogicalOverWindow
+
+import scala.collection.JavaConversions._
+
+/**
+  * Rule to convert a LogicalOverWindow into a 
DataStreamSlideEventTimeRowAgg.
+  */
+class DataStreamWindowRule
+  extends ConverterRule(
+classOf[LogicalOverWindow],
+Convention.NONE,
+DataStreamConvention.INSTANCE,
+"DataStreamWindowRule")
+{
+
+  override def convert(rel: RelNode): RelNode = {
+val agg: LogicalOverWindow = rel.asInstanceOf[LogicalOverWindow]
+val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+val convInput: RelNode = RelOptRule.convert(agg.getInput, 
DataStreamConvention.INSTANCE)
+val inputRowType = 
convInput.asInstanceOf[RelSubset].getOriginal.getRowType
+
+if (agg.groups.size > 1) {
+  for (i <- 0 until agg.groups.size - 1)
--- End diff --

`agg.groups.size > 1 enough to determine whether there are many different 
windows, what do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-02-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102631440
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSlideEventTimeRowAgg.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream, 
DataStream, WindowedStream}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.windows.{Window => 
DataStreamWindow}
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.plan.nodes.CommonAggregate
+
+class DataStreamSlideEventTimeRowAgg(
+namedProperties: Seq[NamedWindowProperty],
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inputNode: RelNode,
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+rowRelDataType: RelDataType,
+inputType: RelDataType,
+grouping: Array[Int])
+  extends SingleRel(cluster, traitSet, inputNode)
+  with CommonAggregate
+  with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamSlideEventTimeRowAgg(
+  namedProperties,
+  cluster,
+  traitSet,
+  inputs.get(0),
+  namedAggregates,
+  getRowType,
+  inputType,
+  grouping)
--- End diff --

IMO. Over Window should partitionKeys better.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-02-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102630342
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSlideEventTimeRowAgg.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream, 
DataStream, WindowedStream}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.windows.{Window => 
DataStreamWindow}
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.plan.nodes.CommonAggregate
+
+class DataStreamSlideEventTimeRowAgg(
+namedProperties: Seq[NamedWindowProperty],
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inputNode: RelNode,
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+rowRelDataType: RelDataType,
+inputType: RelDataType,
+grouping: Array[Int])
+  extends SingleRel(cluster, traitSet, inputNode)
+  with CommonAggregate
+  with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamSlideEventTimeRowAgg(
+  namedProperties,
+  cluster,
+  traitSet,
+  inputs.get(0),
+  namedAggregates,
+  getRowType,
+  inputType,
+  grouping)
+  }
+
+  override def toString: String = {
+val inputFields = new Array[Int](inputType.getFieldCount)
+for (i <- 0 until inputType.getFieldCount)
+  inputFields(i) = i
+s"Aggregate(${
+  if (!grouping.isEmpty) {
+s"groupBy: (${groupingToString(inputType, grouping)}), "
+  } else {
+""
+  }
+} orderBy: (eventtime), window: (unbounded), " +
+  s"select: (${
+aggregationToString(
+  inputType,
+  inputFields,
+  getRowType,
+  namedAggregates,
+  namedProperties)
+  }))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+val inputFields = new Array[Int](inputType.getFieldCount)
+for (i <- 0 until inputType.getFieldCount)
+  inputFields(i) = i
+super.explainTerms(pw)
+  .itemIf("groupBy", groupingToString(inputType, grouping), 
!grouping.isEmpty)
+  .item("orderBy", "eventtime")
+  .item("window", "unbounded")
+  .item("select", aggregationToString(
+inputType,
+inputFields,
+getRowType,
+namedAggregates,
+namedProperties))
+  }
+
+  override def translateToPlan(tableEnv: StreamTableEnvironment): 
DataStream[Row] = {
+
+val config = tableEnv.getConfig
+val groupingKeys = grouping.toArray
--- End diff --

Over Window should partitionKeys better.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-02-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102632917
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowRule.scala
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.logical.{LogicalProject, LogicalWindow}
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.flink.table.functions.EventTimeExtractor
+import org.apache.flink.table.plan.logical.rel.LogicalOverWindow
+
+import scala.collection.JavaConversions._
+
+/**
+  * Rule to check whether rowtime or proctime.
+  */
+class LogicalWindowRule
--- End diff --

Have you refer to FLINK-5884 and reference FLINK-5884'design to 
consider?(https://docs.google.com/document/d/1JRXm09x_wKst6z6UXdCGF9tgF1ueOAsFiQwahR72vbc/edit)
  Or we can wait for FLINK-5884 implementation, then rebase the code?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-02-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102631105
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -52,9 +53,14 @@ class IncrementalAggregateReduceFunction(
 //   and directly merge value1 and value2.
 val accumulatorRow = new Row(intermediateRowArity)
 
-// copy all fields of value1 into accumulatorRow
-(0 until intermediateRowArity)
-.foreach(i => accumulatorRow.setField(i, value1.getField(i)))
+// copy non agg fields of value2 into accumulatorRow
+(0 until aggOffset)
+  .foreach(i => accumulatorRow.setField(i, value2.getField(i)))
+
+// copy agg fields of value1 into accumulatorRow
--- End diff --

This class will be shared groupwindow and overwindow, when groupwindow do 
not need to set the value of the non-agg, What do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-02-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102631831
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSlideEventTimeRowAgg.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream, 
DataStream, WindowedStream}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.windows.{Window => 
DataStreamWindow}
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.plan.nodes.CommonAggregate
+
+class DataStreamSlideEventTimeRowAgg(
+namedProperties: Seq[NamedWindowProperty],
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inputNode: RelNode,
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+rowRelDataType: RelDataType,
+inputType: RelDataType,
+grouping: Array[Int])
+  extends SingleRel(cluster, traitSet, inputNode)
+  with CommonAggregate
+  with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamSlideEventTimeRowAgg(
+  namedProperties,
+  cluster,
+  traitSet,
+  inputs.get(0),
+  namedAggregates,
+  getRowType,
+  inputType,
+  grouping)
+  }
+
+  override def toString: String = {
+val inputFields = new Array[Int](inputType.getFieldCount)
+for (i <- 0 until inputType.getFieldCount)
+  inputFields(i) = i
+s"Aggregate(${
+  if (!grouping.isEmpty) {
+s"groupBy: (${groupingToString(inputType, grouping)}), "
+  } else {
+""
+  }
+} orderBy: (eventtime), window: (unbounded), " +
+  s"select: (${
+aggregationToString(
+  inputType,
+  inputFields,
+  getRowType,
+  namedAggregates,
+  namedProperties)
+  }))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+val inputFields = new Array[Int](inputType.getFieldCount)
+for (i <- 0 until inputType.getFieldCount)
+  inputFields(i) = i
+super.explainTerms(pw)
+  .itemIf("groupBy", groupingToString(inputType, grouping), 
!grouping.isEmpty)
+  .item("orderBy", "eventtime")
+  .item("window", "unbounded")
+  .item("select", aggregationToString(
+inputType,
+inputFields,
+getRowType,
+namedAggregates,
+namedProperties))
+  }
+
+  override def translateToPlan(tableEnv: StreamTableEnvironment): 
DataStream[Row] = {
+
+val config = tableEnv.getConfig
+val groupingKeys = grouping.toArray
+val inputDS = 
input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+
+val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+val inputFields = new Array[Int](inputType.getFieldCount)
+for (i <- 0 until inputType.getFieldCount)
+  inputFields(i) = i
+
+val aggString = aggregationToString(
+  inputType,
+  inputField

[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-02-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102634122
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
 ---
@@ -171,4 +175,98 @@ class SqlITCase extends 
StreamingMultipleProgramsTestBase {
 val expected = mutable.MutableList("Hello", "Hello world")
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  /** test sliding event-time unbounded window with partition by **/
+  @Test
+  def testSlideEventTimeUnboundWindowWithGroup(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT a, b, SUM(a) over (partition by b order by 
rowtime() range between " +
+  "unbounded preceding and current row) from T1"
+
+val t1 = StreamTestData.getSmall3TupleDataStream(env)
+  .assignTimestampsAndWatermarks(new 
AssignerWithPeriodicWatermarks[(Int, Long, String)] {
+
+def getCurrentWatermark: Watermark = new 
Watermark(System.currentTimeMillis)
+
+def extractTimestamp(element: (Int, Long, String), 
previousElementTimestamp: Long): Long =
+  140
+  }).toTable(tEnv).as('a, 'b, 'c)
+tEnv.registerTable("T1", t1)
+
+val result = tEnv.sql(sqlQuery).toDataStream[Row]
+result.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected1 = mutable.MutableList(
+  "1,1,1", "2,2,2", "3,2,5")
+val expected2 = mutable.MutableList(
+  "1,1,1", "2,2,5", "3,2,3")
+assertTrue(expected1.equals(StreamITCase.testResults.sorted) ||
+  expected2.equals(StreamITCase.testResults.sorted))
+  }
+
+  /** test sliding event-time unbounded window without partitiion by **/
+  @Test
+  def testSlideEventTimeUnboundWindowWithoutGroup(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT SUM(a) " +
+  "over (order by rowtime() range between unbounded preceding and 
current row) from T1"
+
+val t1 = StreamTestData.getSmall3TupleDataStream(env)
+  .assignTimestampsAndWatermarks(new 
AssignerWithPeriodicWatermarks[(Int, Long, String)] {
+
+def getCurrentWatermark: Watermark = new 
Watermark(System.currentTimeMillis)
+
+def extractTimestamp(element: (Int, Long, String), 
previousElementTimestamp: Long): Long =
+  140
+  }).toTable(tEnv).as('a, 'b, 'c)
+tEnv.registerTable("T1", t1)
+
+val result = tEnv.sql(sqlQuery).toDataStream[Row]
+result.addSink(new StreamITCase.StringSink)
+env.execute()
+
+assertEquals(Some("6"), StreamITCase.testResults.sorted.get(2))
+  }
+
+  /** test sliding event-time unbounded window with later record **/
+  @Test
+  def testSlideEventTimeUnboundWindowWithLater(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+env.setParallelism(1)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT SUM(a) " +
--- End diff --

Can you add some non-agg fields? That can show the different with 
groupWindow.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-02-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102634927
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalEventTimeRowWindowAssigner.java
 ---
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that assigns all elements to the same global 
row window.
+ *
+ */
+@PublicEvolving
+public class GlobalEventTimeRowWindowAssigner extends 
WindowAssigner {
--- End diff --

In this way we can not handle the order of data. just like your test case 
`testSlideEventTimeUnboundWindowWithLater`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-02-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102633283
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -395,7 +395,8 @@ object AggregateUtil {
 val reduceFunction = new IncrementalAggregateReduceFunction(
   aggregates,
   groupingOffsetMapping,
-  intermediateRowArity)
+  intermediateRowArity,
+  groupings.length)
--- End diff --

Can we distinguish between groupwWindow'grouping and overWindow'partition?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Issue Comment Deleted] (FLINK-4534) Lack of synchronization in BucketingSink#restoreState()

2017-02-22 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-4534:
--
Comment: was deleted

(was: Feel free to work on this.
Thanks, Liwei.)

> Lack of synchronization in BucketingSink#restoreState()
> ---
>
> Key: FLINK-4534
> URL: https://issues.apache.org/jira/browse/FLINK-4534
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>
> Iteration over state.bucketStates is protected by synchronization in other 
> methods, except for the following in restoreState():
> {code}
> for (BucketState bucketState : state.bucketStates.values()) {
> {code}
> and following in close():
> {code}
> for (Map.Entry> entry : 
> state.bucketStates.entrySet()) {
>   closeCurrentPartFile(entry.getValue());
> {code}
> w.r.t. bucketState.pendingFilesPerCheckpoint , there is similar issue 
> starting line 752:
> {code}
>   Set pastCheckpointIds = 
> bucketState.pendingFilesPerCheckpoint.keySet();
>   LOG.debug("Moving pending files to final location on restore.");
>   for (Long pastCheckpointId : pastCheckpointIds) {
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5568) Introduce interface for catalog, and provide an in-memory implementation, and integrate with calcite schema

2017-02-22 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-5568:
-
Description: 
The {{TableEnvironment}} now provides a mechanism to register temporary table. 
It registers the temp table to calcite catalog, so SQL and TableAPI queries can 
access to those temp tables. Now DatasetTable,  DataStreamTable and 
TableSourceTable can be registered to  {{TableEnvironment}} as temporary tables.

This issue wants to provides a mechanism to connect external catalogs such as 
HCatalog to the {{TableEnvironment}}, so SQL and TableAPI queries could access 
to tables in the external catalogs without register those tables to 
{{TableEnvironment}} beforehand.

First, we should point out that there are two kinds of catalog in Flink 
actually. 
The first one is external catalog as we mentioned before, it provides CRUD 
operations to databases/tables.
The second one is calcite catalog, it defines namespace that can be accessed in 
Calcite queries. It depends on Calcite Schema/Table abstraction. SqlValidator 
and SqlConverter depends on the calcite catalog to fetch the tables in SQL or 
TableAPI.

So we need to do the following things:
1. introduce interface for external catalog, maybe provide an in-memory 
implementation first for test and develop environment.
2. introduce a mechanism to connect external catalog with Calcite catalog so 
the tables/databases in external catalog can be accessed in Calcite catalog. 
Including convert databases of externalCatalog to Calcite sub-schemas, convert 
tables in a database of externalCatalog to  Calcite tables (only support 
{{TableSourceTable}}).
3. register external catalog to {{TableEnvironment}}.

Here is the design mode of ExternalCatalogTable.

|  identifier  | TableIdentifier | dbName and tableName of 
table |
|  tableType | String | type of external catalog table, e.g 
csv, hbase, kafka
|  schema| DataSchema|  schema of table data, including 
column names and column types
| partitionColumnNames | List | names of partition column
| properties  | Map |properties of external 
catalog table
| stats   | TableStats | statistics of external 
catalog table 
| comment | String | 
| create time | long

There is still a detail problem need to be take into consideration, that is , 
how to convert  {{ExternalCatalogTable}} to {{TableSourceTable}}. The question 
is equals to  convert  {{ExternalCatalogTable}} to {{TableSource}} because we 
could  easily get {{TableSourceTable}} from {{TableSource}}.

Because different {{TableSource}} often contains different fields to initiate 
an instance. E.g. {{CsvTableSource}}  needs path, fieldName, fieldTypes, 
fieldDelim, rowDelim and so on to create a new instance , {{KafkaTableSource}} 
needs configuration and tableName to create a new instance. So it's not a good 
idea to let Flink framework be responsible for translate  
{{ExternalCatalogTable}} to different kind of {{TableSourceTable}}. 

Here is one solution. Let {{TableSource}} specify a converter.
1. provide  an Annatition named {{ExternalCatalogCompatible}}. The 
{{TableSource}} with the annotation means it is compatible with external 
catalog, that is, it could be converted to or from ExternalCatalogTable. This 
annotation specifies the tabletype and converter of the tableSource. For 
example, for {{CsvTableSource}}, it specifies the tableType is csv and 
converter class is CsvTableSourceConverter.
{code}
@ExternalCatalogCompatible(tableType = "csv", converter = 
classOf[CsvTableSourceConverter])
class CsvTableSource(...) {
...}
{code}
2. Scan all TableSources with the ExternalCatalogCompatible annotation, save 
the tableType and converter in a Map
3. When need to convert {{ExternalCatalogTable}} to {{TableSource}} , get the 
converter based on tableType. and let converter do convert

  was:
The {{TableEnvironment}} now provides a mechanism to register temporary table. 
It registers the temp table to calcite catalog, so SQL and TableAPI queries can 
access to those temp tables. Now DatasetTable,  DataStreamTable and 
TableSourceTable can be registered to  {{TableEnvironment}} as temporary tables.

This issue wants to provides a mechanism to connect external catalogs such as 
HCatalog to the {{TableEnvironment}}, so SQL and TableAPI queries could access 
to tables in the external catalogs without register those tables to 
{{TableEnvironment}} beforehand.

First, we should point out that there are two kinds of catalog in Flink 
actually. 
The first one is external catalog as we mentioned before, it provides CRUD 
operations to databases/tables.
The second one is calcite catalog, it defines namespace that can be accessed in 
Calcite queries. It depends on Calcite Schema/Table abstraction. SqlValidator 
and SqlConverter depends on the calcite catalog to fetch t

[jira] [Commented] (FLINK-5881) ScalarFunction(UDF) should support variable types and variable arguments

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879786#comment-15879786
 ] 

ASF GitHub Bot commented on FLINK-5881:
---

Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102632219
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
 ---
@@ -48,10 +48,16 @@ class ScalarFunctionCallGen(
   .getOrElse(throw new CodeGenException("No matching signature 
found."))
 val resultClass = getResultTypeClass(scalarFunction, matchingSignature)
 
+// zip for variable signatures
+var paramToOperands = matchingSignature.zip(operands)
+var i = paramToOperands.length
+while (i < operands.length) {
+  paramToOperands = paramToOperands :+ (matchingSignature.head, 
operands(i))
--- End diff --

I will try to use `getComponentType()`


> ScalarFunction(UDF) should support variable types and variable arguments  
> -
>
> Key: FLINK-5881
> URL: https://issues.apache.org/jira/browse/FLINK-5881
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Zhuoluo Yang
>Assignee: Zhuoluo Yang
>
> As a sub-task of FLINK-5826. We would like to support the ScalarFunction 
> first and make the review a little bit easier.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-22 Thread clarkyzl
Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102632219
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
 ---
@@ -48,10 +48,16 @@ class ScalarFunctionCallGen(
   .getOrElse(throw new CodeGenException("No matching signature 
found."))
 val resultClass = getResultTypeClass(scalarFunction, matchingSignature)
 
+// zip for variable signatures
+var paramToOperands = matchingSignature.zip(operands)
+var i = paramToOperands.length
+while (i < operands.length) {
+  paramToOperands = paramToOperands :+ (matchingSignature.head, 
operands(i))
--- End diff --

I will try to use `getComponentType()`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5881) ScalarFunction(UDF) should support variable types and variable arguments

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879761#comment-15879761
 ] 

ASF GitHub Bot commented on FLINK-5881:
---

Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102626945
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
 ---
@@ -48,10 +48,16 @@ class ScalarFunctionCallGen(
   .getOrElse(throw new CodeGenException("No matching signature 
found."))
 val resultClass = getResultTypeClass(scalarFunction, matchingSignature)
 
+// zip for variable signatures
+var paramToOperands = matchingSignature.zip(operands)
+var i = paramToOperands.length
+while (i < operands.length) {
+  paramToOperands = paramToOperands :+ (matchingSignature.head, 
operands(i))
--- End diff --

Thanks, @wuchong .Yes. It's a mistake here. And tests haven't covered this 
situation. Since the max number of the arguments is 254. I don't think it is 
necessary to use a component type at the phase of code generation. I will try 
to add some tests to cover this situation.


> ScalarFunction(UDF) should support variable types and variable arguments  
> -
>
> Key: FLINK-5881
> URL: https://issues.apache.org/jira/browse/FLINK-5881
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Zhuoluo Yang
>Assignee: Zhuoluo Yang
>
> As a sub-task of FLINK-5826. We would like to support the ScalarFunction 
> first and make the review a little bit easier.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5881) ScalarFunction(UDF) should support variable types and variable arguments

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879759#comment-15879759
 ] 

ASF GitHub Bot commented on FLINK-5881:
---

Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102627293
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -87,10 +87,16 @@ object UserDefinedFunctionUtils {
   // go over all signatures and find one matching actual signature
   .find { curSig =>
   // match parameters of signature to actual parameters
-  actualSignature.length == curSig.length &&
+  (actualSignature.length == curSig.length &&
 curSig.zipWithIndex.forall { case (clazz, i) =>
   parameterTypeEquals(actualSignature(i), clazz)
-}
+}) ||
+// matching the style which last argument is variable, eg. 
"Type..." "Type*"
+(actualSignature.length >= curSig.length &&
--- End diff --

Thanks @wuchong . I will do some tests and revisions to handle this.


> ScalarFunction(UDF) should support variable types and variable arguments  
> -
>
> Key: FLINK-5881
> URL: https://issues.apache.org/jira/browse/FLINK-5881
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Zhuoluo Yang
>Assignee: Zhuoluo Yang
>
> As a sub-task of FLINK-5826. We would like to support the ScalarFunction 
> first and make the review a little bit easier.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-22 Thread clarkyzl
Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102626945
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
 ---
@@ -48,10 +48,16 @@ class ScalarFunctionCallGen(
   .getOrElse(throw new CodeGenException("No matching signature 
found."))
 val resultClass = getResultTypeClass(scalarFunction, matchingSignature)
 
+// zip for variable signatures
+var paramToOperands = matchingSignature.zip(operands)
+var i = paramToOperands.length
+while (i < operands.length) {
+  paramToOperands = paramToOperands :+ (matchingSignature.head, 
operands(i))
--- End diff --

Thanks, @wuchong .Yes. It's a mistake here. And tests haven't covered this 
situation. Since the max number of the arguments is 254. I don't think it is 
necessary to use a component type at the phase of code generation. I will try 
to add some tests to cover this situation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-22 Thread clarkyzl
Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102628732
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
 ---
@@ -181,6 +181,22 @@ class UserDefinedScalarFunctionTest extends 
ExpressionTestBase {
   }
   
   @Test
+  def testVariableArgs(): Unit = {
+testAllApis(
--- End diff --

Sure. Thanks @wuchong . As we discussed above, I will add more tests here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5881) ScalarFunction(UDF) should support variable types and variable arguments

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879760#comment-15879760
 ] 

ASF GitHub Bot commented on FLINK-5881:
---

Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102628665
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -140,6 +147,25 @@ object UserDefinedFunctionUtils {
   s"one method named 'eval' which is public, not abstract and " +
   s"(in case of table functions) not static.")
 } else {
+  var trailingSeq = false
+  var trailingArray = false
+  methods.foreach(method => {
+val signatures = method.getParameterTypes
+if (signatures.nonEmpty) {
+  val trailingArg = signatures(signatures.length - 1)
+  if (trailingArg.getName.equals("scala.collection.Seq")) {
+trailingSeq = true
+  } else if (trailingArg.isArray) {
+trailingArray = true
+  }
+}
+  })
+  if (trailingSeq && !trailingArray) {
+// We found trailing "scala.collection.Seq", but no trailing 
"Type[]", "Type..."
+throw new ValidationException("The 'eval' method do not support 
Scala type of " +
+  "variable args eg. scala.collection.Seq or Type*, please add a 
@varargs annotation " +
--- End diff --

Sure. Thanks @wuchong 


> ScalarFunction(UDF) should support variable types and variable arguments  
> -
>
> Key: FLINK-5881
> URL: https://issues.apache.org/jira/browse/FLINK-5881
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Zhuoluo Yang
>Assignee: Zhuoluo Yang
>
> As a sub-task of FLINK-5826. We would like to support the ScalarFunction 
> first and make the review a little bit easier.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5881) ScalarFunction(UDF) should support variable types and variable arguments

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879763#comment-15879763
 ] 

ASF GitHub Bot commented on FLINK-5881:
---

Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102628732
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
 ---
@@ -181,6 +181,22 @@ class UserDefinedScalarFunctionTest extends 
ExpressionTestBase {
   }
   
   @Test
+  def testVariableArgs(): Unit = {
+testAllApis(
--- End diff --

Sure. Thanks @wuchong . As we discussed above, I will add more tests here.


> ScalarFunction(UDF) should support variable types and variable arguments  
> -
>
> Key: FLINK-5881
> URL: https://issues.apache.org/jira/browse/FLINK-5881
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Zhuoluo Yang
>Assignee: Zhuoluo Yang
>
> As a sub-task of FLINK-5826. We would like to support the ScalarFunction 
> first and make the review a little bit easier.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5881) ScalarFunction(UDF) should support variable types and variable arguments

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879762#comment-15879762
 ] 

ASF GitHub Bot commented on FLINK-5881:
---

Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102628111
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -140,6 +147,25 @@ object UserDefinedFunctionUtils {
   s"one method named 'eval' which is public, not abstract and " +
   s"(in case of table functions) not static.")
 } else {
+  var trailingSeq = false
+  var trailingArray = false
+  methods.foreach(method => {
+val signatures = method.getParameterTypes
+if (signatures.nonEmpty) {
+  val trailingArg = signatures(signatures.length - 1)
+  if (trailingArg.getName.equals("scala.collection.Seq")) {
+trailingSeq = true
+  } else if (trailingArg.isArray) {
+trailingArray = true
+  }
+}
+  })
+  if (trailingSeq && !trailingArray) {
--- End diff --

If the users use the annotation `@scala.annotation.varargs`, Scala will 
generate two signatures of the method. One is `T eval(scala.collection.Seq 
args)`, the other is `T eval(T[] args)`. A better idea is to compare every 
arguments of the signature. We can make sure either there is only one method `T 
eval(T[] args)`, or there are two methods: `T eval(scala.collection.Seq 
args)` and `T eval(T[] args)`.


> ScalarFunction(UDF) should support variable types and variable arguments  
> -
>
> Key: FLINK-5881
> URL: https://issues.apache.org/jira/browse/FLINK-5881
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Zhuoluo Yang
>Assignee: Zhuoluo Yang
>
> As a sub-task of FLINK-5826. We would like to support the ScalarFunction 
> first and make the review a little bit easier.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-22 Thread clarkyzl
Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102628111
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -140,6 +147,25 @@ object UserDefinedFunctionUtils {
   s"one method named 'eval' which is public, not abstract and " +
   s"(in case of table functions) not static.")
 } else {
+  var trailingSeq = false
+  var trailingArray = false
+  methods.foreach(method => {
+val signatures = method.getParameterTypes
+if (signatures.nonEmpty) {
+  val trailingArg = signatures(signatures.length - 1)
+  if (trailingArg.getName.equals("scala.collection.Seq")) {
+trailingSeq = true
+  } else if (trailingArg.isArray) {
+trailingArray = true
+  }
+}
+  })
+  if (trailingSeq && !trailingArray) {
--- End diff --

If the users use the annotation `@scala.annotation.varargs`, Scala will 
generate two signatures of the method. One is `T eval(scala.collection.Seq 
args)`, the other is `T eval(T[] args)`. A better idea is to compare every 
arguments of the signature. We can make sure either there is only one method `T 
eval(T[] args)`, or there are two methods: `T eval(scala.collection.Seq 
args)` and `T eval(T[] args)`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5881) ScalarFunction(UDF) should support variable types and variable arguments

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879764#comment-15879764
 ] 

ASF GitHub Bot commented on FLINK-5881:
---

Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102627751
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -87,10 +87,16 @@ object UserDefinedFunctionUtils {
   // go over all signatures and find one matching actual signature
   .find { curSig =>
   // match parameters of signature to actual parameters
-  actualSignature.length == curSig.length &&
+  (actualSignature.length == curSig.length &&
 curSig.zipWithIndex.forall { case (clazz, i) =>
   parameterTypeEquals(actualSignature(i), clazz)
-}
+}) ||
+// matching the style which last argument is variable, eg. 
"Type..." "Type*"
+(actualSignature.length >= curSig.length &&
+  curSig.zipWithIndex.forall { case (clazz, i) =>
+  parameterTypeEquals(actualSignature(i), clazz) ||
+(i == curSig.length - 1 && clazz.isArray)
--- End diff --

Thanks @wuchong. They are good suggestions.


> ScalarFunction(UDF) should support variable types and variable arguments  
> -
>
> Key: FLINK-5881
> URL: https://issues.apache.org/jira/browse/FLINK-5881
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Zhuoluo Yang
>Assignee: Zhuoluo Yang
>
> As a sub-task of FLINK-5826. We would like to support the ScalarFunction 
> first and make the review a little bit easier.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-22 Thread clarkyzl
Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102627293
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -87,10 +87,16 @@ object UserDefinedFunctionUtils {
   // go over all signatures and find one matching actual signature
   .find { curSig =>
   // match parameters of signature to actual parameters
-  actualSignature.length == curSig.length &&
+  (actualSignature.length == curSig.length &&
 curSig.zipWithIndex.forall { case (clazz, i) =>
   parameterTypeEquals(actualSignature(i), clazz)
-}
+}) ||
+// matching the style which last argument is variable, eg. 
"Type..." "Type*"
+(actualSignature.length >= curSig.length &&
--- End diff --

Thanks @wuchong . I will do some tests and revisions to handle this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-22 Thread clarkyzl
Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102627751
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -87,10 +87,16 @@ object UserDefinedFunctionUtils {
   // go over all signatures and find one matching actual signature
   .find { curSig =>
   // match parameters of signature to actual parameters
-  actualSignature.length == curSig.length &&
+  (actualSignature.length == curSig.length &&
 curSig.zipWithIndex.forall { case (clazz, i) =>
   parameterTypeEquals(actualSignature(i), clazz)
-}
+}) ||
+// matching the style which last argument is variable, eg. 
"Type..." "Type*"
+(actualSignature.length >= curSig.length &&
+  curSig.zipWithIndex.forall { case (clazz, i) =>
+  parameterTypeEquals(actualSignature(i), clazz) ||
+(i == curSig.length - 1 && clazz.isArray)
--- End diff --

Thanks @wuchong. They are good suggestions.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-22 Thread clarkyzl
Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102628665
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -140,6 +147,25 @@ object UserDefinedFunctionUtils {
   s"one method named 'eval' which is public, not abstract and " +
   s"(in case of table functions) not static.")
 } else {
+  var trailingSeq = false
+  var trailingArray = false
+  methods.foreach(method => {
+val signatures = method.getParameterTypes
+if (signatures.nonEmpty) {
+  val trailingArg = signatures(signatures.length - 1)
+  if (trailingArg.getName.equals("scala.collection.Seq")) {
+trailingSeq = true
+  } else if (trailingArg.isArray) {
+trailingArray = true
+  }
+}
+  })
+  if (trailingSeq && !trailingArray) {
+// We found trailing "scala.collection.Seq", but no trailing 
"Type[]", "Type..."
+throw new ValidationException("The 'eval' method do not support 
Scala type of " +
+  "variable args eg. scala.collection.Seq or Type*, please add a 
@varargs annotation " +
--- End diff --

Sure. Thanks @wuchong 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5568) Introduce interface for catalog, and provide an in-memory implementation, and integrate with calcite schema

2017-02-22 Thread jingzhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879755#comment-15879755
 ] 

jingzhang commented on FLINK-5568:
--

[~fhueske], thanks for your response. There is still a detail problem need to 
discuss, that is, how to convert ExternalCatalogTable to TableSourceTable. I 
add the question and one solution in the description. Looking forward to your 
advices.

> Introduce interface for catalog, and provide an in-memory implementation, and 
> integrate with calcite schema
> ---
>
> Key: FLINK-5568
> URL: https://issues.apache.org/jira/browse/FLINK-5568
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: jingzhang
>
> The {{TableEnvironment}} now provides a mechanism to register temporary 
> table. It registers the temp table to calcite catalog, so SQL and TableAPI 
> queries can access to those temp tables. Now DatasetTable,  DataStreamTable 
> and TableSourceTable can be registered to  {{TableEnvironment}} as temporary 
> tables.
> This issue wants to provides a mechanism to connect external catalogs such as 
> HCatalog to the {{TableEnvironment}}, so SQL and TableAPI queries could 
> access to tables in the external catalogs without register those tables to 
> {{TableEnvironment}} beforehand.
> First, we should point out that there are two kinds of catalog in Flink 
> actually. 
> The first one is external catalog as we mentioned before, it provides CRUD 
> operations to databases/tables.
> The second one is calcite catalog, it defines namespace that can be accessed 
> in Calcite queries. It depends on Calcite Schema/Table abstraction. 
> SqlValidator and SqlConverter depends on the calcite catalog to fetch the 
> tables in SQL or TableAPI.
> So we need to do the following things:
> 1. introduce interface for external catalog, maybe provide an in-memory 
> implementation first for test and develop environment.
> 2. introduce a mechanism to connect external catalog with Calcite catalog so 
> the tables/databases in external catalog can be accessed in Calcite catalog. 
> Including convert databases of externalCatalog to Calcite sub-schemas, 
> convert tables in a database of externalCatalog to  Calcite tables (only 
> support {{TableSourceTable}}).
> 3. register external catalog to {{TableEnvironment}}.
> There is still a detail problem need to be take into consideration, that is , 
> how to convert  {{ExternalCatalogTable}} to {{TableSourceTable}}. The 
> question is equals to  convert  {{ExternalCatalogTable}} to {{TableSource}} 
> because we could  easily get {{TableSourceTable}} from {{TableSource}}.
> Because different {{TableSource}} often contains different fields to initiate 
> an instance. E.g. {{CsvTableSource}}  needs path, fieldName, fieldTypes, 
> fieldDelim, rowDelim and so on to create a new instance , 
> {{KafkaTableSource}} needs configuration and tableName to create a new 
> instance. So it's not a good idea to let Flink framework be responsible for 
> translate  {{ExternalCatalogTable}} to different kind of 
> {{TableSourceTable}}. 
> Here is one solution. Let {{TableSource}} specify a converter.
> 1. provide  an Annatition named {{ExternalCatalogCompatible}}. The 
> {{TableSource}} with the annotation means it is compatible with external 
> catalog, that is, it could be converted to or from ExternalCatalogTable. This 
> annotation specifies the tabletype and converter of the tableSource. For 
> example, for {{CsvTableSource}}, it specifies the tableType is csv and 
> converter class is CsvTableSourceConverter.
> {code}
> @ExternalCatalogCompatible(tableType = "csv", converter = 
> classOf[CsvTableSourceConverter])
> class CsvTableSource(...) {
> ...}
> {code}
> 2. Scan all TableSources with the ExternalCatalogCompatible annotation, save 
> the tableType and converter in a Map
> 3. When need to convert {{ExternalCatalogTable}} to {{TableSource}} , get the 
> converter based on tableType. and let converter do convert



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5568) Introduce interface for catalog, and provide an in-memory implementation, and integrate with calcite schema

2017-02-22 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-5568:
-
Description: 
The {{TableEnvironment}} now provides a mechanism to register temporary table. 
It registers the temp table to calcite catalog, so SQL and TableAPI queries can 
access to those temp tables. Now DatasetTable,  DataStreamTable and 
TableSourceTable can be registered to  {{TableEnvironment}} as temporary tables.

This issue wants to provides a mechanism to connect external catalogs such as 
HCatalog to the {{TableEnvironment}}, so SQL and TableAPI queries could access 
to tables in the external catalogs without register those tables to 
{{TableEnvironment}} beforehand.

First, we should point out that there are two kinds of catalog in Flink 
actually. 
The first one is external catalog as we mentioned before, it provides CRUD 
operations to databases/tables.
The second one is calcite catalog, it defines namespace that can be accessed in 
Calcite queries. It depends on Calcite Schema/Table abstraction. SqlValidator 
and SqlConverter depends on the calcite catalog to fetch the tables in SQL or 
TableAPI.

So we need to do the following things:
1. introduce interface for external catalog, maybe provide an in-memory 
implementation first for test and develop environment.
2. introduce a mechanism to connect external catalog with Calcite catalog so 
the tables/databases in external catalog can be accessed in Calcite catalog. 
Including convert databases of externalCatalog to Calcite sub-schemas, convert 
tables in a database of externalCatalog to  Calcite tables (only support 
{{TableSourceTable}}).
3. register external catalog to {{TableEnvironment}}.


There is still a detail problem need to be take into consideration, that is , 
how to convert  {{ExternalCatalogTable}} to {{TableSourceTable}}. The question 
is equals to  convert  {{ExternalCatalogTable}} to {{TableSource}} because we 
could  easily get {{TableSourceTable}} from {{TableSource}}.

Because different {{TableSource}} often contains different fields to initiate 
an instance. E.g. {{CsvTableSource}}  needs path, fieldName, fieldTypes, 
fieldDelim, rowDelim and so on to create a new instance , {{KafkaTableSource}} 
needs configuration and tableName to create a new instance. So it's not a good 
idea to let Flink framework be responsible for translate  
{{ExternalCatalogTable}} to different kind of {{TableSourceTable}}. 

Here is one solution. Let {{TableSource}} specify a converter.
1. provide  an Annatition named {{ExternalCatalogCompatible}}. The 
{{TableSource}} with the annotation means it is compatible with external 
catalog, that is, it could be converted to or from ExternalCatalogTable. This 
annotation specifies the tabletype and converter of the tableSource. For 
example, for {{CsvTableSource}}, it specifies the tableType is csv and 
converter class is CsvTableSourceConverter.
{code}
@ExternalCatalogCompatible(tableType = "csv", converter = 
classOf[CsvTableSourceConverter])
class CsvTableSource(...) {
...}
{code}
2. Scan all TableSources with the ExternalCatalogCompatible annotation, save 
the tableType and converter in a Map
3. When need to convert {{ExternalCatalogTable}} to {{TableSource}} , get the 
converter based on tableType. and let converter do convert

  was:
The {{TableEnvironment}} now provides a mechanism to register temporary table. 
It registers the temp table to calcite catalog, so SQL and TableAPI queries can 
access to those temp tables. Now DatasetTable,  DataStreamTable and 
TableSourceTable can be registered to  {{TableEnvironment}} as temporary tables.

This issue wants to provides a mechanism to connect external catalogs such as 
HCatalog to the {{TableEnvironment}}, so SQL and TableAPI queries could access 
to tables in the external catalogs without register those tables to 
{{TableEnvironment}} beforehand.

First, we should point out that there are two kinds of catalog in Flink 
actually. 
The first one is external catalog as we mentioned before, it provides CRUD 
operations to databases/tables.
The second one is calcite catalog, it defines namespace that can be accessed in 
Calcite queries. It depends on Calcite Schema/Table abstraction. SqlValidator 
and SqlConverter depends on the calcite catalog to fetch the tables in SQL or 
TableAPI.

So we need to do the following things:
1. introduce interface for external catalog, maybe provide an in-memory 
implementation first for test and develop environment.
2. introduce a mechanism to connect external catalog with Calcite catalog so 
the tables/databases in external catalog can be accessed in Calcite catalog. 
Including convert databases of externalCatalog to Calcite sub-schemas, convert 
tables in a database of externalCatalog to  Calcite tables (only support 
{{TableSourceTable}}).
3. register external catalog to {{TableEnvironment}}.


There is still a detail problem 

[jira] [Updated] (FLINK-5568) Introduce interface for catalog, and provide an in-memory implementation, and integrate with calcite schema

2017-02-22 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-5568:
-
Description: 
The {{TableEnvironment}} now provides a mechanism to register temporary table. 
It registers the temp table to calcite catalog, so SQL and TableAPI queries can 
access to those temp tables. Now DatasetTable,  DataStreamTable and 
TableSourceTable can be registered to  {{TableEnvironment}} as temporary tables.

This issue wants to provides a mechanism to connect external catalogs such as 
HCatalog to the {{TableEnvironment}}, so SQL and TableAPI queries could access 
to tables in the external catalogs without register those tables to 
{{TableEnvironment}} beforehand.

First, we should point out that there are two kinds of catalog in Flink 
actually. 
The first one is external catalog as we mentioned before, it provides CRUD 
operations to databases/tables.
The second one is calcite catalog, it defines namespace that can be accessed in 
Calcite queries. It depends on Calcite Schema/Table abstraction. SqlValidator 
and SqlConverter depends on the calcite catalog to fetch the tables in SQL or 
TableAPI.

So we need to do the following things:
1. introduce interface for external catalog, maybe provide an in-memory 
implementation first for test and develop environment.
2. introduce a mechanism to connect external catalog with Calcite catalog so 
the tables/databases in external catalog can be accessed in Calcite catalog. 
Including convert databases of externalCatalog to Calcite sub-schemas, convert 
tables in a database of externalCatalog to  Calcite tables (only support 
{{TableSourceTable}}).
3. register external catalog to {{TableEnvironment}}.


There is still a detail problem need to be take into consideration, that is , 
how to convert  {{ExternalCatalogTable}} to {{TableSourceTable}}. The question 
is equals to  convert  {{ExternalCatalogTable}} to {{TableSource}} because we 
could  easily get {{TableSourceTable}} from {{TableSource}}.

Because different {{TableSource}} often contains different fields to initiate 
an instance. E.g. {{CsvTableSource}}  needs path, fieldName, fieldTypes, 
fieldDelim, rowDelim and so on to create a new instance , {{KafkaTableSource}} 
needs configuration and tableName to create a new instance. So it's not a good 
idea to let Flink framework be responsible for translate  
{{ExternalCatalogTable}} to different kind of {{TableSourceTable}}. 

Here is one solution. Let {{TableSource}} specify a converter.
1. provide  an Annatition named ExternalCatalogCompatible. The {{TableSource}} 
with the annotation means it is compatible with external catalog, that is, it 
could be converted to or from ExternalCatalogTable. This annotation specifies 
the tabletype and converter of the tableSource. For example, for 
{{CsvTableSource}}, it specifies the tableType is csv and converter class is 
CsvTableSourceConverter.
{code}
@ExternalCatalogCompatible(tableType = "csv", converter = 
classOf[CsvTableSourceConverter])
class CsvTableSource(...) {
...}
{code}
2. Scan all TableSources with the ExternalCatalogCompatible annotation, save 
the tableType and converter in a Map
3. When need to convert {{ExternalCatalogTable}} to {{TableSource}} , get the 
converter based on tableType. and let converter do convert

  was:
The {{TableEnvironment}} now provides a mechanism to register temporary table. 
It registers the temp table to calcite catalog, so SQL and TableAPI queries can 
access to those temp tables. Now DatasetTable,  DataStreamTable and 
TableSourceTable can be registered to  {{TableEnvironment}} as temporary tables.

This issue wants to provides a mechanism to connect external catalogs such as 
HCatalog to the {{TableEnvironment}}, so SQL and TableAPI queries could access 
to tables in the external catalogs without register those tables to 
{{TableEnvironment}} beforehand.

First, we should point out that there are two kinds of catalog in Flink 
actually. 
The first one is external catalog as we mentioned before, it provides CRUD 
operations to databases/tables.
The second one is calcite catalog, it defines namespace that can be accessed in 
Calcite queries. It depends on Calcite Schema/Table abstraction. SqlValidator 
and SqlConverter depends on the calcite catalog to fetch the tables in SQL or 
TableAPI.

So we need to do the following things:
1. introduce interface for external catalog, maybe provide an in-memory 
implementation first for test and develop environment.
2. introduce a mechanism to connect external catalog with Calcite catalog so 
the tables/databases in external catalog can be accessed in Calcite catalog. 
Including convert databases of externalCatalog to Calcite sub-schemas, convert 
tables in a database of externalCatalog to  Calcite tables (only support 
{{TableSourceTable}}).
3. register external catalog to {{TableEnvironment}}.


There is still a detail problem need

[jira] [Updated] (FLINK-5568) Introduce interface for catalog, and provide an in-memory implementation, and integrate with calcite schema

2017-02-22 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-5568:
-
Description: 
The {{TableEnvironment}} now provides a mechanism to register temporary table. 
It registers the temp table to calcite catalog, so SQL and TableAPI queries can 
access to those temp tables. Now DatasetTable,  DataStreamTable and 
TableSourceTable can be registered to  {{TableEnvironment}} as temporary tables.

This issue wants to provides a mechanism to connect external catalogs such as 
HCatalog to the {{TableEnvironment}}, so SQL and TableAPI queries could access 
to tables in the external catalogs without register those tables to 
{{TableEnvironment}} beforehand.

First, we should point out that there are two kinds of catalog in Flink 
actually. 
The first one is external catalog as we mentioned before, it provides CRUD 
operations to databases/tables.
The second one is calcite catalog, it defines namespace that can be accessed in 
Calcite queries. It depends on Calcite Schema/Table abstraction. SqlValidator 
and SqlConverter depends on the calcite catalog to fetch the tables in SQL or 
TableAPI.

So we need to do the following things:
1. introduce interface for external catalog, maybe provide an in-memory 
implementation first for test and develop environment.
2. introduce a mechanism to connect external catalog with Calcite catalog so 
the tables/databases in external catalog can be accessed in Calcite catalog. 
Including convert databases of externalCatalog to Calcite sub-schemas, convert 
tables in a database of externalCatalog to  Calcite tables (only support 
{{TableSourceTable}}).
3. register external catalog to {{TableEnvironment}}.


There is still a detail problem need to be take into consideration, that is , 
how to convert  {{ExternalCatalogTable}} to {{TableSourceTable}}. The question 
is equals to  convert  {{ExternalCatalogTable}} to {{TableSource}} because we 
could  easily get {{TableSourceTable}} from {{TableSource}}.

Because different {{TableSource}} often contains different fields to initiate 
an instance. E.g. {{CsvTableSource}}  needs path, fieldName, fieldTypes, 
fieldDelim, rowDelim and so on to create a new instance , {{KafkaTableSource}} 
needs configuration and tableName to create a new instance. So it's not a good 
idea to let Flink framework be responsible for translate  
{{ExternalCatalogTable}} to different kind of {{TableSourceTable}}. 

Here is my thought. Let {{TableSource}} specify a converter.
1. provide  an Annatition named ExternalCatalogCompatible. The {{TableSource}} 
with the annotation means it is compatible with external catalog, that is, it 
could be converted to or from ExternalCatalogTable. This annotation specifies 
the tabletype and converter of the tableSource. For example, for 
{{CsvTableSource}}, it specifies the tableType is csv and converter class is 
CsvTableSourceConverter.
{code}
@ExternalCatalogCompatible(tableType = "csv", converter = 
classOf[CsvTableSourceConverter])
class CsvTableSource(...) {
...}
{code}
2. Scan all TableSources with the ExternalCatalogCompatible annotation, save 
the tableType and converter in a Map
3. When need to convert {{ExternalCatalogTable}} to {{TableSource}} , get the 
converter based on tableType. and let converter do convert

  was:
The {{TableEnvironment}} now provides a mechanism to register temporary table. 
It registers the temp table to calcite catalog, so SQL and TableAPI queries can 
access to those temp tables. Now DatasetTable,  DataStreamTable and 
TableSourceTable can be registered to  {{TableEnvironment}} as temporary tables.

This issue wants to provides a mechanism to connect external catalogs such as 
HCatalog to the {{TableEnvironment}}, so SQL and TableAPI queries could access 
to tables in the external catalogs without register those tables to 
{{TableEnvironment}} beforehand.

First, we should point out that there are two kinds of catalog in Flink 
actually. 
The first one is external catalog as we mentioned before, it provides CRUD 
operations to databases/tables.
The second one is calcite catalog, it defines namespace that can be accessed in 
Calcite queries. It depends on Calcite Schema/Table abstraction. SqlValidator 
and SqlConverter depends on the calcite catalog to fetch the tables in SQL or 
TableAPI.

So we need to do the following things:
1. introduce interface for external catalog, maybe provide an in-memory 
implementation first for test and develop environment.
2. introduce a mechanism to connect external catalog with Calcite catalog so 
the tables/databases in external catalog can be accessed in Calcite catalog. 
Including convert databases of externalCatalog to Calcite sub-schemas, convert 
tables in a database of externalCatalog to  Calcite tables (only support 
TableSourceTable).
3. register external catalog to {{TableEnvironment}}.


> Introduce interface for catalog, and pro

[jira] [Commented] (FLINK-5891) ConnectedComponents is broken when object reuse enabled

2017-02-22 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879742#comment-15879742
 ] 

Xingcan Cui commented on FLINK-5891:


Hi Greg, may I ask what does "when object reuse enabled" mean here? Is it 
necessary to make a copy of the attribute min before storing it?

> ConnectedComponents is broken when object reuse enabled
> ---
>
> Key: FLINK-5891
> URL: https://issues.apache.org/jira/browse/FLINK-5891
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.3.0
>Reporter: Greg Hogan
>
> {{org.apache.flink.graph.library.ConnectedComponents.CCUpdater#updateVertex}} 
> is storing a value from its iterator.
> {{GSAConnectedComponents}} does not have this limitation.
> {code}
>   public static final class CCUpdater>
>   extends GatherFunction {
>   @Override
>   public void updateVertex(Vertex vertex, 
> MessageIterator messages) throws Exception {
>   VV current = vertex.getValue();
>   VV min = current;
>   for (VV msg : messages) {
>   if (msg.compareTo(min) < 0) {
>   min = msg;
>   }
>   }
>   if (!min.equals(current)) {
>   setNewVertexValue(min);
>   }
>   }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5888) ForwardedFields annotation is not generating optimised execution plan in example KMeans job

2017-02-22 Thread Ziyad Muhammed Mohiyudheen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879605#comment-15879605
 ] 

Ziyad Muhammed Mohiyudheen commented on FLINK-5888:
---

[~fhueske], Here is the output for the cluster run:
https://drive.google.com/file/d/0B0IlZv0uHBuvUUlBUndld3BKc0E
The input data size is ~1.2G. The job finishes much faster when run without 
Annotations. Also, the statistics shows that huge amount of intermediate data 
is generated while run with Annotations enabled.


> ForwardedFields annotation is not generating optimised execution plan in 
> example KMeans job
> ---
>
> Key: FLINK-5888
> URL: https://issues.apache.org/jira/browse/FLINK-5888
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, Examples, Java API
>Affects Versions: 1.1.3
>Reporter: Ziyad Muhammed Mohiyudheen
>
> Flink KMeans java example [1] shows the usage of ForwardedFields function 
> annotation. How ever, the example job was taking more time than expected on 
> medium sized data itself. By merely removing the function annotation from the 
> example code (with out any other change), a better execution plan and run 
> time was obtained. The execution plan shows that no combiner is used and the 
> two Map tasks are not chained when ForwardedFields is enabled. The experiment 
> is documented in [2]
> [1] 
> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
> [2] https://drive.google.com/open?id=0B0IlZv0uHBuvVEZ5ZmNpN19jVVU



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3354: [FLINK-5767] [Table] New aggregate function interface and...

2017-02-22 Thread shaoxuan-wang
Github user shaoxuan-wang commented on the issue:

https://github.com/apache/flink/pull/3354
  
@fhueske  thanks for the review. I agree with you that we should provide 
more specific types for the accumulators (backend state). I will update code 
today to address all your comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879512#comment-15879512
 ] 

ASF GitHub Bot commented on FLINK-5767:
---

Github user shaoxuan-wang commented on the issue:

https://github.com/apache/flink/pull/3354
  
@fhueske  thanks for the review. I agree with you that we should provide 
more specific types for the accumulators (backend state). I will update code 
today to address all your comments.


> New aggregate function interface and built-in aggregate functions
> -
>
> Key: FLINK-5767
> URL: https://issues.apache.org/jira/browse/FLINK-5767
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>
> Add a new aggregate function interface. This includes implementing the 
> aggregate interface, migrating the existing aggregation functions to this 
> interface, and adding the unit tests for these functions.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879498#comment-15879498
 ] 

ASF GitHub Bot commented on FLINK-5767:
---

Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3354#discussion_r102606570
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/builtInAggFuncs/SumAggFunction.scala
 ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions.builtInAggFuncs
+
+import java.math.BigDecimal
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+/**
+  * Base class for built-in Sum aggregate function
+  *
+  * @tparam T the type for the aggregation result
+  */
+abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
+  /** The initial accumulator for Sum aggregate function */
+  class SumAccumulator[T] extends Accumulator {
+var sum: Option[T] = None
--- End diff --

This was cloning from current existing built-in functions. Did not think it 
more thoroughly. Yes, you are right. I agree with you that we do not need the 
Option here. 


> New aggregate function interface and built-in aggregate functions
> -
>
> Key: FLINK-5767
> URL: https://issues.apache.org/jira/browse/FLINK-5767
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>
> Add a new aggregate function interface. This includes implementing the 
> aggregate interface, migrating the existing aggregation functions to this 
> interface, and adding the unit tests for these functions.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3354: [FLINK-5767] [Table] New aggregate function interf...

2017-02-22 Thread shaoxuan-wang
Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3354#discussion_r102606570
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/builtInAggFuncs/SumAggFunction.scala
 ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions.builtInAggFuncs
+
+import java.math.BigDecimal
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+/**
+  * Base class for built-in Sum aggregate function
+  *
+  * @tparam T the type for the aggregation result
+  */
+abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
+  /** The initial accumulator for Sum aggregate function */
+  class SumAccumulator[T] extends Accumulator {
+var sum: Option[T] = None
--- End diff --

This was cloning from current existing built-in functions. Did not think it 
more thoroughly. Yes, you are right. I agree with you that we do not need the 
Option here. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5568) Introduce interface for catalog, and provide an in-memory implementation, and integrate with calcite schema

2017-02-22 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879377#comment-15879377
 ] 

Fabian Hueske commented on FLINK-5568:
--

I think this is a good approach.

> Introduce interface for catalog, and provide an in-memory implementation, and 
> integrate with calcite schema
> ---
>
> Key: FLINK-5568
> URL: https://issues.apache.org/jira/browse/FLINK-5568
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: jingzhang
>
> The {{TableEnvironment}} now provides a mechanism to register temporary 
> table. It registers the temp table to calcite catalog, so SQL and TableAPI 
> queries can access to those temp tables. Now DatasetTable,  DataStreamTable 
> and TableSourceTable can be registered to  {{TableEnvironment}} as temporary 
> tables.
> This issue wants to provides a mechanism to connect external catalogs such as 
> HCatalog to the {{TableEnvironment}}, so SQL and TableAPI queries could 
> access to tables in the external catalogs without register those tables to 
> {{TableEnvironment}} beforehand.
> First, we should point out that there are two kinds of catalog in Flink 
> actually. 
> The first one is external catalog as we mentioned before, it provides CRUD 
> operations to databases/tables.
> The second one is calcite catalog, it defines namespace that can be accessed 
> in Calcite queries. It depends on Calcite Schema/Table abstraction. 
> SqlValidator and SqlConverter depends on the calcite catalog to fetch the 
> tables in SQL or TableAPI.
> So we need to do the following things:
> 1. introduce interface for external catalog, maybe provide an in-memory 
> implementation first for test and develop environment.
> 2. introduce a mechanism to connect external catalog with Calcite catalog so 
> the tables/databases in external catalog can be accessed in Calcite catalog. 
> Including convert databases of externalCatalog to Calcite sub-schemas, 
> convert tables in a database of externalCatalog to  Calcite tables (only 
> support TableSourceTable).
> 3. register external catalog to {{TableEnvironment}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5888) ForwardedFields annotation is not generating optimised execution plan in example KMeans job

2017-02-22 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879297#comment-15879297
 ] 

Fabian Hueske commented on FLINK-5888:
--

Yes, [~ggevay] is right. The hash partitioning is moved between the two maps 
(which is semantically OK due to the annotations).
Without debugging the optimizer, I see two reasons why that happened:

1. Both plans have identical unknown estimated costs because no stats are 
available.
2. The plan without Combiner has lower estimated costs (the combiner is not 
injected before the Reduce, because the data is already partitioned).

One more thing. Adding a {{CombineHint.HASH}} to the {{CentroidAccumulator}} as 
follows: 
{code}
.groupBy(0).reduce(new 
CentroidAccumulator()).setCombineHint(ReduceOperatorBase.CombineHint.HASH)
{code}

might speed up things as well.

> ForwardedFields annotation is not generating optimised execution plan in 
> example KMeans job
> ---
>
> Key: FLINK-5888
> URL: https://issues.apache.org/jira/browse/FLINK-5888
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, Examples, Java API
>Affects Versions: 1.1.3
>Reporter: Ziyad Muhammed Mohiyudheen
>
> Flink KMeans java example [1] shows the usage of ForwardedFields function 
> annotation. How ever, the example job was taking more time than expected on 
> medium sized data itself. By merely removing the function annotation from the 
> example code (with out any other change), a better execution plan and run 
> time was obtained. The execution plan shows that no combiner is used and the 
> two Map tasks are not chained when ForwardedFields is enabled. The experiment 
> is documented in [2]
> [1] 
> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
> [2] https://drive.google.com/open?id=0B0IlZv0uHBuvVEZ5ZmNpN19jVVU



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5891) ConnectedComponents is broken when object reuse enabled

2017-02-22 Thread Greg Hogan (JIRA)
Greg Hogan created FLINK-5891:
-

 Summary: ConnectedComponents is broken when object reuse enabled
 Key: FLINK-5891
 URL: https://issues.apache.org/jira/browse/FLINK-5891
 Project: Flink
  Issue Type: Bug
  Components: Gelly
Affects Versions: 1.3.0
Reporter: Greg Hogan


{{org.apache.flink.graph.library.ConnectedComponents.CCUpdater#updateVertex}} 
is storing a value from its iterator.

{{GSAConnectedComponents}} does not have this limitation.

{code}
public static final class CCUpdater>
extends GatherFunction {

@Override
public void updateVertex(Vertex vertex, 
MessageIterator messages) throws Exception {
VV current = vertex.getValue();
VV min = current;

for (VV msg : messages) {
if (msg.compareTo(min) < 0) {
min = msg;
}
}

if (!min.equals(current)) {
setNewVertexValue(min);
}
}
}
{code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5890) GatherSumApply broken when object reuse enabled

2017-02-22 Thread Greg Hogan (JIRA)
Greg Hogan created FLINK-5890:
-

 Summary: GatherSumApply broken when object reuse enabled
 Key: FLINK-5890
 URL: https://issues.apache.org/jira/browse/FLINK-5890
 Project: Flink
  Issue Type: Bug
  Components: Gelly
Affects Versions: 1.3.0
Reporter: Greg Hogan
Assignee: Greg Hogan
 Fix For: 1.3.0


{{GatherSumApplyIteration.SumUdf.reduce}} can store a value from {{arg1}} in 
the new {{Tuple2}} which can be overwritten in {{ReduceDriver}}. We need to 
swap {{arg0.f1}} and {{arg1.f1}} when this happens (as done in {{ReduceDriver}} 
for the returned results).

{code}
@Override
public Tuple2 reduce(Tuple2 arg0, Tuple2 arg1) throws 
Exception {
K key = arg0.f0;
M result = this.sumFunction.sum(arg0.f1, arg1.f1);
return new Tuple2<>(key, result);
}
{code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #:

2017-02-22 Thread StephanEwen
Github user StephanEwen commented on the pull request:


https://github.com/apache/flink/commit/646490c4e93eca315e4bf41704f149390f8639cc#commitcomment-21003603
  
Should we make `flushOnCheckpoint` true by default?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-5889) Improving the Flink Python batch API test framework

2017-02-22 Thread Lior Amar (JIRA)
Lior Amar created FLINK-5889:


 Summary: Improving the Flink Python batch API test framework
 Key: FLINK-5889
 URL: https://issues.apache.org/jira/browse/FLINK-5889
 Project: Flink
  Issue Type: Improvement
  Components: Python API
Affects Versions: 1.3.0
Reporter: Lior Amar
Priority: Minor


Allowing the developer to write multiple tests in a single file using the 
python unittest framework. 
The Python Batch API allows only a single plan in a file, thus requiring a file 
for each case.

This issue propose the following:
* A way to gather multiple tests in a single python file, in a way similar to 
the python standard unit test method.
* A way to run tests from the command line via a shell script - will make 
running multiple tests on a Flink deployment easier.


You can take a look at https://github.com/lior-amar/flink.git the 
pm/lior/py_tests branch. 

Comments will be appreciated.





--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4422) Convert all time interval measurements to System.nanoTime()

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879194#comment-15879194
 ] 

ASF GitHub Bot commented on FLINK-4422:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3384
  
Looks good, +1 to merge

Optional comment: I have seen that developers get confused when working 
with the code whether a `long` refers to a "millisecond" timestamp or to a 
"nanosecond" timestamp.

One way to solve that is to put that into the variable names (like 
`MAX_DELAY_MILLIS` or `deadlineNanos`). Another way is to use only milliseconds 
everywhere. Something like 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/clock/Clock.java
 can help to get the milliseconds in a more readable way than using nanoTime 
and multiplying.


> Convert all time interval measurements to System.nanoTime()
> ---
>
> Key: FLINK-4422
> URL: https://issues.apache.org/jira/browse/FLINK-4422
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Jin Mingjian
>Priority: Minor
>
> In contrast to {{System.currentTimeMillis()}}, {{System.nanoTime()}} is 
> monotonous. To measure delays and time intervals, {{System.nanoTime()}} is 
> hence reliable, while {{System.currentTimeMillis()}} is not.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3384: [FLINK-4422] Convert all time interval measurements to Sy...

2017-02-22 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3384
  
Looks good, +1 to merge

Optional comment: I have seen that developers get confused when working 
with the code whether a `long` refers to a "millisecond" timestamp or to a 
"nanosecond" timestamp.

One way to solve that is to put that into the variable names (like 
`MAX_DELAY_MILLIS` or `deadlineNanos`). Another way is to use only milliseconds 
everywhere. Something like 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/clock/Clock.java
 can help to get the milliseconds in a more readable way than using nanoTime 
and multiplying.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1526) Add Minimum Spanning Tree library method and example

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879128#comment-15879128
 ] 

ASF GitHub Bot commented on FLINK-1526:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/3284
  
Hi @xccui, I've been looking at this and FLINK-1707 (Affinity Propagation) 
the last few days (had to learn the algorithm first). My first analysis for an 
algorithm is to look at the performance and FLINK-4949 will make this much 
simpler to execute and measure.

The ticket talks about for-loop iterations but it seems we really want 
nested iterations.


> Add Minimum Spanning Tree library method and example
> 
>
> Key: FLINK-1526
> URL: https://issues.apache.org/jira/browse/FLINK-1526
> Project: Flink
>  Issue Type: Task
>  Components: Gelly
>Reporter: Vasia Kalavri
>Assignee: Xingcan Cui
>
> This issue proposes the addition of a library method and an example for 
> distributed minimum spanning tree in Gelly.
> The DMST algorithm is very interesting because it is quite different from 
> PageRank-like iterative graph algorithms. It consists of distinct phases 
> inside the same iteration and requires a mechanism to detect convergence of 
> one phase to proceed to the next one. Current implementations in 
> vertex-centric models are quite long (>1000 lines) and hard to understand.
> You can find a description of the algorithm [here | 
> http://ilpubs.stanford.edu:8090/1077/3/p535-salihoglu.pdf] and [here | 
> http://www.vldb.org/pvldb/vol7/p1047-han.pdf].



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


  1   2   3   4   >