[jira] [Commented] (FLINK-6407) Upgrade AVRO dependency version to 1.8.x

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066022#comment-16066022
 ] 

ASF GitHub Bot commented on FLINK-6407:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4205
  
Please don't cc me in on every pull request. Other people do reviews as well.


> Upgrade AVRO dependency version to 1.8.x
> 
>
> Key: FLINK-6407
> URL: https://issues.apache.org/jira/browse/FLINK-6407
> Project: Flink
>  Issue Type: Wish
>  Components: Batch Connectors and Input/Output Formats
>Affects Versions: 1.2.1
>Reporter: Miguel
>Assignee: mingleizhang
>Priority: Minor
>
> Avro 1.7.6 and 1.7.7 do not support Maps that use Java enum keys (they are 
> limited to String keys). This was solved in Avro 1.8.0.
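For illustration, a minimal sketch of the limitation (Palette and Color are made-up types): deriving an Avro reflect schema for a map with enum keys is expected to fail on 1.7.x, where reflect maps require String keys, and to work on 1.8.x (AVRO-680).

{code:java}
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;

public class EnumKeyMapProbe {

	/** Hypothetical enum used as a map key. */
	enum Color { RED, GREEN, BLUE }

	/** Hypothetical record type; only the Map field matters here. */
	static class Palette {
		Map<Color, Integer> counts;
	}

	public static void main(String[] args) {
		// Avro 1.7.x: expected to throw, because reflect maps require String keys.
		// Avro 1.8.x: succeeds, modeling the map with non-String keys.
		Schema schema = ReflectData.get().getSchema(Palette.class);
		System.out.println(schema.toString(true));
	}
}
{code}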





[GitHub] flink issue #4205: [FLINK-6407] [build] Upgrade AVRO to 1.8.2

2017-06-28 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4205
  
Please don't cc me in on every pull request. Other people do reviews as well.




[jira] [Commented] (FLINK-6996) FlinkKafkaProducer010 doesn't guarantee at-least-once semantic

2017-06-28 Thread Piotr Nowojski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066021#comment-16066021
 ] 

Piotr Nowojski commented on FLINK-6996:
---

[~tzulitai] I guess we were both wrong. I added tests for this issue for both 
(a) and (b), and both were failing (not flushing the data) before the fix.

> FlinkKafkaProducer010 doesn't guarantee at-least-once semantic
> --
>
> Key: FLINK-6996
> URL: https://issues.apache.org/jira/browse/FLINK-6996
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> FlinkKafkaProducer010 doesn't implement the CheckpointedFunction interface. This 
> means that when it's used as a "regular sink function" (option a from [the java 
> doc|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.html]),
> it will not flush the data on "snapshotState" as it is supposed to.
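To make the fix concrete, here is a minimal sketch of the flush-on-checkpoint pattern that implementing CheckpointedFunction enables. FlushingSink and AsyncClient are made-up names for illustration; this is not the actual FlinkKafkaProducer010 code.

{code:java}
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class FlushingSink extends RichSinkFunction<String> implements CheckpointedFunction {

	/** Stand-in for an asynchronous producer client, e.g. a KafkaProducer. */
	interface AsyncClient {
		void sendAsync(String value);
		void flush(); // blocks until all in-flight writes are acknowledged
	}

	private transient AsyncClient client;

	@Override
	public void invoke(String value) {
		// The write may still be in flight when invoke() returns.
		client.sendAsync(value);
	}

	@Override
	public void snapshotState(FunctionSnapshotContext context) {
		// Without this flush, a checkpoint could complete while records are
		// still buffered, so a restore could skip them and break at-least-once.
		client.flush();
	}

	@Override
	public void initializeState(FunctionInitializationContext context) {
		// Nothing to restore: flushing on snapshot suffices for at-least-once.
	}
}
{code}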





[GitHub] flink pull request #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarant...

2017-06-28 Thread pnowojski
GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4206

[FLINK-6996] FlinkKafkaProducer010 doesn't guarantee at-least-once semantic



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink at-least-once

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4206.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4206


commit b05b72a2baab8656787e2020120750e780b37621
Author: Piotr Nowojski 
Date:   2017-06-26T09:28:51Z

[FLINK-6996] Refactor and automatically inherit KafkaProducer integration 
tests

commit 62b553503964230d8af6d7d79054721060da8061
Author: Piotr Nowojski 
Date:   2017-06-26T10:20:36Z

[FLINK-6996] Fix formatting in KafkaConsumerTestBase and 
KafkaProducerTestBase

commit 34ba4b74f0c5c6b915695ab8bf7bda5b40955d5b
Author: Piotr Nowojski 
Date:   2017-06-26T10:36:40Z

[FLINK-6996] Fix at-least-once semantic for FlinkKafkaProducer010

Add test coverage for Kafka 0.10 and 0.9






[jira] [Commented] (FLINK-6996) FlinkKafkaProducer010 doesn't guarantee at-least-once semantic

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066023#comment-16066023
 ] 

ASF GitHub Bot commented on FLINK-6996:
---

GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4206

[FLINK-6996] FlinkKafkaProducer010 doesn't guarantee at-least-once semantic



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink at-least-once

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4206.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4206


commit b05b72a2baab8656787e2020120750e780b37621
Author: Piotr Nowojski 
Date:   2017-06-26T09:28:51Z

[FLINK-6996] Refactor and automatically inherit KafkaProducer integration 
tests

commit 62b553503964230d8af6d7d79054721060da8061
Author: Piotr Nowojski 
Date:   2017-06-26T10:20:36Z

[FLINK-6996] Fix formatting in KafkaConsumerTestBase and 
KafkaProducerTestBase

commit 34ba4b74f0c5c6b915695ab8bf7bda5b40955d5b
Author: Piotr Nowojski 
Date:   2017-06-26T10:36:40Z

[FLINK-6996] Fix at-least-once semantic for FlinkKafkaProducer010

Add test coverage for Kafka 0.10 and 0.9




> FlinkKafkaProducer010 doesn't guarantee at-least-once semantic
> --
>
> Key: FLINK-6996
> URL: https://issues.apache.org/jira/browse/FLINK-6996
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> FlinkKafkaProducer010 doesn't implement the CheckpointedFunction interface. This 
> means that when it's used as a "regular sink function" (option a from [the java 
> doc|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.html]),
> it will not flush the data on "snapshotState" as it is supposed to.





[jira] [Commented] (FLINK-6173) flink-table not pack-in com.fasterxml.jackson.* in after #FLINK-5414

2017-06-28 Thread Pengcheng Xiong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066039#comment-16066039
 ] 

Pengcheng Xiong commented on FLINK-6173:


May I ask if this issue is resolved? I am using the following configuration:
{code}
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table_2.10</artifactId>
  <version>1.3.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala_2.10</artifactId>
  <version>1.3.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.10</artifactId>
  <version>1.3.0</version>
</dependency>
{code}
And when I run "explain" in Java, it throws a similar error:
{code}
Exception in thread "main" java.lang.NoClassDefFoundError: 
org/apache/flink/shaded/calcite/com/fasterxml/jackson/databind/ObjectMapper
at 
org.apache.flink.table.explain.PlanJsonParser.getSqlExecutionPlan(PlanJsonParser.java:32)
at 
org.apache.flink.table.api.StreamTableEnvironment.explain(StreamTableEnvironment.scala:699)
at myGID.flink.Streaming.main(Streaming.java:24)
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.shaded.calcite.com.fasterxml.jackson.databind.ObjectMapper
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 3 more

{code}
May I ask if there is any quick workaround for this issue? Thanks!

> flink-table not pack-in com.fasterxml.jackson.* in after #FLINK-5414
> 
>
> Key: FLINK-6173
> URL: https://issues.apache.org/jira/browse/FLINK-6173
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Zhenghua Gao
>
> Currently, flink-table packs in com.fasterxml.jackson.* and renames it 
> to org.apache.flink.shaded.calcite.com.fasterxml.jackson.*
> If a project depends on flink-table and uses fasterxml as follows (the function 
> explain uses fasterxml indirectly):
> {code:title=WordCount.scala|borderStyle=solid}
> object WordCountWithTable {
>   def main(args: Array[String]): Unit = {
> // set up execution environment
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 
> 1))
> val expr = input.toTable(tEnv)
> val result = expr
>   .groupBy('word)
>   .select('word, 'frequency.sum as 'frequency)
>   .filter('frequency === 2)
> println(tEnv.explain(result))
> result.toDataSet[WC].print()
>   }
>   case class WC(word: String, frequency: Long)
> }
> {code}
> It actually uses org.apache.flink.shaded.calcite.com.fasterxml.jackson.*
> I found that after FLINK-5414, flink-table no longer packs in com.fasterxml.jackson.*, 
> so the project throws a ClassNotFoundException.
> {code:borderStyle=solid}
> Exception in thread "main" java.lang.NoClassDefFoundError: 
> org/apache/flink/shaded/calcite/com/fasterxml/jackson/databind/ObjectMapper
>   at 
> org.apache.flink.table.explain.PlanJsonParser.getSqlExecutionPlan(PlanJsonParser.java:32)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.explain(BatchTableEnvironment.scala:143)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.explain(BatchTableEnvironment.scala:164)
>   at 
> org.apache.flink.quickstart.WordCountWithTable$.main(WordCountWithTable.scala:34)
>   at 
> org.apache.flink.quickstart.WordCountWithTable.main(WordCountWithTable.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.shaded.calcite.com.fasterxml.jackson.databind.ObjectMapper
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   ... 10 more
> {code}
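As a quick way to check for the symptom (not from this thread), a minimal probe that only tests whether the relocated Jackson class is on the classpath:

{code:java}
public class ShadingProbe {
	public static void main(String[] args) {
		String relocated =
			"org.apache.flink.shaded.calcite.com.fasterxml.jackson.databind.ObjectMapper";
		try {
			Class.forName(relocated);
			System.out.println("Relocated Jackson found; explain() should work.");
		} catch (ClassNotFoundException e) {
			System.out.println("Relocated Jackson missing; this is the FLINK-6173 symptom.");
		}
	}
}
{code}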





[GitHub] flink pull request #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarant...

2017-06-28 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4206#discussion_r124466523
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
 ---
@@ -411,6 +414,18 @@ public void processElement(StreamRecord<T> element) throws Exception {
		invokeInternal(element.getValue(), element.getTimestamp());
	}
 
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
+		internalProducer.snapshotState(context);
+	}
+
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
--- End diff --

nit: I would declare `initializeState` before `snapshotState`, just for the 
sake of a better logic flow.




[GitHub] flink pull request #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarant...

2017-06-28 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4206#discussion_r124467315
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
 ---
@@ -18,17 +18,19 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.junit.Test;
-
 /**
  * IT cases for the {@link FlinkKafkaProducer08}.
  */
 @SuppressWarnings("serial")
 public class Kafka08ProducerITCase extends KafkaProducerTestBase {
 
-   @Test
-   public void testCustomPartitioning() {
-   runCustomPartitioningTest();
+   @Override
+   public void testOneToOneAtLeastOnceRegularSink() throws Exception {
+   // TODO: enable this for Kafka 0.8 - now it hangs indefinitely
--- End diff --

Is this a pending fix?




[GitHub] flink pull request #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarant...

2017-06-28 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4206#discussion_r124467541
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
 ---
@@ -18,17 +18,19 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.junit.Test;
-
 /**
  * IT cases for the {@link FlinkKafkaProducer08}.
  */
 @SuppressWarnings("serial")
 public class Kafka08ProducerITCase extends KafkaProducerTestBase {
 
-   @Test
-   public void testCustomPartitioning() {
-   runCustomPartitioningTest();
+   @Override
+   public void testOneToOneAtLeastOnceRegularSink() throws Exception {
+   // TODO: enable this for Kafka 0.8 - now it hangs indefinitely
}
 
+   @Override
+   public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
+   // Disable this test since FlinkKafka08Producer doesn't support 
writing timestamps
--- End diff --

I would perhaps rephrase this comment a bit:
it's disabled because FlinkKafka08Producer doesn't run in the custom 
operator mode (to be coherent with the test case name)




[GitHub] flink pull request #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarant...

2017-06-28 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4206#discussion_r124468424
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 ---
@@ -116,6 +120,30 @@ public String getVersion() {
	}
 
	@Override
+	public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition) {
+		ImmutableList.Builder<ConsumerRecord<K, V>> result = ImmutableList.builder();
+		KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties);
+		consumer.assign(ImmutableList.of(new TopicPartition(topic, partition)));
+
+		while (true) {
+			boolean processedAtLeastOneRecord = false;
--- End diff --

I'm a bit confused by this flag.
The method name is `getAllRecordsFromTopic`, but it seems like we're 
escaping the loop once some record is fetched.




[GitHub] flink pull request #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarant...

2017-06-28 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4206#discussion_r124467929
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 ---
@@ -28,6 +28,7 @@
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
 
+import com.google.common.collect.ImmutableList;
--- End diff --

Avoid Guava




[GitHub] flink pull request #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarant...

2017-06-28 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4206#discussion_r124467740
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
 ---
@@ -18,17 +18,13 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.junit.Test;
-
 /**
  * IT cases for the {@link FlinkKafkaProducer09}.
  */
 @SuppressWarnings("serial")
 public class Kafka09ProducerITCase extends KafkaProducerTestBase {
-
-   @Test
-   public void testCustomPartitioning() {
-   runCustomPartitioningTest();
+   @Override
+   public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
+   // Disable this test since FlinkKafka09Producer doesn't support 
writing timestamps
--- End diff --

Same here.




[jira] [Commented] (FLINK-6996) FlinkKafkaProducer010 doesn't guarantee at-least-once semantic

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066050#comment-16066050
 ] 

ASF GitHub Bot commented on FLINK-6996:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4206#discussion_r124467315
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
 ---
@@ -18,17 +18,19 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.junit.Test;
-
 /**
  * IT cases for the {@link FlinkKafkaProducer08}.
  */
 @SuppressWarnings("serial")
 public class Kafka08ProducerITCase extends KafkaProducerTestBase {
 
-   @Test
-   public void testCustomPartitioning() {
-   runCustomPartitioningTest();
+   @Override
+   public void testOneToOneAtLeastOnceRegularSink() throws Exception {
+   // TODO: enable this for Kafka 0.8 - now it hangs indefinitely
--- End diff --

Is this a pending fix?


> FlinkKafkaProducer010 doesn't guarantee at-least-once semantic
> --
>
> Key: FLINK-6996
> URL: https://issues.apache.org/jira/browse/FLINK-6996
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> FlinkKafkaProducer010 doesn't implement the CheckpointedFunction interface. This 
> means that when it's used as a "regular sink function" (option a from [the java 
> doc|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.html]),
> it will not flush the data on "snapshotState" as it is supposed to.





[jira] [Commented] (FLINK-6996) FlinkKafkaProducer010 doesn't guarantee at-least-once semantic

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066051#comment-16066051
 ] 

ASF GitHub Bot commented on FLINK-6996:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4206#discussion_r124467740
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
 ---
@@ -18,17 +18,13 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.junit.Test;
-
 /**
  * IT cases for the {@link FlinkKafkaProducer09}.
  */
 @SuppressWarnings("serial")
 public class Kafka09ProducerITCase extends KafkaProducerTestBase {
-
-   @Test
-   public void testCustomPartitioning() {
-   runCustomPartitioningTest();
+   @Override
+   public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
+   // Disable this test since FlinkKafka09Producer doesn't support 
writing timestamps
--- End diff --

Same here.


> FlinkKafkaProducer010 doesn't guarantee at-least-once semantic
> --
>
> Key: FLINK-6996
> URL: https://issues.apache.org/jira/browse/FLINK-6996
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> FlinkKafkaProducer010 doesn't implement the CheckpointedFunction interface. This 
> means that when it's used as a "regular sink function" (option a from [the java 
> doc|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.html]),
> it will not flush the data on "snapshotState" as it is supposed to.





[jira] [Commented] (FLINK-6996) FlinkKafkaProducer010 doesn't guarantee at-least-once semantic

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066048#comment-16066048
 ] 

ASF GitHub Bot commented on FLINK-6996:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4206#discussion_r124467541
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
 ---
@@ -18,17 +18,19 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.junit.Test;
-
 /**
  * IT cases for the {@link FlinkKafkaProducer08}.
  */
 @SuppressWarnings("serial")
 public class Kafka08ProducerITCase extends KafkaProducerTestBase {
 
-   @Test
-   public void testCustomPartitioning() {
-   runCustomPartitioningTest();
+   @Override
+   public void testOneToOneAtLeastOnceRegularSink() throws Exception {
+   // TODO: enable this for Kafka 0.8 - now it hangs indefinitely
}
 
+   @Override
+   public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
+   // Disable this test since FlinkKafka08Producer doesn't support 
writing timestamps
--- End diff --

I would perhaps rephrase this comment a bit:
it's disabled because FlinkKafka08Producer doesn't run in the custom 
operator mode (to be coherent with the test case name)


> FlinkKafkaProducer010 doesn't guarantee at-least-once semantic
> --
>
> Key: FLINK-6996
> URL: https://issues.apache.org/jira/browse/FLINK-6996
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> FlinkKafkaProducer010 doesn't implement the CheckpointedFunction interface. This 
> means that when it's used as a "regular sink function" (option a from [the java 
> doc|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.html]),
> it will not flush the data on "snapshotState" as it is supposed to.





[jira] [Commented] (FLINK-6996) FlinkKafkaProducer010 doesn't guarantee at-least-once semantic

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066047#comment-16066047
 ] 

ASF GitHub Bot commented on FLINK-6996:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4206#discussion_r124466733
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 ---
@@ -26,6 +26,7 @@
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
 
+import com.google.common.collect.ImmutableList;
--- End diff --

In Flink we usually try to avoid Guava usages. Would it be easy to switch 
to `Collections.unmodifiableList`?


> FlinkKafkaProducer010 doesn't guarantee at-least-once semantic
> --
>
> Key: FLINK-6996
> URL: https://issues.apache.org/jira/browse/FLINK-6996
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> FlinkKafkaProducer010 doesn't implement the CheckpointedFunction interface. This 
> means that when it's used as a "regular sink function" (option a from [the java 
> doc|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.html]),
> it will not flush the data on "snapshotState" as it is supposed to.





[GitHub] flink pull request #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarant...

2017-06-28 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4206#discussion_r124466733
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 ---
@@ -26,6 +26,7 @@
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
 
+import com.google.common.collect.ImmutableList;
--- End diff --

In Flink we usually try to avoid Guava usages. Would it be easy to switch 
to `Collections.unmodifiableList`?
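For reference, a minimal sketch of the suggested Guava-free pattern, using only java.util:

{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class NoGuavaExample {
	public static void main(String[] args) {
		// Instead of ImmutableList.builder()/build(), collect into a plain
		// ArrayList and return an unmodifiable view of it.
		List<String> records = new ArrayList<>();
		records.add("record-1");
		records.add("record-2");
		List<String> result = Collections.unmodifiableList(records);
		System.out.println(result);
	}
}
{code}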




[jira] [Commented] (FLINK-6996) FlinkKafkaProducer010 doesn't guarantee at-least-once semantic

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066052#comment-16066052
 ] 

ASF GitHub Bot commented on FLINK-6996:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4206#discussion_r124467929
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 ---
@@ -28,6 +28,7 @@
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
 
+import com.google.common.collect.ImmutableList;
--- End diff --

Avoid Guava


> FlinkKafkaProducer010 doesn't guarantee at-least-once semantic
> --
>
> Key: FLINK-6996
> URL: https://issues.apache.org/jira/browse/FLINK-6996
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> FlinkKafkaProducer010 doesn't implement the CheckpointedFunction interface. This 
> means that when it's used as a "regular sink function" (option a from [the java 
> doc|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.html]),
> it will not flush the data on "snapshotState" as it is supposed to.





[jira] [Commented] (FLINK-6996) FlinkKafkaProducer010 doesn't guarantee at-least-once semantic

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066046#comment-16066046
 ] 

ASF GitHub Bot commented on FLINK-6996:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4206#discussion_r124468424
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 ---
@@ -116,6 +120,30 @@ public String getVersion() {
	}
 
	@Override
+	public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition) {
+		ImmutableList.Builder<ConsumerRecord<K, V>> result = ImmutableList.builder();
+		KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties);
+		consumer.assign(ImmutableList.of(new TopicPartition(topic, partition)));
+
+		while (true) {
+			boolean processedAtLeastOneRecord = false;
--- End diff --

I'm a bit confused by this flag.
The method name is `getAllRecordsFromTopic`, but it seems like we're 
escaping the loop once some record is fetched.


> FlinkKafkaProducer010 doesn't guarantee at-least-once semantic
> --
>
> Key: FLINK-6996
> URL: https://issues.apache.org/jira/browse/FLINK-6996
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> FlinkKafkaProducer010 doesn't implement the CheckpointedFunction interface. This 
> means that when it's used as a "regular sink function" (option a from [the java 
> doc|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.html]),
> it will not flush the data on "snapshotState" as it is supposed to.





[jira] [Commented] (FLINK-6996) FlinkKafkaProducer010 doesn't guarantee at-least-once semantic

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066049#comment-16066049
 ] 

ASF GitHub Bot commented on FLINK-6996:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4206#discussion_r124466523
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
 ---
@@ -411,6 +414,18 @@ public void processElement(StreamRecord<T> element) throws Exception {
		invokeInternal(element.getValue(), element.getTimestamp());
	}
 
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
+		internalProducer.snapshotState(context);
+	}
+
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
--- End diff --

nit: I would declare `initializeState` before `snapshotState`, just for the 
sake of a better logic flow.


> FlinkKafkaProducer010 doesn't guarantee at-least-once semantic
> --
>
> Key: FLINK-6996
> URL: https://issues.apache.org/jira/browse/FLINK-6996
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> FlinkKafkaProducer010 doesn't implement the CheckpointedFunction interface. This 
> means that when it's used as a "regular sink function" (option a from [the java 
> doc|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.html]),
> it will not flush the data on "snapshotState" as it is supposed to.





[GitHub] flink issue #4205: [FLINK-6407] [build] Upgrade AVRO to 1.8.2

2017-06-28 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/4205
  
Okay. I remember.




[jira] [Commented] (FLINK-6407) Upgrade AVRO dependency version to 1.8.x

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066069#comment-16066069
 ] 

ASF GitHub Bot commented on FLINK-6407:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/4205
  
Okay. I remember.


> Upgrade AVRO dependency version to 1.8.x
> 
>
> Key: FLINK-6407
> URL: https://issues.apache.org/jira/browse/FLINK-6407
> Project: Flink
>  Issue Type: Wish
>  Components: Batch Connectors and Input/Output Formats
>Affects Versions: 1.2.1
>Reporter: Miguel
>Assignee: mingleizhang
>Priority: Minor
>
> Avro 1.7.6 and 1.7.7 do not support Maps that use Java enum keys (they are 
> limited to String keys). This was solved in Avro 1.8.0.





[GitHub] flink issue #4200: [FLINK-7014] Expose isDeterministic interface to ScalarFu...

2017-06-28 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4200
  
Thanks for updating and rebasing; the changes look good to me now.

+1 to merge





[jira] [Commented] (FLINK-7014) Expose isDeterministic interface to ScalarFunction and TableFunction

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066071#comment-16066071
 ] 

ASF GitHub Bot commented on FLINK-7014:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4200
  
Thanks for updating and rebasing; the changes look good to me now.

+1 to merge



> Expose isDeterministic interface to ScalarFunction and TableFunction
> 
>
> Key: FLINK-7014
> URL: https://issues.apache.org/jira/browse/FLINK-7014
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> Currently, the `isDeterministic` method of implementations of `SqlFunction` 
> always returns true, which causes inappropriate optimizations in Calcite, 
> such as treating a user's stateful UDF as a pure function.
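A minimal sketch of what the exposed hook looks like from the user's side (CounterFunction is a made-up stateful UDF): by returning false from isDeterministic, the function tells the planner not to constant-fold or reuse its results.

{code:java}
import org.apache.flink.table.functions.ScalarFunction;

public class CounterFunction extends ScalarFunction {

	private long count = 0;

	// Stateful: each call returns a different value.
	public long eval() {
		return ++count;
	}

	@Override
	public boolean isDeterministic() {
		return false;
	}
}
{code}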





[GitHub] flink issue #3942: FLINK-6379 Mesos ResourceManager (FLIP-6)

2017-06-28 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/3942
  
Really good work @EronWright. I've completed the review and rebased it on 
the latest master. While doing the review I applied one fix for a race 
condition in the tests, another fix to address the newly introduced checkstyle 
rules, and one small change which removes the `MesosResourceManager`'s internal 
RPC calls from the public interface, because they should only be needed by the 
RM's internal components. All small things. I really liked how you wrote the 
tests :-)

Once Travis gives the green light, I will merge your PR.

As a follow-up, I noticed two things:
1. Proper shutdown of internal components
2. Getting rid (at some point) of the Akka dependency




[jira] [Commented] (FLINK-6379) Implement FLIP-6 Mesos Resource Manager

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066083#comment-16066083
 ] 

ASF GitHub Bot commented on FLINK-6379:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/3942
  
Really good work @EronWright. I've completed the review and rebased it on 
the latest master. While doing the review I applied one fix for a race 
condition in the tests, another fix to address the newly introduced checkstyle 
rules, and one small change which removes the `MesosResourceManager`'s internal 
RPC calls from the public interface, because they should only be needed by the 
RM's internal components. All small things. I really liked how you wrote the 
tests :-)

Once Travis gives the green light, I will merge your PR.

As a follow-up, I noticed two things:
1. Proper shutdown of internal components
2. Getting rid (at some point) of the Akka dependency


> Implement FLIP-6 Mesos Resource Manager
> ---
>
> Key: FLINK-6379
> URL: https://issues.apache.org/jira/browse/FLINK-6379
> Project: Flink
>  Issue Type: Sub-task
>  Components: Mesos
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>
> Given the new ResourceManager of FLIP-6, implement a new 
> MesosResourceManager.   
> The minimal effort would be to implement a new resource manager while 
> continuing to use the various local actors (launch coordinator, task monitor, 
> etc.) which implement the various FSMs associated with Mesos scheduling. 
> The Fenzo library would continue to solve the packing problem of matching 
> resource offers to slot requests.





[jira] [Commented] (FLINK-7025) Using NullByteKeySelector for Unbounded ProcTime NonPartitioned Over

2017-06-28 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066101#comment-16066101
 ] 

Fabian Hueske commented on FLINK-7025:
--

I agree. IMO, we should go for the keyed state (NullByteKeySelector) approach. 
This will also be more consistent with the other Over window operators.

> Using NullByteKeySelector for Unbounded ProcTime NonPartitioned Over
> 
>
> Key: FLINK-7025
> URL: https://issues.apache.org/jira/browse/FLINK-7025
> Project: Flink
>  Issue Type: Bug
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> We recently added the `Cleanup State` feature, but it does not work well if we 
> enable state cleaning on the Unbounded ProcTime NonPartitioned Over window, 
> because `ProcessFunctionWithCleanupState` uses keyed state.
> So, in this JIRA, I'll either change the `Unbounded ProcTime NonPartitioned Over` 
> to a `partitioned Over` by using a NullByteKeySelector, or create a 
> `NonKeyedProcessFunctionWithCleanupState`. I think the first way is 
> simpler. What do you think? [~fhueske]
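For reference, a minimal sketch of the key selector behind the first option: every element maps to the same constant key, so the non-partitioned over window can run as a keyed operator and keep using keyed state.

{code:java}
import org.apache.flink.api.java.functions.KeySelector;

public class NullByteKeySelector<T> implements KeySelector<T, Byte> {

	private static final long serialVersionUID = 1L;

	@Override
	public Byte getKey(T value) {
		// Same key for every element => a single key group holds all state.
		return 0;
	}
}
{code}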





[GitHub] flink pull request #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarant...

2017-06-28 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4206#discussion_r124476235
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 ---
@@ -116,6 +120,30 @@ public String getVersion() {
	}
 
	@Override
+	public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition) {
+		ImmutableList.Builder<ConsumerRecord<K, V>> result = ImmutableList.builder();
+		KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties);
+		consumer.assign(ImmutableList.of(new TopicPartition(topic, partition)));
+
+		while (true) {
+			boolean processedAtLeastOneRecord = false;
--- End diff --

No, it's the other way around. We are breaking out of the loop if, after polling for 1 
second for the next records, we got an empty response. Added a comment.
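A Guava-free sketch of the loop as described (assumes auto.offset.reset=earliest is set in the supplied properties; simplified relative to the PR's code):

{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class TopicReader {

	public static <K, V> List<ConsumerRecord<K, V>> readAllRecords(
			Properties properties, String topic, int partition) {
		List<ConsumerRecord<K, V>> result = new ArrayList<>();
		try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties)) {
			consumer.assign(Collections.singletonList(new TopicPartition(topic, partition)));
			while (true) {
				// An empty response after waiting up to 1 second is taken to
				// mean "no more records"; otherwise keep polling.
				ConsumerRecords<K, V> records = consumer.poll(1000L);
				if (records.isEmpty()) {
					break;
				}
				for (ConsumerRecord<K, V> record : records) {
					result.add(record);
				}
			}
		}
		return result;
	}
}
{code}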




[jira] [Commented] (FLINK-6996) FlinkKafkaProducer010 doesn't guarantee at-least-once semantic

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066103#comment-16066103
 ] 

ASF GitHub Bot commented on FLINK-6996:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4206#discussion_r124476235
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 ---
@@ -116,6 +120,30 @@ public String getVersion() {
	}
 
	@Override
+	public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition) {
+		ImmutableList.Builder<ConsumerRecord<K, V>> result = ImmutableList.builder();
+		KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties);
+		consumer.assign(ImmutableList.of(new TopicPartition(topic, partition)));
+
+		while (true) {
+			boolean processedAtLeastOneRecord = false;
--- End diff --

No, it's the other way around. We are breaking out of the loop if, after polling for 1 
second for the next records, we got an empty response. Added a comment.


> FlinkKafkaProducer010 doesn't guarantee at-least-once semantic
> --
>
> Key: FLINK-6996
> URL: https://issues.apache.org/jira/browse/FLINK-6996
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> FlinkKafkaProducer010 doesn't implement the CheckpointedFunction interface. This 
> means that when it's used as a "regular sink function" (option a from [the java 
> doc|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.html]),
> it will not flush the data on "snapshotState" as it is supposed to.





[GitHub] flink pull request #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarant...

2017-06-28 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4206#discussion_r124476992
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
 ---
@@ -18,17 +18,19 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.junit.Test;
-
 /**
  * IT cases for the {@link FlinkKafkaProducer08}.
  */
 @SuppressWarnings("serial")
 public class Kafka08ProducerITCase extends KafkaProducerTestBase {
 
-   @Test
-   public void testCustomPartitioning() {
-   runCustomPartitioningTest();
+   @Override
+   public void testOneToOneAtLeastOnceRegularSink() throws Exception {
+   // TODO: enable this for Kafka 0.8 - now it hangs indefinitely
--- End diff --

I will not fix this test (I'm pretty sure this is a test issue) within the 
scope of this ticket. I even think that it's not worth the effort to 
investigate it at all - it is difficult to debug those failing tests, and Kafka 
0.8 is pretty old.




[jira] [Commented] (FLINK-6996) FlinkKafkaProducer010 doesn't guarantee at-least-once semantic

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066110#comment-16066110
 ] 

ASF GitHub Bot commented on FLINK-6996:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4206#discussion_r124476992
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
 ---
@@ -18,17 +18,19 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.junit.Test;
-
 /**
  * IT cases for the {@link FlinkKafkaProducer08}.
  */
 @SuppressWarnings("serial")
 public class Kafka08ProducerITCase extends KafkaProducerTestBase {
 
-   @Test
-   public void testCustomPartitioning() {
-   runCustomPartitioningTest();
+   @Override
+   public void testOneToOneAtLeastOnceRegularSink() throws Exception {
+   // TODO: enable this for Kafka 0.8 - now it hangs indefinitely
--- End diff --

I will not fix this test (I'm pretty sure this is a test issue) within the 
scope of this ticket. I even think that it's not worth the effort to 
investigate it at all - it is difficult to debug those failing tests, and Kafka 
0.8 is pretty old.


> FlinkKafkaProducer010 doesn't guarantee at-least-once semantic
> --
>
> Key: FLINK-6996
> URL: https://issues.apache.org/jira/browse/FLINK-6996
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> FlinkKafkaProducer010 doesn't implement the CheckpointedFunction interface. This 
> means that when it's used as a "regular sink function" (option a from [the java 
> doc|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.html]),
> it will not flush the data on "snapshotState" as it is supposed to.





[GitHub] flink issue #4183: [FLINK-6969][table]Add support for deferred computation f...

2017-06-28 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4183
  
Hi @wuchong, that's an interesting idea but I think it has the drawback 
that it might add latency.

A timestamp extractor is only called with records and doesn't see 
watermarks. Since an operator might emit multiple records for the same 
timestamp, a timestamp extractor would always have to emit a watermark of the 
last timestamp - 1 (we can be sure that the records are emitted in timestamp 
order), because it does not know which record is the last one for a timestamp. 
So, we would add a latency of one window length (until the next window is 
processed).

Custom operators are a low-level interface, but it shouldn't be too hard to 
implement one that holds watermarks back.
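A minimal sketch of the conservative behavior described above, using a hypothetical Event type: the extractor can never safely advance the watermark past timestamp - 1.

{code:java}
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

/** Hypothetical record type carrying its event-time timestamp. */
class Event {
	long timestamp;
}

public class ConservativeAssigner implements AssignerWithPunctuatedWatermarks<Event> {

	@Override
	public long extractTimestamp(Event element, long previousElementTimestamp) {
		return element.timestamp;
	}

	@Override
	public Watermark checkAndGetNextWatermark(Event lastElement, long extractedTimestamp) {
		// Another record with the same timestamp might still follow, so a
		// window ending at T only fires once a record with timestamp > T
		// arrives: up to one window length of extra latency.
		return new Watermark(extractedTimestamp - 1);
	}
}
{code}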




[jira] [Commented] (FLINK-6969) Add support for deferred computation for group window aggregates

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066115#comment-16066115
 ] 

ASF GitHub Bot commented on FLINK-6969:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4183
  
Hi @wuchong, that's an interesting idea but I think it has the drawback 
that it might add latency.

A timestamp extractor is only called with records and doesn't see 
watermarks. Since an operator might emit multiple records for the same 
timestamp, a timestamp extractor would always have to emit a watermark of the 
last timestamp - 1 (we can be sure that the records are emitted in timestamp 
order), because it does not know which record is the last one for a timestamp. 
So, we would add a latency of one window length (until the next window is 
processed).

Custom operators are a low-level interface, but it shouldn't be too hard to 
implement one that holds watermarks back.


> Add support for deferred computation for group window aggregates
> 
>
> Key: FLINK-6969
> URL: https://issues.apache.org/jira/browse/FLINK-6969
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Deferred computation is a strategy to deal with late-arriving data and avoid 
> updates of previous results. Instead of computing a result as soon as it is 
> possible (i.e., when a corresponding watermark was received), deferred 
> computation adds a configurable amount of slack time in which late data is 
> accepted before the result is computed. For example, instead of computing a 
> tumbling window of 1 hour at each full hour, we can add a deferred 
> computation interval of 15 minutes to compute the result a quarter past each 
> full hour.
> This approach adds latency but can reduce the number of updates, especially in use 
> cases where the user cannot influence the generation of watermarks. It is 
> also useful if the data is emitted to a system that cannot update results 
> (files or Kafka). The deferred computation interval should be configured via 
> the {{QueryConfig}}.
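One possible sketch of this strategy at the DataStream level, as a custom event-time trigger; this is not the proposed {{QueryConfig}} API, and it assumes the window state is retained long enough (e.g., allowed lateness >= slack):

{code:java}
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class DeferredEventTimeTrigger extends Trigger<Object, TimeWindow> {

	private final long slackMillis;

	public DeferredEventTimeTrigger(long slackMillis) {
		this.slackMillis = slackMillis;
	}

	@Override
	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
		// Defer firing until the watermark passes end-of-window + slack,
		// so late data arriving within the slack is still included.
		ctx.registerEventTimeTimer(window.maxTimestamp() + slackMillis);
		return TriggerResult.CONTINUE;
	}

	@Override
	public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
		return time == window.maxTimestamp() + slackMillis
			? TriggerResult.FIRE_AND_PURGE
			: TriggerResult.CONTINUE;
	}

	@Override
	public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
		return TriggerResult.CONTINUE;
	}

	@Override
	public void clear(TimeWindow window, TriggerContext ctx) {
		ctx.deleteEventTimeTimer(window.maxTimestamp() + slackMillis);
	}
}
{code}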





[jira] [Closed] (FLINK-7013) Add shaded netty dependency

2017-06-28 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-7013.
---
Resolution: Fixed

Fixed in a78b48a721b595eec56c94567ae52c2879ee1a2e

> Add shaded netty dependency
> ---
>
> Key: FLINK-7013
> URL: https://issues.apache.org/jira/browse/FLINK-7013
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>






[jira] [Commented] (FLINK-7007) Add README to "flink-shaded.git" repository

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066167#comment-16066167
 ] 

ASF GitHub Bot commented on FLINK-7007:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink-shaded/pull/3


> Add README to "flink-shaded.git" repository
> ---
>
> Key: FLINK-7007
> URL: https://issues.apache.org/jira/browse/FLINK-7007
> Project: Flink
>  Issue Type: Sub-task
>  Components: flink-shaded.git
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>
> We should put a README file up there with a brief explanation of the purpose 
> of the repo + some links to the project.





[jira] [Closed] (FLINK-7007) Add README to "flink-shaded.git" repository

2017-06-28 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-7007.
---
Resolution: Fixed

Fixed in 236b7430d2b302e5f6785b210109c3978d11a690

> Add README to "flink-shaded.git" repository
> ---
>
> Key: FLINK-7007
> URL: https://issues.apache.org/jira/browse/FLINK-7007
> Project: Flink
>  Issue Type: Sub-task
>  Components: flink-shaded.git
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>
> We should put a README file up there with a brief explanation of the purpose 
> of the repo + some links to the project.





[GitHub] flink pull request #3889: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-06-28 Thread rtudoran
Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3889#discussion_r124489316
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortITCase.scala
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import 
org.apache.flink.table.api.scala.stream.sql.SortITCase.StringRowSelectorSink
+import 
org.apache.flink.table.api.scala.stream.sql.TimeTestUtil.EventTimeSourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, 
StreamTestData, StreamingWithStateTestBase}
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.watermark.Watermark
+import scala.collection.mutable
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+
+class SortITCase extends StreamingWithStateTestBase {
+
+  @Test
+  def testEventTimeOrderBy(): Unit = {
+val data = Seq(
+  Left((1500L, (1L, 15, "Hello"))),
+  Left((1600L, (1L, 16, "Hello"))),
+  Left((1000L, (1L, 1, "Hello"))),
+  Left((2000L, (2L, 2, "Hello"))),
+  Right(1000L),
+  Left((2000L, (2L, 2, "Hello"))),
+  Left((2000L, (2L, 3, "Hello"))),
+  Left((3000L, (3L, 3, "Hello"))),
+  Left((2000L, (3L, 1, "Hello"))),
+  Right(2000L),
+  Left((4000L, (4L, 4, "Hello"))),
+  Right(3000L),
+  Left((5000L, (5L, 5, "Hello"))),
+  Right(5000L),
+  Left((6000L, (6L, 65, "Hello"))),
+  Left((6000L, (6L, 6, "Hello"))),
+  Left((6000L, (6L, 67, "Hello"))),
+  Left((6000L, (6L, -1, "Hello"))),
+  Left((6000L, (6L, 6, "Hello"))),
+  Right(7000L),
+  Left((9000L, (6L, 9, "Hello"))),
+  Left((8500L, (6L, 18, "Hello"))),
+  Left((9000L, (6L, 7, "Hello"))),
+  Right(1L),
+  Left((1L, (7L, 7, "Hello World"))),
+  Left((11000L, (7L, 77, "Hello World"))),
+  Left((11000L, (7L, 17, "Hello World"))),
+  Right(12000L),
+  Left((14000L, (7L, 18, "Hello World"))),
+  Right(14000L),
+  Left((15000L, (8L, 8, "Hello World"))),
+  Right(17000L),
+  Left((20000L, (20L, 20, "Hello World"))), 
+  Right(19000L))
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(getStateBackend)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+
+val t1 = env.addSource(new EventTimeSourceFunction[(Long, Int, 
String)](data))
+  .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+  
+tEnv.registerTable("T1", t1)
+
+val  sqlQuery = "SELECT b FROM T1 " +
+  "ORDER BY rowtime, b ASC ";
+  
+  
+val result = tEnv.sql(sqlQuery).toDataStream[Row]
--- End diff --

In the Flink version from which I started, it is not available. I think 
if I rebase now I will have to create a new PR... I would prefer to keep this 
and do an update after the merge.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066176#comment-16066176
 ] 

ASF GitHub Bot commented on FLINK-6075:
---

Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3889#discussion_r124489316
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortITCase.scala
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import 
org.apache.flink.table.api.scala.stream.sql.SortITCase.StringRowSelectorSink
+import 
org.apache.flink.table.api.scala.stream.sql.TimeTestUtil.EventTimeSourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, 
StreamTestData, StreamingWithStateTestBase}
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.watermark.Watermark
+import scala.collection.mutable
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+
+class SortITCase extends StreamingWithStateTestBase {
+
+  @Test
+  def testEventTimeOrderBy(): Unit = {
+val data = Seq(
+  Left((1500L, (1L, 15, "Hello"))),
+  Left((1600L, (1L, 16, "Hello"))),
+  Left((1000L, (1L, 1, "Hello"))),
+  Left((2000L, (2L, 2, "Hello"))),
+  Right(1000L),
+  Left((2000L, (2L, 2, "Hello"))),
+  Left((2000L, (2L, 3, "Hello"))),
+  Left((3000L, (3L, 3, "Hello"))),
+  Left((2000L, (3L, 1, "Hello"))),
+  Right(2000L),
+  Left((4000L, (4L, 4, "Hello"))),
+  Right(3000L),
+  Left((5000L, (5L, 5, "Hello"))),
+  Right(5000L),
+  Left((6000L, (6L, 65, "Hello"))),
+  Left((6000L, (6L, 6, "Hello"))),
+  Left((6000L, (6L, 67, "Hello"))),
+  Left((6000L, (6L, -1, "Hello"))),
+  Left((6000L, (6L, 6, "Hello"))),
+  Right(7000L),
+  Left((9000L, (6L, 9, "Hello"))),
+  Left((8500L, (6L, 18, "Hello"))),
+  Left((9000L, (6L, 7, "Hello"))),
+  Right(10000L),
+  Left((10000L, (7L, 7, "Hello World"))),
+  Left((11000L, (7L, 77, "Hello World"))),
+  Left((11000L, (7L, 17, "Hello World"))),
+  Right(12000L),
+  Left((14000L, (7L, 18, "Hello World"))),
+  Right(14000L),
+  Left((15000L, (8L, 8, "Hello World"))),
+  Right(17000L),
+  Left((20000L, (20L, 20, "Hello World"))), 
+  Right(19000L))
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(getStateBackend)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+
+val t1 = env.addSource(new EventTimeSourceFunction[(Long, Int, 
String)](data))
+  .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+  
+tEnv.registerTable("T1", t1)
+
+val  sqlQuery = "SELECT b FROM T1 " +
+  "ORDER BY rowtime, b ASC ";
+  
+  
+val result = tEnv.sql(sqlQuery).toDataStream[Row]
--- End diff --

In the Flink version from which I started, it is not available. I think 
if I rebase now I will have to create a new PR... I would prefer to keep this 
and do an update after the merge.

[GitHub] flink issue #4187: [FLINK-6998][Kafka Connector] Add kafka offset commit met...

2017-06-28 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4187
  
I don't think it matters too much whether a metric is completely measured by 
Flink or just forwarded from Kafka classes. Having "kafka" in the name also 
introduces an inherent redundancy, since the scope for, say, the 
KafkaConsumerThread already contains "KafkaConsumer".


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6998) Kafka connector needs to expose metrics for failed/successful offset commits in the Kafka Consumer callback

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066180#comment-16066180
 ] 

ASF GitHub Bot commented on FLINK-6998:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4187
  
I don't think it matters too much whether a metric is completely measured by 
Flink or just forwarded from Kafka classes. Having "kafka" in the name also 
introduces an inherent redundancy, since the scope for, say, the 
KafkaConsumerThread already contains "KafkaConsumer".


> Kafka connector needs to expose metrics for failed/successful offset commits 
> in the Kafka Consumer callback
> ---
>
> Key: FLINK-6998
> URL: https://issues.apache.org/jira/browse/FLINK-6998
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Zhenzhong Xu
>Assignee: Zhenzhong Xu
>
> Propose to add "kafkaCommitsSucceeded" and "kafkaCommitsFailed" metrics in 
> KafkaConsumerThread class.
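
A minimal sketch of how the proposed counters could be wired into the commit 
callback, with the redundant "kafka" prefix dropped as suggested above; the 
exact registration point inside KafkaConsumerThread is an assumption:

import java.util.Map;

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

// Counts successful and failed offset commits as reported by the
// consumer's asynchronous commit callback.
class CommitMetricsCallback implements OffsetCommitCallback {

	private final Counter commitsSucceeded;
	private final Counter commitsFailed;

	CommitMetricsCallback(MetricGroup consumerMetricGroup) {
		this.commitsSucceeded = consumerMetricGroup.counter("commitsSucceeded");
		this.commitsFailed = consumerMetricGroup.counter("commitsFailed");
	}

	@Override
	public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
		if (exception == null) {
			commitsSucceeded.inc(); // commit acknowledged by the broker
		} else {
			commitsFailed.inc();    // commit failed; a later commit will retry
		}
	}
}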



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4172: [FLINK-6983] [cep] Do not serialize States with NFA

2017-06-28 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/4172
  
@dianfu So here is a plan that can work to avoid having to generate the 
code of the `IterativeCondition` every time. It also came out of a discussion 
with @fhueske, who explained to me how things are done in the SQL part. So 
here we go:

1) You start with a String which contains the user's query. 
2) You analyze the query and generate the code that corresponds to the 
`IterativeCondition`.
2i) The code goes into a `Wrapper` class, which extends 
`IterativeCondition` and keeps only 2 `String`s: the code of the condition and 
a unique name for the condition.
2ii) This `Wrapper` is passed to the `Pattern` and the `CEPOperator`.
**THE JOB IS SUBMITTED**
3) At `open()`, the `CEPOperator` checks whether the states hold actual 
`IterativeCondition`s or `Wrapper`s (the `Wrapper` being a subclass of 
`IterativeCondition`), and if it finds wrappers:
3i) it takes the name and the code, compiles the code to an actual 
`IterativeCondition`, and stores that instead of the `Wrapper`.

This way we have to compile only once, at the `open()` of the operator, and 
not every time.
What do you think? A minimal sketch of the `Wrapper` idea follows below.
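
A minimal sketch of the `Wrapper` idea, assuming Janino does the compilation 
(as the Table/SQL code generation already does); all class and method names 
below are illustrative, not the actual CEP API:

import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.codehaus.janino.SimpleCompiler;

public class GeneratedConditionWrapper<T> extends IterativeCondition<T> {

	private final String className; // unique name of the generated class
	private final String code;      // generated source of the condition

	private transient IterativeCondition<T> compiled;

	public GeneratedConditionWrapper(String className, String code) {
		this.className = className;
		this.code = code;
	}

	/** Compiles the generated code once; meant to be called from the operator's open(). */
	@SuppressWarnings("unchecked")
	public IterativeCondition<T> compile(ClassLoader cl) throws Exception {
		if (compiled == null) {
			SimpleCompiler compiler = new SimpleCompiler();
			compiler.setParentClassLoader(cl);
			compiler.cook(code);
			compiled = (IterativeCondition<T>)
				compiler.getClassLoader().loadClass(className).newInstance();
		}
		return compiled;
	}

	@Override
	public boolean filter(T value, Context<T> ctx) throws Exception {
		// delegate to the compiled condition; compile lazily as a fallback
		return compile(Thread.currentThread().getContextClassLoader()).filter(value, ctx);
	}
}

At open(), the operator would call something like compile(getUserCodeClassloader()) 
on every wrapper it finds and store the compiled condition in place of the wrapper.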



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6983) Do not serialize States with NFA

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066183#comment-16066183
 ] 

ASF GitHub Bot commented on FLINK-6983:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/4172
  
@dianfu So here is a plan that can work to avoid having to generate the 
code of the `IterativeCondition` every time. It also came out of a discussion 
with @fhueske, who explained to me how things are done in the SQL part. So 
here we go:

1) You start with a String which contains the user's query. 
2) You analyze the query and generate the code that corresponds to the 
`IterativeCondition`.
2i) The code goes into a `Wrapper` class, which extends 
`IterativeCondition` and keeps only 2 `String`s: the code of the condition and 
a unique name for the condition.
2ii) This `Wrapper` is passed to the `Pattern` and the `CEPOperator`.
**THE JOB IS SUBMITTED**
3) At `open()`, the `CEPOperator` checks whether the states hold actual 
`IterativeCondition`s or `Wrapper`s (the `Wrapper` being a subclass of 
`IterativeCondition`), and if it finds wrappers:
3i) it takes the name and the code, compiles the code to an actual 
`IterativeCondition`, and stores that instead of the `Wrapper`.

This way we have to compile only once, at the `open()` of the operator, and 
not every time.
What do you think?



> Do not serialize States with NFA
> 
>
> Key: FLINK-6983
> URL: https://issues.apache.org/jira/browse/FLINK-6983
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Reporter: Dawid Wysakowicz
>Assignee: Dian Fu
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4183: [FLINK-6969][table]Add support for deferred computation f...

2017-06-28 Thread sunjincheng121
Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/4183
  
Hi @fhueske @wuchong Thanks for your review and comments!

1. For the param rename, I am not sure whether it can be shared with the `early 
fire` feature or not. I suggest keeping the current name; we can change it if 
needed when we develop the `early fire` feature. That said, feel free to rename 
it in this PR: if you and @wuchong insist on renaming, I am fine with that. :)

2. About timestamp and watermark:
- Timestamp: 
I think we can emit records with the correct timestamps (later than the 
watermark, but corresponding to the window time). The call 
`timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp())` in 
`WindowOperator#emitWindowContents` guarantees that logic. That means a record 
is later than the watermark, but its timestamp is correct.

- Watermark:
  In the current Flink framework, GroupWindow and OverWindow both depend on the 
`Watermark`. So if I understand you correctly, you worry about a GroupWindow 
followed by another GroupWindow or by an OverWindow. Let's look at both cases:
 - Followed by a GroupWindow:
  As we know, `deferredComputationTime` is a global configuration, i.e. in 
one job all GroupWindows use the same trigger (`DeferredComputationTrigger`), 
which only fires when the current watermark is not smaller than 
`(window.maxTimestamp + queryConfig.getDeferredComputationTime)`; see the 
sketch after this comment. The records are emitted late by the level N window, 
and so are those of the level N+1 window. The delay is always 
`getDeferredComputationTime`, i.e., this approach adds latency but can reduce 
the number of updates (which is what we wanted).

 - Followed by an OverWindow:
   I think this approach works well for a row-time OverWindow, because the 
OVER clause uses the timestamp value to range the window. It works well if we 
emit the correct timestamps for records, and we do, via 
`timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp())` in 
`WindowOperator#emitWindowContents`.

 - For select, filter, etc., I think it also works well (it just adds latency).

3. About `Trigger + AssignerWithPunctuatedWatermarks` (@wuchong's comment 
above): the trigger is late by `deferredComputationTime` and the watermark is 
made smaller by `deferredComputationTime`, so if we have 
`window1(..).window2(...).window3()`, the delay keeps increasing.

**So, the current approach only improves the level 1 group window, and the 
end-to-end latency is `deferredComputationTime`.**

**If we want all windows to fire only `deferredComputationTime` late, we 
should think about an SLA mechanism (which we had discussed before).**

The above is just my point of view, so feel free to correct me if any of the 
analysis is incorrect.

Please let me know what you think.

Best,
SunJincheng
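
For reference, a minimal sketch of a trigger with the semantics described in 
point 2 above, assuming the slack is passed in directly (the class itself and 
the wiring through `QueryConfig` are illustrative, not existing Flink code):

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

public class DeferredComputationTrigger<W extends Window> extends Trigger<Object, W> {

	private final long deferredComputationTime; // slack in milliseconds

	public DeferredComputationTrigger(long deferredComputationTime) {
		this.deferredComputationTime = deferredComputationTime;
	}

	@Override
	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) {
		// fire only once the watermark has passed maxTimestamp + slack
		ctx.registerEventTimeTimer(window.maxTimestamp() + deferredComputationTime);
		return TriggerResult.CONTINUE;
	}

	@Override
	public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
		return time == window.maxTimestamp() + deferredComputationTime
			? TriggerResult.FIRE_AND_PURGE
			: TriggerResult.CONTINUE;
	}

	@Override
	public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) {
		return TriggerResult.CONTINUE;
	}

	@Override
	public void clear(W window, TriggerContext ctx) {
		ctx.deleteEventTimeTimer(window.maxTimestamp() + deferredComputationTime);
	}
}

Because WindowOperator#emitWindowContents sets the emitted record's timestamp 
to window.maxTimestamp(), records fired by such a trigger still carry the 
correct window timestamps, just later.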

  



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6969) Add support for deferred computation for group window aggregates

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066186#comment-16066186
 ] 

ASF GitHub Bot commented on FLINK-6969:
---

Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/4183
  
Hi @fhueske @wuchong Thanks for your review and comments!

1. For the param rename, I am not sure whether it can be shared with the `early 
fire` feature or not. I suggest keeping the current name; we can change it if 
needed when we develop the `early fire` feature. That said, feel free to rename 
it in this PR: if you and @wuchong insist on renaming, I am fine with that. :)

2. About timestamp and watermark:
- Timestamp: 
I think we can emit records with the correct timestamps (later than the 
watermark, but corresponding to the window time). The call 
`timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp())` in 
`WindowOperator#emitWindowContents` guarantees that logic. That means a record 
is later than the watermark, but its timestamp is correct.

- Watermark:
  In the current Flink framework, GroupWindow and OverWindow both depend on the 
`Watermark`. So if I understand you correctly, you worry about a GroupWindow 
followed by another GroupWindow or by an OverWindow. Let's look at both cases:
 - Followed by a GroupWindow:
  As we know, `deferredComputationTime` is a global configuration, i.e. in 
one job all GroupWindows use the same trigger (`DeferredComputationTrigger`), 
which only fires when the current watermark is not smaller than 
`(window.maxTimestamp + queryConfig.getDeferredComputationTime)`. The records 
are emitted late by the level N window, and so are those of the level N+1 
window. The delay is always `getDeferredComputationTime`, i.e., this approach 
adds latency but can reduce the number of updates (which is what we wanted).

 - Followed by an OverWindow:
   I think this approach works well for a row-time OverWindow, because the 
OVER clause uses the timestamp value to range the window. It works well if we 
emit the correct timestamps for records, and we do, via 
`timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp())` in 
`WindowOperator#emitWindowContents`.

 - For select, filter, etc., I think it also works well (it just adds latency).

3. About `Trigger + AssignerWithPunctuatedWatermarks` (@wuchong's comment 
above): the trigger is late by `deferredComputationTime` and the watermark is 
made smaller by `deferredComputationTime`, so if we have 
`window1(..).window2(...).window3()`, the delay keeps increasing.

**So, the current approach only improves the level 1 group window, and the 
end-to-end latency is `deferredComputationTime`.**

**If we want all windows to fire only `deferredComputationTime` late, we 
should think about an SLA mechanism (which we had discussed before).**

The above is just my point of view, so feel free to correct me if any of the 
analysis is incorrect.

Please let me know what you think.

Best,
SunJincheng

  



> Add support for deferred computation for group window aggregates
> 
>
> Key: FLINK-6969
> URL: https://issues.apache.org/jira/browse/FLINK-6969
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Deferred computation is a strategy to deal with late-arriving data and avoid 
> updates of previous results. Instead of computing a result as soon as 
> possible (i.e., when a corresponding watermark was received), deferred 
> computation adds a configurable amount of slack time in which late data is 
> accepted before the result is computed. For example, instead of computing a 
> tumbling window of 1 hour at each full hour, we can add a deferred 
> computation interval of 15 minutes to compute the result a quarter past each 
> full hour.
> This approach adds latency but can reduce the number of updates, esp. in use 
> cases where the user cannot influence the generation of watermarks. It is 
> also useful if the data is emitted to a system that cannot update results 
> (files or Kafka). The deferred computation interval should be configured via 
> the {{QueryConfig}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #3889: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-06-28 Thread rtudoran
Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3889#discussion_r124491150
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonSort.scala
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rel.RelCollation
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.`type`._
+import scala.collection.JavaConverters._
+import org.apache.flink.api.common.operators.Order
+
+
+/**
+ * Trait represents a collection of sort methods to manipulate the 
parameters
+ */
+
+trait CommonSort {
+  
+  private[flink] def offsetToString(offset: RexNode): String = {
--- End diff --

> It is more for consistency - to have a "toString" method for each 
feature of the sort (direction, fetch, fields...). As there is no cost for it, 
I suggest keeping it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066189#comment-16066189
 ] 

ASF GitHub Bot commented on FLINK-6075:
---

Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3889#discussion_r124491150
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonSort.scala
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rel.RelCollation
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.`type`._
+import scala.collection.JavaConverters._
+import org.apache.flink.api.common.operators.Order
+
+
+/**
+ * Trait represents a collection of sort methods to manipulate the 
parameters
+ */
+
+trait CommonSort {
+  
+  private[flink] def offsetToString(offset: RexNode): String = {
--- End diff --

> It is more for consistency - to have a "toString" method for each 
feature of the sort (direction, fetch, fields...). As there is no cost for it, 
I suggest keeping it.


> Support Limit/Top(Sort) for Stream SQL
> --
>
> Key: FLINK-6075
> URL: https://issues.apache.org/jira/browse/FLINK-6075
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: radu
>  Labels: features
> Attachments: sort.png
>
>
> These will be split into 3 separate JIRA issues. However, the design is the 
> same; only the processing function differs in terms of the output. Hence, the 
> design is the same for all of them.
> Time target: Proc Time
> **SQL targeted query examples:**
> *Sort example*
> Q1)` SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL 
> '3' HOUR) ORDER BY b` 
> Comment: window is defined using GROUP BY
> Comment: ASC or DESC keywords can be placed to mark the ordering type
> *Limit example*
> Q2) `SELECT a FROM stream1 WHERE rowtime BETWEEN current_timestamp - INTERVAL 
> '1' HOUR AND current_timestamp ORDER BY b LIMIT 10`
> Comment: window is defined using time ranges in the WHERE clause
> Comment: window is row triggered
> *Top example*
> Q3) `SELECT sum(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING 
> LIMIT 10) FROM stream1`  
> Comment: limit over the contents of the sliding window
> General Comments:
> -All these SQL clauses are supported only over windows (bounded collections 
> of data). 
> -Each of the 3 operators will be supported with each of the types of 
> expressing the windows. 
> **Description**
> The 3 operations (limit, top and sort) are similar in behavior as they all 
> require a sorted collection of the data on which the logic will be applied 
> (i.e., select a subset of the items or the entire sorted set). These 
> functions would make sense in the streaming context only in the context of a 
> window. Without defining a window the functions could never emit, as the sort 
> operation would never trigger. If an SQL query is provided without 
> limits, an error will be thrown (`SELECT a FROM stream1 TOP 10` -> ERROR). 
> Although not targeted by this JIRA, in the case of working based on event 
> time order, the retraction mechanisms of windows and the lateness mechanisms 
> can be used to deal with out of order events and retraction/updates of 
> results.
> **Functionality example**
> We exemplify with the query below for all the 3 types of operators (sorting, 
> limit and top). Rowtime indicates when the HOP window will trigger – which 
> can be observed in the fact that outputs are generated only at those moments. 
> The HOP windows will trigger at every hour (fixed hour) and each event will 
> contribute/ be duplicated for 2 consecutive hour in

[GitHub] flink pull request #3889: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-06-28 Thread rtudoran
Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3889#discussion_r124491949
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonSort.scala
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rel.RelCollation
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.`type`._
+import scala.collection.JavaConverters._
+import org.apache.flink.api.common.operators.Order
+
+
+/**
+ * Trait represents a collection of sort methods to manipulate the 
parameters
+ */
+
+trait CommonSort {
+  
+  private[flink] def offsetToString(offset: RexNode): String = {
+val offsetToString = s"$offset"
+offsetToString
+  }
+  
+  
+  private[flink] def sortFieldsToString(
--- End diff --

It cannot be moved because explainTerms is a method from the superclass 
SingleRel, and only DataStreamSort/DataSetSort extend it, not CommonSort. Or is 
it possible in Scala to have a trait that overrides a method that is inherited 
from another class?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066193#comment-16066193
 ] 

ASF GitHub Bot commented on FLINK-6075:
---

Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3889#discussion_r124491949
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonSort.scala
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rel.RelCollation
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.`type`._
+import scala.collection.JavaConverters._
+import org.apache.flink.api.common.operators.Order
+
+
+/**
+ * Trait represents a collection of sort methods to manipulate the 
parameters
+ */
+
+trait CommonSort {
+  
+  private[flink] def offsetToString(offset: RexNode): String = {
+val offsetToString = s"$offset"
+offsetToString
+  }
+  
+  
+  private[flink] def sortFieldsToString(
--- End diff --

It cannot be moved because explainTerms is a method from the superclass 
SingleRel, and only DataStreamSort/DataSetSort extend it, not CommonSort. Or is 
it possible in Scala to have a trait that overrides a method that is inherited 
from another class?


> Support Limit/Top(Sort) for Stream SQL
> --
>
> Key: FLINK-6075
> URL: https://issues.apache.org/jira/browse/FLINK-6075
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: radu
>  Labels: features
> Attachments: sort.png
>
>
> These will be split into 3 separate JIRA issues. However, the design is the 
> same; only the processing function differs in terms of the output. Hence, the 
> design is the same for all of them.
> Time target: Proc Time
> **SQL targeted query examples:**
> *Sort example*
> Q1)` SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL 
> '3' HOUR) ORDER BY b` 
> Comment: window is defined using GROUP BY
> Comment: ASC or DESC keywords can be placed to mark the ordering type
> *Limit example*
> Q2) `SELECT a FROM stream1 WHERE rowtime BETWEEN current_timestamp - INTERVAL 
> '1' HOUR AND current_timestamp ORDER BY b LIMIT 10`
> Comment: window is defined using time ranges in the WHERE clause
> Comment: window is row triggered
> *Top example*
> Q3) `SELECT sum(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING 
> LIMIT 10) FROM stream1`  
> Comment: limit over the contents of the sliding window
> General Comments:
> -All these SQL clauses are supported only over windows (bounded collections 
> of data). 
> -Each of the 3 operators will be supported with each of the types of 
> expressing the windows. 
> **Description**
> The 3 operations (limit, top and sort) are similar in behavior as they all 
> require a sorted collection of the data on which the logic will be applied 
> (i.e., select a subset of the items or the entire sorted set). These 
> functions would make sense in the streaming context only in the context of a 
> window. Without defining a window the functions could never emit, as the sort 
> operation would never trigger. If an SQL query is provided without 
> limits, an error will be thrown (`SELECT a FROM stream1 TOP 10` -> ERROR). 
> Although not targeted by this JIRA, in the case of working based on event 
> time order, the retraction mechanisms of windows and the lateness mechanisms 
> can be used to deal with out of order events and retraction/updates of 
> results.
> **Functionality example**
> We exemplify with the query below for all the 3 types of operators (sorting, 
> limit and top). Rowtime indicates when the HOP window will trigger – which 

[GitHub] flink pull request #3889: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-06-28 Thread rtudoran
Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3889#discussion_r124492156
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonSort.scala
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rel.RelCollation
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.`type`._
+import scala.collection.JavaConverters._
+import org.apache.flink.api.common.operators.Order
+
+
+/**
+ * Trait represents a collection of sort methods to manipulate the 
parameters
+ */
+
+trait CommonSort {
+  
+  private[flink] def offsetToString(offset: RexNode): String = {
+val offsetToString = s"$offset"
+offsetToString
+  }
+  
+  
+  private[flink] def sortFieldsToString(
--- End diff --

I do not see this behavior in the other classes that have a Common trait 
(e.g., DataStreamCalc has explainTerms in the class, not in CommonCalc).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #4183: [FLINK-6969][table]Add support for deferred computation f...

2017-06-28 Thread sunjincheng121
Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/4183
  
Hi, @fhueske I was writing my comment for a long time, so I had not seen your 
reply until I sent mine out.

I think we do not need the current trigger if we use custom operators; 
changing the watermark alone is enough.
Is that what you mean?

Thanks,
SunJincheng


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066196#comment-16066196
 ] 

ASF GitHub Bot commented on FLINK-6075:
---

Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3889#discussion_r124492156
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonSort.scala
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rel.RelCollation
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.`type`._
+import scala.collection.JavaConverters._
+import org.apache.flink.api.common.operators.Order
+
+
+/**
+ * Trait represents a collection of sort methods to manipulate the 
parameters
+ */
+
+trait CommonSort {
+  
+  private[flink] def offsetToString(offset: RexNode): String = {
+val offsetToString = s"$offset"
+offsetToString
+  }
+  
+  
+  private[flink] def sortFieldsToString(
--- End diff --

I do not see this behavior in the other classes that have a Common trait 
(e.g., DataStreamCalc has explainTerms in the class, not in CommonCalc).


> Support Limit/Top(Sort) for Stream SQL
> --
>
> Key: FLINK-6075
> URL: https://issues.apache.org/jira/browse/FLINK-6075
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: radu
>  Labels: features
> Attachments: sort.png
>
>
> These will be split into 3 separate JIRA issues. However, the design is the 
> same; only the processing function differs in terms of the output. Hence, the 
> design is the same for all of them.
> Time target: Proc Time
> **SQL targeted query examples:**
> *Sort example*
> Q1)` SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL 
> '3' HOUR) ORDER BY b` 
> Comment: window is defined using GROUP BY
> Comment: ASC or DESC keywords can be placed to mark the ordering type
> *Limit example*
> Q2) `SELECT a FROM stream1 WHERE rowtime BETWEEN current_timestamp - INTERVAL 
> '1' HOUR AND current_timestamp ORDER BY b LIMIT 10`
> Comment: window is defined using time ranges in the WHERE clause
> Comment: window is row triggered
> *Top example*
> Q3) `SELECT sum(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING 
> LIMIT 10) FROM stream1`  
> Comment: limit over the contents of the sliding window
> General Comments:
> -All these SQL clauses are supported only over windows (bounded collections 
> of data). 
> -Each of the 3 operators will be supported with each of the types of 
> expressing the windows. 
> **Description**
> The 3 operations (limit, top and sort) are similar in behavior as they all 
> require a sorted collection of the data on which the logic will be applied 
> (i.e., select a subset of the items or the entire sorted set). These 
> functions would make sense in the streaming context only in the context of a 
> window. Without defining a window the functions could never emit, as the sort 
> operation would never trigger. If an SQL query is provided without 
> limits, an error will be thrown (`SELECT a FROM stream1 TOP 10` -> ERROR). 
> Although not targeted by this JIRA, in the case of working based on event 
> time order, the retraction mechanisms of windows and the lateness mechanisms 
> can be used to deal with out of order events and retraction/updates of 
> results.
> **Functionality example**
> We exemplify with the query below for all the 3 types of operators (sorting, 
> limit and top). Rowtime indicates when the HOP window will trigger – which 
> can be observed in the fact that outputs are generated only at those moments. 
> The HOP window

[jira] [Commented] (FLINK-6969) Add support for deferred computation for group window aggregates

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066194#comment-16066194
 ] 

ASF GitHub Bot commented on FLINK-6969:
---

Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/4183
  
Hi, @fhueske I was writing my comment for a long time, so I had not seen your 
reply until I sent mine out.

I think we do not need the current trigger if we use custom operators; 
changing the watermark alone is enough.
Is that what you mean?

Thanks,
SunJincheng


> Add support for deferred computation for group window aggregates
> 
>
> Key: FLINK-6969
> URL: https://issues.apache.org/jira/browse/FLINK-6969
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Deferred computation is a strategy to deal with late-arriving data and avoid 
> updates of previous results. Instead of computing a result as soon as 
> possible (i.e., when a corresponding watermark was received), deferred 
> computation adds a configurable amount of slack time in which late data is 
> accepted before the result is computed. For example, instead of computing a 
> tumbling window of 1 hour at each full hour, we can add a deferred 
> computation interval of 15 minutes to compute the result a quarter past each 
> full hour.
> This approach adds latency but can reduce the number of updates, esp. in use 
> cases where the user cannot influence the generation of watermarks. It is 
> also useful if the data is emitted to a system that cannot update results 
> (files or Kafka). The deferred computation interval should be configured via 
> the {{QueryConfig}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4195: [FLINK-7008] [cep] Update NFA state only when the ...

2017-06-28 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4195#discussion_r124491313
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java 
---
@@ -324,6 +328,89 @@ public boolean filter(Event value) throws Exception {
}
}
 
+   @Test
+   public void testNFAChange() {
+   Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
--- End diff --

I feel the Pattern is a bit too complicated for this test, e.g. the 
notPattern does not introduce any new corner cases.

I am missing a test for the situation where only one ComputationState is 
generated. You can simulate it with:

@Test
public void testNFAChangedOnOneNewComputationState() {
    Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("start");
        }
    }).followedBy("a*").where(new SimpleCondition<Event>() {
        private static final long serialVersionUID = 1858562682635302605L;

        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("a");
        }
    }).oneOrMore().optional().next("end").where(new IterativeCondition<Event>() {
        private static final long serialVersionUID = 8061969839441121955L;

        @Override
        public boolean filter(Event value, Context<Event> ctx) throws Exception {
            return value.getName().equals("b");
        }
    }).within(Time.milliseconds(10));

    NFACompiler.NFAFactory<Event> nfaFactory = NFACompiler.compileFactory(pattern, Event.createTypeSerializer(), true);
    NFA<Event> nfa = nfaFactory.createNFA();

    // both the queue of ComputationStates and the eventSharedBuffer have changed
    nfa.nfaChanged = false;
    nfa.process(new Event(6, "start", 1.0), 6L);
    nfa.nfaChanged = false;
    nfa.process(new Event(6, "a", 1.0), 7L);
    assertTrue("NFA status should change as the event matches the take condition of 'a*' state", nfa.isNFAChanged());
}


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4195: [FLINK-7008] [cep] Update NFA state only when the ...

2017-06-28 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4195#discussion_r124492768
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
 ---
@@ -114,6 +115,28 @@ public DeweyNumber getVersion() {
return version;
}
 
+   @Override
+   public boolean equals(Object obj) {
+   if (obj instanceof ComputationState) {
+   ComputationState<T> other = (ComputationState<T>) obj;
+   return Objects.equals(state, other.state) &&
+   event == other.event &&
--- End diff --

How about comparing events with `.equals()`? The event is not a primitive, so `==` only compares references.
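
A self-contained sketch of the suggested comparison (the class is simplified 
for illustration; only fields resembling those in the diff are shown):

import java.util.Objects;

final class StateSketch<T> {

	final String stateName;
	final T event;          // not a primitive, so == would compare references
	final long timestamp;

	StateSketch(String stateName, T event, long timestamp) {
		this.stateName = stateName;
		this.event = event;
		this.timestamp = timestamp;
	}

	@Override
	public boolean equals(Object obj) {
		if (!(obj instanceof StateSketch)) {
			return false;
		}
		StateSketch<?> other = (StateSketch<?>) obj;
		return Objects.equals(stateName, other.stateName)
			&& Objects.equals(event, other.event) // value equality, not identity
			&& timestamp == other.timestamp;
	}

	@Override
	public int hashCode() {
		// keep hashCode consistent with equals
		return Objects.hash(stateName, event, timestamp);
	}
}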


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4195: [FLINK-7008] [cep] Update NFA state only when the ...

2017-06-28 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4195#discussion_r124491510
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 ---
@@ -270,10 +270,12 @@ private void updateLastSeenWatermark(long timestamp) {
}
 
private void updateNFA(NFA nfa) throws IOException {
-   if (nfa.isEmpty()) {
-   nfaOperatorState.clear();
-   } else {
-   nfaOperatorState.update(nfa);
+   if (nfa.isNFAChanged()) {
+   if (nfa.isEmpty()) {
+   nfaOperatorState.clear();
+   } else {
+   nfaOperatorState.update(nfa);
--- End diff --

An IT test for it would be handy.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4195: [FLINK-7008] [cep] Update NFA state only when the ...

2017-06-28 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4195#discussion_r124492599
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -237,8 +254,15 @@ public boolean isEmpty() {
computationState.getCounter());
 
newComputationStates = Collections.emptyList();
+   nfaChanged = true;
--- End diff --

Just a question/side comment. This case always happens together with pruning 
the sharedBuffer, right? (NFA:330)

I would still leave it here, because it's better to be safe than sorry.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4195: [FLINK-7008] [cep] Update NFA state only when the ...

2017-06-28 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4195#discussion_r124481149
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 ---
@@ -270,10 +270,12 @@ private void updateLastSeenWatermark(long timestamp) {
}
 
private void updateNFA(NFA nfa) throws IOException {
-   if (nfa.isEmpty()) {
-   nfaOperatorState.clear();
-   } else {
-   nfaOperatorState.update(nfa);
+   if (nfa.isNFAChanged()) {
+   if (nfa.isEmpty()) {
+   nfaOperatorState.clear();
+   } else {
+   nfaOperatorState.update(nfa);
--- End diff --

Shouldn't we reset nfaChanged to {{false}} here? Right now we skip the update 
only if a Pattern did not start at all.

If we have a pattern like A -> B and the sequence a, c, c, c, c, the NFA 
will be updated after each c even though it does not change, won't it? A 
sketch of what I mean follows below.
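
Something like this is what I have in mind (a sketch only; `resetNFAChanged()` 
is an assumed name for the reset method):

private void updateNFA(NFA<IN> nfa) throws IOException {
	if (nfa.isNFAChanged()) {
		if (nfa.isEmpty()) {
			nfaOperatorState.clear();
		} else {
			nfaOperatorState.update(nfa);
		}
		// reset, so the next event that changes nothing skips the update
		nfa.resetNFAChanged();
	}
}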


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4195: [FLINK-7008] [cep] Update NFA state only when the ...

2017-06-28 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4195#discussion_r124491894
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -152,6 +153,12 @@
 
private TypeSerializer<T> eventSerializer;
 
+   /**
+* Flag indicating whether the matching status of the state machine has 
changed.
+*/
+   @VisibleForTesting
--- End diff --

I think we need a setter anyway (see AbstractKeyedCEPPatternOperator:277), so 
I would make it private.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4195: [FLINK-7008] [cep] Update NFA state only when the ...

2017-06-28 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4195#discussion_r124491446
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java 
---
@@ -324,6 +328,89 @@ public boolean filter(Event value) throws Exception {
}
}
 
+   @Test
+   public void testNFAChange() {
+   Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+   private static final long serialVersionUID = 1858562682635302605L;
--- End diff --

Anyway, I feel it would be easier to read and understand if we extracted a 
separate test for one `nfaChanged` case at a time.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7008) Update NFA state only when the NFA changes.

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066202#comment-16066202
 ] 

ASF GitHub Bot commented on FLINK-7008:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4195#discussion_r124491313
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java 
---
@@ -324,6 +328,89 @@ public boolean filter(Event value) throws Exception {
}
}
 
+   @Test
+   public void testNFAChange() {
+   Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
--- End diff --

I feel the Pattern is a bit too complicated for this test, e.g. the 
notPattern does not introduce any new corner cases.

I am missing a test for the situation where only one ComputationState is 
generated. You can simulate it with:

@Test
public void testNFAChangedOnOneNewComputationState() {
    Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("start");
        }
    }).followedBy("a*").where(new SimpleCondition<Event>() {
        private static final long serialVersionUID = 1858562682635302605L;

        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("a");
        }
    }).oneOrMore().optional().next("end").where(new IterativeCondition<Event>() {
        private static final long serialVersionUID = 8061969839441121955L;

        @Override
        public boolean filter(Event value, Context<Event> ctx) throws Exception {
            return value.getName().equals("b");
        }
    }).within(Time.milliseconds(10));

    NFACompiler.NFAFactory<Event> nfaFactory = NFACompiler.compileFactory(pattern, Event.createTypeSerializer(), true);
    NFA<Event> nfa = nfaFactory.createNFA();

    // both the queue of ComputationStates and the eventSharedBuffer have changed
    nfa.nfaChanged = false;
    nfa.process(new Event(6, "start", 1.0), 6L);
    nfa.nfaChanged = false;
    nfa.process(new Event(6, "a", 1.0), 7L);
    assertTrue("NFA status should change as the event matches the take condition of 'a*' state", nfa.isNFAChanged());
}


> Update NFA state only when the NFA changes.
> ---
>
> Key: FLINK-7008
> URL: https://issues.apache.org/jira/browse/FLINK-7008
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Kostas Kloudas
>Assignee: Dian Fu
> Fix For: 1.4.0
>
>
> Currently in the {{AbstractKeyedCEPPatternOperator.updateNFA()}} method we 
> update the NFA state every time the NFA is touched. This leads to redundant 
> puts/gets to the state when there are no changes to the NFA itself.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7008) Update NFA state only when the NFA changes.

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066206#comment-16066206
 ] 

ASF GitHub Bot commented on FLINK-7008:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4195#discussion_r124492768
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
 ---
@@ -114,6 +115,28 @@ public DeweyNumber getVersion() {
return version;
}
 
+   @Override
+   public boolean equals(Object obj) {
+   if (obj instanceof ComputationState) {
+   ComputationState<T> other = (ComputationState<T>) obj;
+   return Objects.equals(state, other.state) &&
+   event == other.event &&
--- End diff --

How about comparing events with `.equals()`? The event is not a primitive, so `==` only compares references.


> Update NFA state only when the NFA changes.
> ---
>
> Key: FLINK-7008
> URL: https://issues.apache.org/jira/browse/FLINK-7008
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Kostas Kloudas
>Assignee: Dian Fu
> Fix For: 1.4.0
>
>
> Currently in the {{AbstractKeyedCEPPatternOperator.updateNFA()}} method we 
> update the NFA state every time the NFA is touched. This leads to redundant 
> puts/gets to the state when there are no changes to the NFA itself.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7008) Update NFA state only when the NFA changes.

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066204#comment-16066204
 ] 

ASF GitHub Bot commented on FLINK-7008:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4195#discussion_r124491446
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java 
---
@@ -324,6 +328,89 @@ public boolean filter(Event value) throws Exception {
}
}
 
+   @Test
+   public void testNFAChange() {
+   Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+   private static final long serialVersionUID = 1858562682635302605L;
--- End diff --

Anyway, I feel it would be easier to read and understand if we extracted a 
separate test for one `nfaChanged` case at a time.


> Update NFA state only when the NFA changes.
> ---
>
> Key: FLINK-7008
> URL: https://issues.apache.org/jira/browse/FLINK-7008
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Kostas Kloudas
>Assignee: Dian Fu
> Fix For: 1.4.0
>
>
> Currently in the {{AbstractKeyedCEPPatternOperator.updateNFA()}} method we 
> update the NFA state every time the NFA is touched. This leads to redundant 
> puts/gets to the state when there are no changes to the NFA itself.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7008) Update NFA state only when the NFA changes.

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066203#comment-16066203
 ] 

ASF GitHub Bot commented on FLINK-7008:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4195#discussion_r124492599
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -237,8 +254,15 @@ public boolean isEmpty() {
computationState.getCounter());
 
newComputationStates = Collections.emptyList();
+   nfaChanged = true;
--- End diff --

Just a question/side comment: this case always happens in tandem with pruning the 
sharedBuffer, right? (NFA:330)

I would still leave it here, because it's better to be safe than sorry.


> Update NFA state only when the NFA changes.
> ---
>
> Key: FLINK-7008
> URL: https://issues.apache.org/jira/browse/FLINK-7008
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Kostas Kloudas
>Assignee: Dian Fu
> Fix For: 1.4.0
>
>
> Currently in the {{AbstractKeyedCEPPatternOperator.updateNFA()}} method we 
> update the NFA state every time the NFA is touched. This leads to redundant 
> puts/gets to the state when there are no changes to the NFA itself.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7008) Update NFA state only when the NFA changes.

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066208#comment-16066208
 ] 

ASF GitHub Bot commented on FLINK-7008:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4195#discussion_r124491894
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -152,6 +153,12 @@
 
private TypeSerializer eventSerializer;
 
+   /**
+* Flag indicating whether the matching status of the state machine has 
changed.
+*/
+   @VisibleForTesting
--- End diff --

I think we need a setter anyway (see AbstractKeyedCEPPatternOperator:277), so 
I would make it private.
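
For illustration, that would amount to something like the following on the 
`NFA` side (a sketch; the `setNFAChanged` setter name is my suggestion here, 
not taken from the diff):

{code}
/**
 * Flag indicating whether the matching status of the state machine has changed.
 */
private boolean nfaChanged;

public boolean isNFAChanged() {
    return nfaChanged;
}

// package-private setter so that e.g. AbstractKeyedCEPPatternOperator can
// reset the flag after it has persisted the NFA
void setNFAChanged(boolean nfaChanged) {
    this.nfaChanged = nfaChanged;
}
{code}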


> Update NFA state only when the NFA changes.
> ---
>
> Key: FLINK-7008
> URL: https://issues.apache.org/jira/browse/FLINK-7008
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Kostas Kloudas
>Assignee: Dian Fu
> Fix For: 1.4.0
>
>
> Currently in the {{AbstractKeyedCEPPatternOperator.updateNFA()}} method we 
> update the NFA state every time the NFA is touched. This leads to redundant 
> puts/gets to the state when there are no changes to the NFA itself.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7008) Update NFA state only when the NFA changes.

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066205#comment-16066205
 ] 

ASF GitHub Bot commented on FLINK-7008:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4195#discussion_r124481149
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 ---
@@ -270,10 +270,12 @@ private void updateLastSeenWatermark(long timestamp) {
}
 
private void updateNFA(NFA nfa) throws IOException {
-   if (nfa.isEmpty()) {
-   nfaOperatorState.clear();
-   } else {
-   nfaOperatorState.update(nfa);
+   if (nfa.isNFAChanged()) {
+   if (nfa.isEmpty()) {
+   nfaOperatorState.clear();
+   } else {
+   nfaOperatorState.update(nfa);
--- End diff --

Shouldn't we reset nfaChanged to {{false}} here? Right now we skip the update 
only if a pattern did not start.

If we have a pattern like A -> B and the sequence a, c, c, c, c, the NFA 
will be updated after each c even though it does not change, won't it?


> Update NFA state only when the NFA changes.
> ---
>
> Key: FLINK-7008
> URL: https://issues.apache.org/jira/browse/FLINK-7008
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Kostas Kloudas
>Assignee: Dian Fu
> Fix For: 1.4.0
>
>
> Currently in the {{AbstractKeyedCEPPatternOperator.updateNFA()}} method we 
> update the NFA state every time the NFA is touched. This leads to redundant 
> puts/gets to the state when there are no changes to the NFA itself.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7008) Update NFA state only when the NFA changes.

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066207#comment-16066207
 ] 

ASF GitHub Bot commented on FLINK-7008:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4195#discussion_r124491510
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 ---
@@ -270,10 +270,12 @@ private void updateLastSeenWatermark(long timestamp) {
}
 
private void updateNFA(NFA nfa) throws IOException {
-   if (nfa.isEmpty()) {
-   nfaOperatorState.clear();
-   } else {
-   nfaOperatorState.update(nfa);
+   if (nfa.isNFAChanged()) {
+   if (nfa.isEmpty()) {
+   nfaOperatorState.clear();
+   } else {
+   nfaOperatorState.update(nfa);
--- End diff --

An IT test for it would be handy.


> Update NFA state only when the NFA changes.
> ---
>
> Key: FLINK-7008
> URL: https://issues.apache.org/jira/browse/FLINK-7008
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Kostas Kloudas
>Assignee: Dian Fu
> Fix For: 1.4.0
>
>
> Currently in the {{AbstractKeyedCEPPatternOperator.updateNFA()}} method we 
> update the NFA state every time the NFA is touched. This leads to redundant 
> puts/gets to the state when there are no changes to the NFA itself.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7025) Using NullByteKeySelector for Unbounded ProcTime NonPartitioned Over

2017-06-28 Thread sunjincheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066213#comment-16066213
 ] 

sunjincheng commented on FLINK-7025:


Yes, sounds good. :)

> Using NullByteKeySelector for Unbounded ProcTime NonPartitioned Over
> 
>
> Key: FLINK-7025
> URL: https://issues.apache.org/jira/browse/FLINK-7025
> Project: Flink
>  Issue Type: Bug
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Currently we added the `Cleanup State` feature, but it does not work well if we 
> enable stateCleaning on an Unbounded ProcTime NonPartitioned Over window, 
> because `ProcessFunctionWithCleanupState` uses keyed state.
> So, in this JIRA I'll either change the `Unbounded ProcTime NonPartitioned Over` 
> to a `partitioned Over` by using NullByteKeySelector, OR create a 
> `NonKeyedProcessFunctionWithCleanupState`. But I think the first way is 
> simpler. What do you think? [~fhueske]
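
For reference, the first approach boils down to keying the stream on a 
constant. Flink ships a `NullByteKeySelector` in 
`org.apache.flink.api.java.functions`; the standalone class below is only a 
sketch illustrating what it does:

{code}
import org.apache.flink.api.java.functions.KeySelector;

// Maps every record to the same constant key, so a logically non-partitioned
// stream becomes a keyed stream with a single key and can use keyed state.
public class NullByteKeySelector<T> implements KeySelector<T, Byte> {

    @Override
    public Byte getKey(T value) throws Exception {
        return (byte) 0;
    }
}
{code}

The trade-off is that all records go to a single key, which matches the 
non-partitioned semantics of the over window anyway.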



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7025) Using NullByteKeySelector for Unbounded ProcTime NonPartitioned Over

2017-06-28 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-7025:
-
Component/s: Table API & SQL

> Using NullByteKeySelector for Unbounded ProcTime NonPartitioned Over
> 
>
> Key: FLINK-7025
> URL: https://issues.apache.org/jira/browse/FLINK-7025
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.1, 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Currently we added the `Cleanup State` feature, but it does not work well if we 
> enable stateCleaning on an Unbounded ProcTime NonPartitioned Over window, 
> because `ProcessFunctionWithCleanupState` uses keyed state.
> So, in this JIRA I'll either change the `Unbounded ProcTime NonPartitioned Over` 
> to a `partitioned Over` by using NullByteKeySelector, OR create a 
> `NonKeyedProcessFunctionWithCleanupState`. But I think the first way is 
> simpler. What do you think? [~fhueske]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6173) flink-table not pack-in com.fasterxml.jackson.* in after #FLINK-5414

2017-06-28 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066217#comment-16066217
 ] 

Chesnay Schepler commented on FLINK-6173:
-

Could you try Maven 3.2.5? With that version the shading still works as required 
by Flink, and it should be compatible with the maven-scala-plugin.

> flink-table not pack-in com.fasterxml.jackson.* in after #FLINK-5414
> 
>
> Key: FLINK-6173
> URL: https://issues.apache.org/jira/browse/FLINK-6173
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Zhenghua Gao
>
> Currently, flink-table bundles com.fasterxml.jackson.* and renames it 
> to org.apache.flink.shaded.calcite.com.fasterxml.jackson.*
> If a project depends on flink-table and uses fasterxml as follows (the 
> explain function uses fasterxml indirectly):
> {code:title=WordCount.scala|borderStyle=solid}
> object WordCountWithTable {
>   def main(args: Array[String]): Unit = {
> // set up execution environment
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 
> 1))
> val expr = input.toTable(tEnv)
> val result = expr
>   .groupBy('word)
>   .select('word, 'frequency.sum as 'frequency)
>   .filter('frequency === 2)
> println(tEnv.explain(result))
> result.toDataSet[WC].print()
>   }
>   case class WC(word: String, frequency: Long)
> }
> {code}
> It actually uses org.apache.flink.shaded.calcite.com.fasterxml.jackson.*
> I found that after FLINK-5414, flink-table no longer bundles 
> com.fasterxml.jackson.*, so the project throws a ClassNotFoundException.
> {code:borderStyle=solid}
> Exception in thread "main" java.lang.NoClassDefFoundError: 
> org/apache/flink/shaded/calcite/com/fasterxml/jackson/databind/ObjectMapper
>   at 
> org.apache.flink.table.explain.PlanJsonParser.getSqlExecutionPlan(PlanJsonParser.java:32)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.explain(BatchTableEnvironment.scala:143)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.explain(BatchTableEnvironment.scala:164)
>   at 
> org.apache.flink.quickstart.WordCountWithTable$.main(WordCountWithTable.scala:34)
>   at 
> org.apache.flink.quickstart.WordCountWithTable.main(WordCountWithTable.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.shaded.calcite.com.fasterxml.jackson.databind.ObjectMapper
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   ... 10 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7025) Using NullByteKeySelector for Unbounded ProcTime NonPartitioned Over

2017-06-28 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-7025:
-
Affects Version/s: 1.4.0
   1.3.1

> Using NullByteKeySelector for Unbounded ProcTime NonPartitioned Over
> 
>
> Key: FLINK-7025
> URL: https://issues.apache.org/jira/browse/FLINK-7025
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.1, 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Currently we added the `Cleanup State` feature, but it does not work well if we 
> enable stateCleaning on an Unbounded ProcTime NonPartitioned Over window, 
> because `ProcessFunctionWithCleanupState` uses keyed state.
> So, in this JIRA I'll either change the `Unbounded ProcTime NonPartitioned Over` 
> to a `partitioned Over` by using NullByteKeySelector, OR create a 
> `NonKeyedProcessFunctionWithCleanupState`. But I think the first way is 
> simpler. What do you think? [~fhueske]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4195: [FLINK-7008] [cep] Update NFA state only when the ...

2017-06-28 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4195#discussion_r124494317
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 ---
@@ -270,10 +270,12 @@ private void updateLastSeenWatermark(long timestamp) {
}
 
private void updateNFA(NFA nfa) throws IOException {
-   if (nfa.isEmpty()) {
-   nfaOperatorState.clear();
-   } else {
-   nfaOperatorState.update(nfa);
+   if (nfa.isNFAChanged()) {
+   if (nfa.isEmpty()) {
+   nfaOperatorState.clear();
+   } else {
+   nfaOperatorState.update(nfa);
--- End diff --

Currently, for each input we deserialize the NFA, and the `nfaChanged` 
field is `false` by default. So I think we don't need to do that. Thoughts?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7008) Update NFA state only when the NFA changes.

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066218#comment-16066218
 ] 

ASF GitHub Bot commented on FLINK-7008:
---

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4195#discussion_r124494317
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 ---
@@ -270,10 +270,12 @@ private void updateLastSeenWatermark(long timestamp) {
}
 
private void updateNFA(NFA nfa) throws IOException {
-   if (nfa.isEmpty()) {
-   nfaOperatorState.clear();
-   } else {
-   nfaOperatorState.update(nfa);
+   if (nfa.isNFAChanged()) {
+   if (nfa.isEmpty()) {
+   nfaOperatorState.clear();
+   } else {
+   nfaOperatorState.update(nfa);
--- End diff --

Currently, for each input we deserialize the NFA, and the `nfaChanged` 
field is `false` by default. So I think we don't need to do that. Thoughts?


> Update NFA state only when the NFA changes.
> ---
>
> Key: FLINK-7008
> URL: https://issues.apache.org/jira/browse/FLINK-7008
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Kostas Kloudas
>Assignee: Dian Fu
> Fix For: 1.4.0
>
>
> Currently in the {{AbstractKeyedCEPPatternOperator.updateNFA()}} method we 
> update the NFA state every time the NFA is touched. This leads to redundant 
> puts/gets to the state when there are no changes to the NFA itself.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4195: [FLINK-7008] [cep] Update NFA state only when the ...

2017-06-28 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4195#discussion_r124496350
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 ---
@@ -270,10 +270,12 @@ private void updateLastSeenWatermark(long timestamp) {
}
 
private void updateNFA(NFA nfa) throws IOException {
-   if (nfa.isEmpty()) {
-   nfaOperatorState.clear();
-   } else {
-   nfaOperatorState.update(nfa);
+   if (nfa.isNFAChanged()) {
+   if (nfa.isEmpty()) {
+   nfaOperatorState.clear();
+   } else {
+   nfaOperatorState.update(nfa);
--- End diff --

Hmm, right, we do access it from the `StateBackend` each time. I'm not sure, 
though, whether it is deserialized each time; I don't think there is such an 
assumption. @kl0u, could you shed some more light on this?

If I am right and it is not deserialized each time, I would reset 
`nfaChanged` before writing it into the `StateBackend`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7008) Update NFA state only when the NFA changes.

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066225#comment-16066225
 ] 

ASF GitHub Bot commented on FLINK-7008:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4195#discussion_r124496350
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 ---
@@ -270,10 +270,12 @@ private void updateLastSeenWatermark(long timestamp) {
}
 
private void updateNFA(NFA nfa) throws IOException {
-   if (nfa.isEmpty()) {
-   nfaOperatorState.clear();
-   } else {
-   nfaOperatorState.update(nfa);
+   if (nfa.isNFAChanged()) {
+   if (nfa.isEmpty()) {
+   nfaOperatorState.clear();
+   } else {
+   nfaOperatorState.update(nfa);
--- End diff --

Hmm, right, we do access it from the `StateBackend` each time. I'm not sure, 
though, whether it is deserialized each time; I don't think there is such an 
assumption. @kl0u, could you shed some more light on this?

If I am right and it is not deserialized each time, I would reset 
`nfaChanged` before writing it into the `StateBackend`.


> Update NFA state only when the NFA changes.
> ---
>
> Key: FLINK-7008
> URL: https://issues.apache.org/jira/browse/FLINK-7008
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Kostas Kloudas
>Assignee: Dian Fu
> Fix For: 1.4.0
>
>
> Currently in the {{AbstractKeyedCEPPatternOperator.updateNFA()}} method we 
> update the NFA state every time the NFA is touched. This leads to redundant 
> puts/gets to the state when there are no changes to the NFA itself.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4195: [FLINK-7008] [cep] Update NFA state only when the ...

2017-06-28 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4195#discussion_r124499732
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
 ---
@@ -114,6 +115,28 @@ public DeweyNumber getVersion() {
return version;
}
 
+   @Override
+   public boolean equals(Object obj) {
+   if (obj instanceof ComputationState) {
+   ComputationState other = (ComputationState) obj;
+   return Objects.equals(state, other.state) &&
+   event == other.event &&
--- End diff --

For `ComputationState`, I think two `ComputationState`s are equal only if their 
last taken events point to the same event. So `.equals()` may not be 
correct. Thoughts?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7008) Update NFA state only when the NFA changes.

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066251#comment-16066251
 ] 

ASF GitHub Bot commented on FLINK-7008:
---

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4195#discussion_r124499732
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
 ---
@@ -114,6 +115,28 @@ public DeweyNumber getVersion() {
return version;
}
 
+   @Override
+   public boolean equals(Object obj) {
+   if (obj instanceof ComputationState) {
+   ComputationState other = (ComputationState) obj;
+   return Objects.equals(state, other.state) &&
+   event == other.event &&
--- End diff --

For `ComputationState`, I think two `ComputationState`s are equal only if their 
last taken events point to the same event. So `.equals()` may not be 
correct. Thoughts?


> Update NFA state only when the NFA changes.
> ---
>
> Key: FLINK-7008
> URL: https://issues.apache.org/jira/browse/FLINK-7008
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Kostas Kloudas
>Assignee: Dian Fu
> Fix For: 1.4.0
>
>
> Currently in the {{AbstractKeyedCEPPatternOperator.updateNFA()}} method we 
> update the NFA state every time the NFA is touched. This leads to redundant 
> puts/gets to the state when there are no changes to the NFA itself.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4195: [FLINK-7008] [cep] Update NFA state only when the ...

2017-06-28 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4195#discussion_r124500395
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 ---
@@ -270,10 +270,12 @@ private void updateLastSeenWatermark(long timestamp) {
}
 
private void updateNFA(NFA nfa) throws IOException {
-   if (nfa.isEmpty()) {
-   nfaOperatorState.clear();
-   } else {
-   nfaOperatorState.update(nfa);
+   if (nfa.isNFAChanged()) {
+   if (nfa.isEmpty()) {
+   nfaOperatorState.clear();
+   } else {
+   nfaOperatorState.update(nfa);
--- End diff --

This is only true for the `RocksDB` state backend. If you are using the 
`Heap` state backend, you do not (de)serialize at every access but keep the 
same object. So the flag should be reset.
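
For illustration, the reset could look roughly like this (a sketch; the 
`setNFAChanged(false)` setter is hypothetical, the discussion above only 
agrees that the flag needs to be reset somewhere):

{code}
private void updateNFA(NFA<IN> nfa) throws IOException {
    if (nfa.isNFAChanged()) {
        // reset before writing: a heap state backend hands back the very
        // same object on the next access, so a stale 'true' would force
        // an update even when nothing changed
        nfa.setNFAChanged(false);
        if (nfa.isEmpty()) {
            nfaOperatorState.clear();
        } else {
            nfaOperatorState.update(nfa);
        }
    }
}
{code}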


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7008) Update NFA state only when the NFA changes.

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066258#comment-16066258
 ] 

ASF GitHub Bot commented on FLINK-7008:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4195#discussion_r124500395
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 ---
@@ -270,10 +270,12 @@ private void updateLastSeenWatermark(long timestamp) {
}
 
private void updateNFA(NFA nfa) throws IOException {
-   if (nfa.isEmpty()) {
-   nfaOperatorState.clear();
-   } else {
-   nfaOperatorState.update(nfa);
+   if (nfa.isNFAChanged()) {
+   if (nfa.isEmpty()) {
+   nfaOperatorState.clear();
+   } else {
+   nfaOperatorState.update(nfa);
--- End diff --

This is only true for the `RocksDB` state backend. If you are using the 
`Heap` state backend, you do not (de)serialize at every access but keep the 
same object. So the flag should be reset.


> Update NFA state only when the NFA changes.
> ---
>
> Key: FLINK-7008
> URL: https://issues.apache.org/jira/browse/FLINK-7008
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Kostas Kloudas
>Assignee: Dian Fu
> Fix For: 1.4.0
>
>
> Currently in the {{AbstractKeyedCEPPatternOperator.updateNFA()}} method we 
> update the NFA state every time the NFA is touched. This leads to redundant 
> puts/gets to the state when there are no changes to the NFA itself.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7008) Update NFA state only when the NFA changes.

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066261#comment-16066261
 ] 

ASF GitHub Bot commented on FLINK-7008:
---

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4195#discussion_r124500774
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -237,8 +254,15 @@ public boolean isEmpty() {
computationState.getCounter());
 
newComputationStates = Collections.emptyList();
+   nfaChanged = true;
--- End diff --

I think this may not always be true. For example, 
`eventSharedBuffer.release` (NFA:250) may already have released the events that 
would be pruned, in which case the sharedBuffer pruning does nothing. Thoughts?
Anyway, leaving it here is safer. :)


> Update NFA state only when the NFA changes.
> ---
>
> Key: FLINK-7008
> URL: https://issues.apache.org/jira/browse/FLINK-7008
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Kostas Kloudas
>Assignee: Dian Fu
> Fix For: 1.4.0
>
>
> Currently in the {{AbstractKeyedCEPPatternOperator.updateNFA()}} method we 
> update the NFA state every time the NFA is touched. This leads to redundant 
> puts/gets to the state when there are no changes to the NFA itself.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4195: [FLINK-7008] [cep] Update NFA state only when the ...

2017-06-28 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4195#discussion_r124500774
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -237,8 +254,15 @@ public boolean isEmpty() {
computationState.getCounter());
 
newComputationStates = Collections.emptyList();
+   nfaChanged = true;
--- End diff --

I think this may not always be true. For example, 
`eventSharedBuffer.release` (NFA:250) may already have released the events that 
would be pruned, in which case the sharedBuffer pruning does nothing. Thoughts?
Anyway, leaving it here is safer. :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4195: [FLINK-7008] [cep] Update NFA state only when the ...

2017-06-28 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4195#discussion_r124501193
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
 ---
@@ -114,6 +115,28 @@ public DeweyNumber getVersion() {
return version;
}
 
+   @Override
+   public boolean equals(Object obj) {
+   if (obj instanceof ComputationState) {
+   ComputationState other = (ComputationState) obj;
+   return Objects.equals(state, other.state) &&
+   event == other.event &&
--- End diff --

The combination of `Objects.equals(event, other.event)` and `counter == 
other.counter` tells exactly that. I would not base the implementation on 
reference equality.
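
To make the value-based variant concrete, here is a standalone sketch (a 
stripped-down stand-in, not the actual `ComputationState` class):

{code}
import java.util.Objects;

// Minimal stand-in illustrating value-based equals/hashCode for a
// computation state identified by (state, event, counter).
final class ComputationStateSketch {

    private final String state;  // stands in for the State field
    private final Object event;  // the last taken event
    private final int counter;

    ComputationStateSketch(String state, Object event, int counter) {
        this.state = state;
        this.event = event;
        this.counter = counter;
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof ComputationStateSketch)) {
            return false;
        }
        ComputationStateSketch other = (ComputationStateSketch) obj;
        // value equality on the event plus the counter distinguishes two
        // occurrences of equal-valued events without relying on ==
        return Objects.equals(state, other.state)
            && Objects.equals(event, other.event)
            && counter == other.counter;
    }

    @Override
    public int hashCode() {
        return Objects.hash(state, event, counter);
    }
}
{code}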


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7008) Update NFA state only when the NFA changes.

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066265#comment-16066265
 ] 

ASF GitHub Bot commented on FLINK-7008:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4195#discussion_r124501193
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
 ---
@@ -114,6 +115,28 @@ public DeweyNumber getVersion() {
return version;
}
 
+   @Override
+   public boolean equals(Object obj) {
+   if (obj instanceof ComputationState) {
+   ComputationState other = (ComputationState) obj;
+   return Objects.equals(state, other.state) &&
+   event == other.event &&
--- End diff --

The combination of `Objects.equals(event, other.event)` and `counter == 
other.counter` tells exactly that. I would not base the implementation on 
reference equality.


> Update NFA state only when the NFA changes.
> ---
>
> Key: FLINK-7008
> URL: https://issues.apache.org/jira/browse/FLINK-7008
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Kostas Kloudas
>Assignee: Dian Fu
> Fix For: 1.4.0
>
>
> Currently in the {{AbstractKeyedCEPPatternOperator.updateNFA()}} method we 
> update the NFA state every time the NFA is touched. This leads to redundant 
> puts/gets to the state when there are no changes to the NFA itself.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4200: [FLINK-7014] Expose isDeterministic interface to S...

2017-06-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4200#discussion_r124504426
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
 ---
@@ -458,4 +459,52 @@ class ExpressionReductionTest extends TableTestBase {
 util.verifySql(sqlQuery, expected)
   }
 
+  // todo this NPE is caused by Calcite, it shall pass when [CALCITE-1860] 
is fixed
+  @Ignore
+  @Test(expected = classOf[NullPointerException])
--- End diff --

What is the expected result of the test once CALCITE-1860 is fixed?
Since the test is ignored, we should implement the test such that it 
validates the expected result. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7014) Expose isDeterministic interface to ScalarFunction and TableFunction

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066288#comment-16066288
 ] 

ASF GitHub Bot commented on FLINK-7014:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4200#discussion_r124504426
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
 ---
@@ -458,4 +459,52 @@ class ExpressionReductionTest extends TableTestBase {
 util.verifySql(sqlQuery, expected)
   }
 
+  // todo this NPE is caused by Calcite, it shall pass when [CALCITE-1860] 
is fixed
+  @Ignore
+  @Test(expected = classOf[NullPointerException])
--- End diff --

What is the expected result of the test once CALCITE-1860 is fixed?
Since the test is ignored, we should implement the test such that it 
validates the expected result. 


> Expose isDeterministic interface to ScalarFunction and TableFunction
> 
>
> Key: FLINK-7014
> URL: https://issues.apache.org/jira/browse/FLINK-7014
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> Currently, the `isDeterministic` method of implementations of `SqlFunction` 
> always returns true, which causes inappropriate optimizations in Calcite, 
> such as treating a user's stateful UDF as a pure function. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4075: [FLINK-6494] Migrate ResourceManager/Yarn/Mesos co...

2017-06-28 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4075#discussion_r124504146
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
 ---
@@ -33,6 +33,45 @@
.key("resourcemanager.job.timeout")
.defaultValue("5 minutes");
 
+   public static final ConfigOption LOCAL_NUMBER_RESOURCE_MANAGER 
= ConfigOptions
+   .key("local.number-resourcemanager")
+   .defaultValue(1);
+
+   public static final ConfigOption RESOURCE_MANAGER_IPC_PORT = 
ConfigOptions
+   .key("resourcemanager.rpc.port")
+   .defaultValue(0);
+
+   /**
+* Percentage of heap space to remove from containers (YARN / Mesos), 
to compensate
+* for other JVM memory usage.
+*/
+   public static final ConfigOption CONTAINERIZED_HEAP_CUTOFF_RATIO 
= ConfigOptions
+   .key("containerized.heap-cutoff-ratio")
+   .defaultValue(0.25f)
+   .withDeprecatedKeys(ConfigConstants.YARN_HEAP_CUTOFF_RATIO);
--- End diff --

let's use the actual key here. I'd like to end up in a state where we could 
remove the deprecated parts of the ConfigConstants class without requiring 
other changes.
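
Concretely, that means inlining the literal key (a sketch; it assumes 
`ConfigConstants.YARN_HEAP_CUTOFF_RATIO` resolves to "yarn.heap-cutoff-ratio"):

{code}
// still resolves the old YARN key, but no longer references the
// deprecated ConfigConstants field
public static final ConfigOption<Float> CONTAINERIZED_HEAP_CUTOFF_RATIO = ConfigOptions
    .key("containerized.heap-cutoff-ratio")
    .defaultValue(0.25f)
    .withDeprecatedKeys("yarn.heap-cutoff-ratio");
{code}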


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6494) Migrate ResourceManager configuration options

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066290#comment-16066290
 ] 

ASF GitHub Bot commented on FLINK-6494:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4075#discussion_r124504177
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
 ---
@@ -33,6 +33,45 @@
.key("resourcemanager.job.timeout")
.defaultValue("5 minutes");
 
+   public static final ConfigOption LOCAL_NUMBER_RESOURCE_MANAGER 
= ConfigOptions
+   .key("local.number-resourcemanager")
+   .defaultValue(1);
+
+   public static final ConfigOption RESOURCE_MANAGER_IPC_PORT = 
ConfigOptions
+   .key("resourcemanager.rpc.port")
+   .defaultValue(0);
+
+   /**
+* Percentage of heap space to remove from containers (YARN / Mesos), 
to compensate
+* for other JVM memory usage.
+*/
+   public static final ConfigOption CONTAINERIZED_HEAP_CUTOFF_RATIO 
= ConfigOptions
+   .key("containerized.heap-cutoff-ratio")
+   .defaultValue(0.25f)
+   .withDeprecatedKeys(ConfigConstants.YARN_HEAP_CUTOFF_RATIO);
+
+   /**
+* Minimum amount of heap memory to remove in containers, as a safety 
margin.
+*/
+   public static final ConfigOption CONTAINERIZED_HEAP_CUTOFF_MIN 
= ConfigOptions
+   .key("containerized.heap-cutoff-min")
+   .defaultValue(600)
+   .withDeprecatedKeys(ConfigConstants.YARN_HEAP_CUTOFF_MIN);
--- End diff --

same as above


> Migrate ResourceManager configuration options
> -
>
> Key: FLINK-6494
> URL: https://issues.apache.org/jira/browse/FLINK-6494
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, ResourceManager
>Reporter: Chesnay Schepler
>Assignee: Fang Yong
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4075: [FLINK-6494] Migrate ResourceManager/Yarn/Mesos co...

2017-06-28 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4075#discussion_r124503662
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
 ---
@@ -33,6 +33,45 @@
.key("resourcemanager.job.timeout")
.defaultValue("5 minutes");
 
+   public static final ConfigOption LOCAL_NUMBER_RESOURCE_MANAGER 
= ConfigOptions
+   .key("local.number-resourcemanager")
+   .defaultValue(1);
+
+   public static final ConfigOption RESOURCE_MANAGER_IPC_PORT = 
ConfigOptions
--- End diff --

remove RESOURCE_MANAGER prefix


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4075: [FLINK-6494] Migrate ResourceManager/Yarn/Mesos co...

2017-06-28 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4075#discussion_r124501969
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosConfigOptions.java
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.configuration;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to mesos settings.
+ */
+public class MesosConfigOptions {
--- End diff --

Let's rename this to `MesosOptions`, akin to the classes in flink-core.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4075: [FLINK-6494] Migrate ResourceManager/Yarn/Mesos co...

2017-06-28 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4075#discussion_r124504177
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
 ---
@@ -33,6 +33,45 @@
.key("resourcemanager.job.timeout")
.defaultValue("5 minutes");
 
+   public static final ConfigOption LOCAL_NUMBER_RESOURCE_MANAGER 
= ConfigOptions
+   .key("local.number-resourcemanager")
+   .defaultValue(1);
+
+   public static final ConfigOption RESOURCE_MANAGER_IPC_PORT = 
ConfigOptions
+   .key("resourcemanager.rpc.port")
+   .defaultValue(0);
+
+   /**
+* Percentage of heap space to remove from containers (YARN / Mesos), 
to compensate
+* for other JVM memory usage.
+*/
+   public static final ConfigOption CONTAINERIZED_HEAP_CUTOFF_RATIO 
= ConfigOptions
+   .key("containerized.heap-cutoff-ratio")
+   .defaultValue(0.25f)
+   .withDeprecatedKeys(ConfigConstants.YARN_HEAP_CUTOFF_RATIO);
+
+   /**
+* Minimum amount of heap memory to remove in containers, as a safety 
margin.
+*/
+   public static final ConfigOption CONTAINERIZED_HEAP_CUTOFF_MIN 
= ConfigOptions
+   .key("containerized.heap-cutoff-min")
+   .defaultValue(600)
+   .withDeprecatedKeys(ConfigConstants.YARN_HEAP_CUTOFF_MIN);
--- End diff --

same as above


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6494) Migrate ResourceManager configuration options

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066293#comment-16066293
 ] 

ASF GitHub Bot commented on FLINK-6494:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4075#discussion_r124503662
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
 ---
@@ -33,6 +33,45 @@
.key("resourcemanager.job.timeout")
.defaultValue("5 minutes");
 
+   public static final ConfigOption LOCAL_NUMBER_RESOURCE_MANAGER 
= ConfigOptions
+   .key("local.number-resourcemanager")
+   .defaultValue(1);
+
+   public static final ConfigOption RESOURCE_MANAGER_IPC_PORT = 
ConfigOptions
--- End diff --

remove RESOURCE_MANAGER prefix


> Migrate ResourceManager configuration options
> -
>
> Key: FLINK-6494
> URL: https://issues.apache.org/jira/browse/FLINK-6494
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, ResourceManager
>Reporter: Chesnay Schepler
>Assignee: Fang Yong
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6494) Migrate ResourceManager configuration options

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066292#comment-16066292
 ] 

ASF GitHub Bot commented on FLINK-6494:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4075#discussion_r124501969
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosConfigOptions.java
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.configuration;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to mesos settings.
+ */
+public class MesosConfigOptions {
--- End diff --

Let's rename this to `MesosOptions`, akin to the classes in flink-core.


> Migrate ResourceManager configuration options
> -
>
> Key: FLINK-6494
> URL: https://issues.apache.org/jira/browse/FLINK-6494
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, ResourceManager
>Reporter: Chesnay Schepler
>Assignee: Fang Yong
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6494) Migrate ResourceManager configuration options

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066291#comment-16066291
 ] 

ASF GitHub Bot commented on FLINK-6494:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4075#discussion_r124504146
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
 ---
@@ -33,6 +33,45 @@
.key("resourcemanager.job.timeout")
.defaultValue("5 minutes");
 
+   public static final ConfigOption LOCAL_NUMBER_RESOURCE_MANAGER 
= ConfigOptions
+   .key("local.number-resourcemanager")
+   .defaultValue(1);
+
+   public static final ConfigOption RESOURCE_MANAGER_IPC_PORT = 
ConfigOptions
+   .key("resourcemanager.rpc.port")
+   .defaultValue(0);
+
+   /**
+* Percentage of heap space to remove from containers (YARN / Mesos), 
to compensate
+* for other JVM memory usage.
+*/
+   public static final ConfigOption CONTAINERIZED_HEAP_CUTOFF_RATIO 
= ConfigOptions
+   .key("containerized.heap-cutoff-ratio")
+   .defaultValue(0.25f)
+   .withDeprecatedKeys(ConfigConstants.YARN_HEAP_CUTOFF_RATIO);
--- End diff --

let's use the actual key here. I'd like to end up in a state where we could 
remove the deprecated parts of the ConfigConstants class without requiring 
other changes.


> Migrate ResourceManager configuration options
> -
>
> Key: FLINK-6494
> URL: https://issues.apache.org/jira/browse/FLINK-6494
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, ResourceManager
>Reporter: Chesnay Schepler
>Assignee: Fang Yong
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4183: [FLINK-6969][table]Add support for deferred computation f...

2017-06-28 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4183
  
Hi @fhueske, Jincheng and I have discussed this offline and reached a unified 
opinion.

We think the approach of a window (with custom trigger) + custom operator will 
increase the latency when multiple window aggregates are applied. For example, 
defer 1 hour to compute with two levels of window aggregates: the output and 
watermark of the first-level window are already deferred by 1 hour, and at that 
point the output is in order. However, the second window will still defer its 
computation by 1 hour, so the result is delayed by two hours in total. 

We think we only need to hold the watermark back at the source; then all the 
downstream window operators will defer their computation by the given offset, 
and the end-to-end latency is just that offset.

Therefore, we propose to add a custom operator (to offset the watermark) 
after the source (the DataStream at the root of the physical plan tree). No 
custom trigger is needed.

What do you think? 
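
For illustration, such a watermark-offsetting operator might look roughly like 
this (a sketch against the 1.x streaming operator API; `WatermarkOffsetOperator` 
is an illustrative name, not code from this PR):

{code}
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * Forwards records untouched but emits every watermark shifted back by a
 * fixed offset, so all downstream event-time operators fire that much later.
 */
public class WatermarkOffsetOperator<T> extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T> {

    private final long offsetMillis;

    public WatermarkOffsetOperator(long offsetMillis) {
        this.offsetMillis = offsetMillis;
    }

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        output.collect(element); // the data itself is not delayed
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // hold event time back by the configured slack
        output.emitWatermark(new Watermark(mark.getTimestamp() - offsetMillis));
    }
}
{code}

Applied once, right after the source, a single offset then covers every 
downstream window, which is exactly the latency argument above.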






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6969) Add support for deferred computation for group window aggregates

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066306#comment-16066306
 ] 

ASF GitHub Bot commented on FLINK-6969:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4183
  
Hi @fhueske, Jincheng and I have discussed this offline and reached a unified 
opinion.

We think the approach of a window (with custom trigger) + custom operator will 
increase the latency when multiple window aggregates are applied. For example, 
defer 1 hour to compute with two levels of window aggregates: the output and 
watermark of the first-level window are already deferred by 1 hour, and at that 
point the output is in order. However, the second window will still defer its 
computation by 1 hour, so the result is delayed by two hours in total. 

We think we only need to hold the watermark back at the source; then all the 
downstream window operators will defer their computation by the given offset, 
and the end-to-end latency is just that offset.

Therefore, we propose to add a custom operator (to offset the watermark) 
after the source (the DataStream at the root of the physical plan tree). No 
custom trigger is needed.

What do you think? 






> Add support for deferred computation for group window aggregates
> 
>
> Key: FLINK-6969
> URL: https://issues.apache.org/jira/browse/FLINK-6969
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Deferred computation is a strategy to deal with late arriving data and avoid 
> updates of previous results. Instead of computing a result as soon as possible 
> (i.e., when a corresponding watermark is received), deferred 
> computation adds a configurable amount of slack time in which late data is 
> accepted before the result is computed. For example, instead of computing a 
> tumbling window of 1 hour at each full hour, we can add a deferred 
> computation interval of 15 minutes to compute the result at quarter past each 
> full hour.
> This approach adds latency but can reduce the number of updates, esp. in use 
> cases where the user cannot influence the generation of watermarks. It is 
> also useful if the data is emitted to a system that cannot update results 
> (files or Kafka). The deferred computation interval should be configured via 
> the {{QueryConfig}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4200: [FLINK-7014] Expose isDeterministic interface to S...

2017-06-28 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4200#discussion_r124506864
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
 ---
@@ -458,4 +459,52 @@ class ExpressionReductionTest extends TableTestBase {
 util.verifySql(sqlQuery, expected)
   }
 
+  // todo this NPE is caused by Calcite, it shall pass when [CALCITE-1860] 
is fixed
+  @Ignore
+  @Test(expected = classOf[NullPointerException])
--- End diff --

Yes, the expectation is that the test should not throw an exception. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7014) Expose isDeterministic interface to ScalarFunction and TableFunction

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066309#comment-16066309
 ] 

ASF GitHub Bot commented on FLINK-7014:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4200#discussion_r124506864
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
 ---
@@ -458,4 +459,52 @@ class ExpressionReductionTest extends TableTestBase {
 util.verifySql(sqlQuery, expected)
   }
 
+  // todo this NPE is caused by Calcite, it shall pass when [CALCITE-1860] 
is fixed
+  @Ignore
+  @Test(expected = classOf[NullPointerException])
--- End diff --

Yes, the expectation is that the test should not throw an exception. 


> Expose isDeterministic interface to ScalarFunction and TableFunction
> 
>
> Key: FLINK-7014
> URL: https://issues.apache.org/jira/browse/FLINK-7014
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> Currently, the `isDeterministic` method of implementations of `SqlFunction` 
> always returns true, which causes inappropriate optimizations in Calcite, 
> such as treating a user's stateful UDF as a pure function. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4194: Merge PR for 1.3

2017-06-28 Thread zentol
Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/4194


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4193: Merge PR for 1.4

2017-06-28 Thread zentol
Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/4193


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4195: [FLINK-7008] [cep] Update NFA state only when the ...

2017-06-28 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4195#discussion_r124510489
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 ---
@@ -270,10 +270,12 @@ private void updateLastSeenWatermark(long timestamp) {
}
 
private void updateNFA(NFA nfa) throws IOException {
-   if (nfa.isEmpty()) {
-   nfaOperatorState.clear();
-   } else {
-   nfaOperatorState.update(nfa);
+   if (nfa.isNFAChanged()) {
+   if (nfa.isEmpty()) {
+   nfaOperatorState.clear();
+   } else {
+   nfaOperatorState.update(nfa);
--- End diff --

Makes sense.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7008) Update NFA state only when the NFA changes.

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066329#comment-16066329
 ] 

ASF GitHub Bot commented on FLINK-7008:
---

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4195#discussion_r124510489
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 ---
@@ -270,10 +270,12 @@ private void updateLastSeenWatermark(long timestamp) {
}
 
private void updateNFA(NFA nfa) throws IOException {
-   if (nfa.isEmpty()) {
-   nfaOperatorState.clear();
-   } else {
-   nfaOperatorState.update(nfa);
+   if (nfa.isNFAChanged()) {
+   if (nfa.isEmpty()) {
+   nfaOperatorState.clear();
+   } else {
+   nfaOperatorState.update(nfa);
--- End diff --

Makes sense.


> Update NFA state only when the NFA changes.
> ---
>
> Key: FLINK-7008
> URL: https://issues.apache.org/jira/browse/FLINK-7008
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Kostas Kloudas
>Assignee: Dian Fu
> Fix For: 1.4.0
>
>
> Currently in the {{AbstractKeyedCEPPatternOperator.updateNFA()}} method we 
> update the NFA state every time the NFA is touched. This leads to redundant 
> puts/gets to the state when there are no changes to the NFA itself.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

