[jira] [Commented] (FLINK-5103) TaskManager process virtual memory and physical memory used size gauge

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15678795#comment-15678795
 ] 

ASF GitHub Bot commented on FLINK-5103:
---

GitHub user zhuhaifengleon opened a pull request:

https://github.com/apache/flink/pull/2833

[FLINK-5103] [Metrics] TaskManager process virtual memory and physical 
memory used size gauge

This PR adds TaskManager process virtual memory and physical memory used size 
gauge metrics.
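
A minimal sketch of what exposing these values could look like, assuming the 
gauges are registered on a TaskManager MetricGroup and read /proc/self/status 
on Linux; the metric names and the helper class below are illustrative, not 
necessarily what this PR implements:

{code}
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class ProcessMemoryGauges {

    // Registers two gauges that report VmRSS (physical) and VmSize (virtual)
    // memory in kB, read from /proc/self/status. Linux only; -1 on failure.
    public static void register(MetricGroup group) {
        group.gauge("VmRSS", (Gauge<Long>) () -> readStatusValueKb("VmRSS:"));
        group.gauge("VmSize", (Gauge<Long>) () -> readStatusValueKb("VmSize:"));
    }

    private static long readStatusValueKb(String key) {
        try {
            for (String line : Files.readAllLines(Paths.get("/proc/self/status"))) {
                if (line.startsWith(key)) {
                    // a matching line looks like "VmRSS:   123456 kB"
                    return Long.parseLong(line.split("\\s+")[1]);
                }
            }
        } catch (IOException | NumberFormatException e) {
            // ignore and fall through to the error value
        }
        return -1L;
    }
}
{code}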

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhuhaifengleon/flink FLINK-5103

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2833.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2833


commit f65176360593ab72a6fee27f4ae54a143e386a67
Author: zhuhaifengleon 
Date:   2016-11-11T01:29:53Z

[metrics] expose process VmRSS/VmSize metrics

commit 5f1bf29913b809f6dad178b7f0e7e59381dfdd6a
Author: zhuhaifengleon 
Date:   2016-11-19T07:22:50Z

[FLINK-5103] [Metrics] TaskManager process virtual memory and physical 
memory used size gauge




> TaskManager process virtual memory and physical memory used size gauge
> --
>
> Key: FLINK-5103
> URL: https://issues.apache.org/jira/browse/FLINK-5103
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: zhuhaifeng
>Assignee: zhuhaifeng
>Priority: Minor
> Fix For: 1.2.0
>
>
> Add TaskManager process virtual memory and physical memory used size gauge 
> metrics.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2833: [FLINK-5103] [Metrics] TaskManager process virtual...

2016-11-18 Thread zhuhaifengleon
GitHub user zhuhaifengleon opened a pull request:

https://github.com/apache/flink/pull/2833

[FLINK-5103] [Metrics] TaskManager process virtual memory and physical 
memory used size gauge

This PR adds TaskManager process virtual memory and physical memory used size 
gauge metrics.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhuhaifengleon/flink FLINK-5103

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2833.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2833


commit f65176360593ab72a6fee27f4ae54a143e386a67
Author: zhuhaifengleon 
Date:   2016-11-11T01:29:53Z

[metrics] expose process VmRSS/VmSize metrics

commit 5f1bf29913b809f6dad178b7f0e7e59381dfdd6a
Author: zhuhaifengleon 
Date:   2016-11-19T07:22:50Z

[FLINK-5103] [Metrics] TaskManager process virtual memory and physical 
memory used size gauge




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-5103) TaskManager process virtual memory and physical memory used size gauge

2016-11-18 Thread zhuhaifeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhuhaifeng updated FLINK-5103:
--
Summary: TaskManager process virtual memory and physical memory used size 
gauge  (was: Process virtual memory and physical memory used size gauge)

> TaskManager process virtual memory and physical memory used size gauge
> --
>
> Key: FLINK-5103
> URL: https://issues.apache.org/jira/browse/FLINK-5103
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: zhuhaifeng
>Assignee: zhuhaifeng
>Priority: Minor
> Fix For: 1.2.0
>
>
> Add TaskManager process virtual memory and physical memory used size gauge 
> metrics.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5103) TaskManager process virtual memory and physical memory used size gauge

2016-11-18 Thread zhuhaifeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhuhaifeng updated FLINK-5103:
--
Component/s: Metrics

> TaskManager process virtual memory and physical memory used size gauge
> --
>
> Key: FLINK-5103
> URL: https://issues.apache.org/jira/browse/FLINK-5103
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: zhuhaifeng
>Assignee: zhuhaifeng
>Priority: Minor
> Fix For: 1.2.0
>
>
> Add TaskManager process virtual memory and physical memory used size gauge 
> metrics.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5103) Process virtual memory and physical memory used size gauge

2016-11-18 Thread zhuhaifeng (JIRA)
zhuhaifeng created FLINK-5103:
-

 Summary: Process virtual memory and physical memory used size gauge
 Key: FLINK-5103
 URL: https://issues.apache.org/jira/browse/FLINK-5103
 Project: Flink
  Issue Type: Improvement
Reporter: zhuhaifeng
Assignee: zhuhaifeng
Priority: Minor
 Fix For: 1.2.0


Add TaskManager process virtual memory and physical memory used size gauge 
metrics.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-5031) Consecutive DataStream.split() ignored

2016-11-18 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15678716#comment-15678716
 ] 

Renkai Ge edited comment on FLINK-5031 at 11/19/16 6:29 AM:


[~fhueske] The second split is not ignored; its output is unioned with that of 
the first one: {code}union({1,2},{1,2,3,4,5}) = {1,2,3,4,5}{code}. If the 
second select is changed to "GreaterEqual", the result is 
{code}{3,4,5,6,7,8,9,10,11}{code}, i.e. 
{code}union({3,4,5,6,7,8,9,10,11},{6,7,8,9,10,11}){code}, see 
https://github.com/apache/flink/blob/a612b9966f3ee020a5721ac2f039a3633c40146c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java#L114.
With the current implementation of split you get the unioned result of all 
split & select combinations, which I find somewhat surprising. We might solve 
this issue by reimplementing the split function as a OneInputTransformation.
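
For illustration, a fragment of the program from the issue description below 
with the commented-out identity {{MapFunction}} re-enabled; it inserts a 
separate operator between the two splits, so the second OutputSelector is 
evaluated on its own and stream21 prints 1 and 2 as expected:

{code}
SplitStream<Long> split2 = stream11
    // identity map: creates a new operator, so the second selector is not
    // unioned with the first one
    .map(new MapFunction<Long, Long>() {
        @Override
        public Long map(Long value) throws Exception {
            return value;
        }
    })
    .split(new ThresholdSelector(3));

DataStream<Long> stream21 = split2.select("Less");
stream21.print();   // 1, 2
{code}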


was (Author: renkaige):
[~fhueske]The second split was not ignored, it was unioned by the first 
one.{code}union({1,2},{1,2,3,4,5})={1,2,3,4,5}{code},if the second select 
change to "GreaterEqual", the result would be {3,4,5,6,7,8,9,10,11},that was 
{code} union({3,4,5,6,7,8,9,10,11},{6,7,8,9,10,11}) {code} see 
https://github.com/apache/flink/blob/a612b9966f3ee020a5721ac2f039a3633c40146c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java#L114.
In the current implementation of split, you will get a unioned result of all 
split&select combination, I think it  was strange somehow.We might solve this 
issue by reimplement the split function by an OneInputTransformation.

> Consecutive DataStream.split() ignored
> --
>
> Key: FLINK-5031
> URL: https://issues.apache.org/jira/browse/FLINK-5031
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Fabian Hueske
>Assignee: Renkai Ge
> Fix For: 1.2.0
>
>
> The output of the following program 
> {code}
> static final class ThresholdSelector implements OutputSelector<Long> {
>   long threshold;
>
>   public ThresholdSelector(long threshold) {
>     this.threshold = threshold;
>   }
>
>   @Override
>   public Iterable<String> select(Long value) {
>     if (value < threshold) {
>       return Collections.singletonList("Less");
>     } else {
>       return Collections.singletonList("GreaterEqual");
>     }
>   }
> }
>
> public static void main(String[] args) throws Exception {
>   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>   env.setParallelism(1);
>
>   SplitStream<Long> split1 = env.generateSequence(1, 11)
>     .split(new ThresholdSelector(6));
>
>   // stream11 should be [1,2,3,4,5]
>   DataStream<Long> stream11 = split1.select("Less");
>
>   SplitStream<Long> split2 = stream11
>     //.map(new MapFunction<Long, Long>() {
>     //  @Override
>     //  public Long map(Long value) throws Exception {
>     //    return value;
>     //  }
>     //})
>     .split(new ThresholdSelector(3));
>
>   DataStream<Long> stream21 = split2.select("Less");
>   // stream21 should be [1,2]
>   stream21.print();
>
>   env.execute();
> }
> {code}
> should be {{1, 2}}, however it is {{1, 2, 3, 4, 5}}. It seems that the second 
> {{split}} operation is ignored.
> The program is evaluated correctly if the identity {{MapFunction}} is added 
> to the program.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5031) Consecutive DataStream.split() ignored

2016-11-18 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15678716#comment-15678716
 ] 

Renkai Ge commented on FLINK-5031:
--

[~fhueske] The second split is not ignored; its output is unioned with that of 
the first one: {code}union({1,2},{1,2,3,4,5}) = {1,2,3,4,5}{code}. If the 
second select is changed to "GreaterEqual", the result is 
{3,4,5,6,7,8,9,10,11}, i.e. 
{code}union({3,4,5,6,7,8,9,10,11},{6,7,8,9,10,11}){code}, see 
https://github.com/apache/flink/blob/a612b9966f3ee020a5721ac2f039a3633c40146c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java#L114.
With the current implementation of split you get the unioned result of all 
split & select combinations, which I find somewhat surprising. We might solve 
this issue by reimplementing the split function as a OneInputTransformation.

> Consecutive DataStream.split() ignored
> --
>
> Key: FLINK-5031
> URL: https://issues.apache.org/jira/browse/FLINK-5031
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Fabian Hueske
>Assignee: Renkai Ge
> Fix For: 1.2.0
>
>
> The output of the following program 
> {code}
> static final class ThresholdSelector implements OutputSelector<Long> {
>   long threshold;
>
>   public ThresholdSelector(long threshold) {
>     this.threshold = threshold;
>   }
>
>   @Override
>   public Iterable<String> select(Long value) {
>     if (value < threshold) {
>       return Collections.singletonList("Less");
>     } else {
>       return Collections.singletonList("GreaterEqual");
>     }
>   }
> }
>
> public static void main(String[] args) throws Exception {
>   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>   env.setParallelism(1);
>
>   SplitStream<Long> split1 = env.generateSequence(1, 11)
>     .split(new ThresholdSelector(6));
>
>   // stream11 should be [1,2,3,4,5]
>   DataStream<Long> stream11 = split1.select("Less");
>
>   SplitStream<Long> split2 = stream11
>     //.map(new MapFunction<Long, Long>() {
>     //  @Override
>     //  public Long map(Long value) throws Exception {
>     //    return value;
>     //  }
>     //})
>     .split(new ThresholdSelector(3));
>
>   DataStream<Long> stream21 = split2.select("Less");
>   // stream21 should be [1,2]
>   stream21.print();
>
>   env.execute();
> }
> {code}
> should be {{1, 2}}, however it is {{1, 2, 3, 4, 5}}. It seems that the second 
> {{split}} operation is ignored.
> The program is evaluated correctly if the identity {{MapFunction}} is added 
> to the program.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5053) Incremental / lightweight snapshots for checkpoints

2016-11-18 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-5053:
--
Description: 
There is currently basically no difference between savepoints and checkpoints 
in Flink and both are created through exactly the same process.

However, savepoints and checkpoints have a slightly different meaning which we 
should take into account to keep Flink efficient:

- Savepoints are (typically infrequently) triggered by the user to create a 
state from which the application can be restarted, e.g. because Flink, some 
code, or the parallelism needs to be changed.

- Checkpoints are (typically frequently) triggered by the system to allow for 
fast recovery in case of failure, while keeping the job/system unchanged.

This means that savepoints and checkpoints can have different properties in 
that:

- Savepoints should represent a state of the application in which 
characteristics of the job (e.g. parallelism) can be adjusted for the next 
restart. One example of something that savepoints need to be aware of is 
key-groups. Savepoints can potentially be a little more expensive than 
checkpoints, because they are usually created far less frequently, by the user.

- Checkpoints are frequently triggered by the system to allow for fast failure 
recovery. However, failure recovery leaves all characteristics of the job 
unchanged, so checkpoints do not have to be aware of those, e.g. think again 
of key-groups. Checkpoints should run faster than savepoint creation; in 
particular, it would be nice to have incremental checkpoints.

For a first approach, I would suggest the following steps/changes:

- In checkpoint coordination: differentiate between triggering checkpoints 
and savepoints. Introduce properties for checkpoints that describe their set of 
abilities, e.g. "is-key-group-aware", "is-incremental".

- In state handle infrastructure: introduce state handles that reflect 
incremental checkpoints and drop full key-group awareness, i.e. covering 
folders instead of files and not having keygroup_id -> file/offset mapping, but 
keygroup_range -> folder?

- Backend side: We should start with RocksDB by reintroducing something similar 
to semi-async snapshots, but using 
BackupableDBOptions::setShareTableFiles(true) and transferring only the new 
incremental outputs to HDFS. Notice that using RocksDB's internal backup 
mechanism means giving up the information about individual key-groups. But as 
explained above, this should be totally acceptable for checkpoints, while 
savepoints should use the key-group-aware, fully async mode. Of course we also 
need to implement the ability to restore from both types of snapshots.
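
A rough sketch of the RocksDB side, based on the RocksDB Java backup API 
(BackupEngine / BackupableDBOptions, class names as in the 4.x/5.x Java API); 
this only illustrates the mechanism, paths and error handling are simplified:

{code}
import org.rocksdb.BackupEngine;
import org.rocksdb.BackupableDBOptions;
import org.rocksdb.Env;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class IncrementalBackupSketch {

    // Creates an incremental backup of the given RocksDB instance in backupDir.
    // setShareTableFiles(true) lets unchanged SST files be shared between
    // backups, so only newly created files have to be transferred to DFS.
    public static void backup(RocksDB db, String backupDir) throws RocksDBException {
        try (BackupableDBOptions options =
                 new BackupableDBOptions(backupDir).setShareTableFiles(true);
             BackupEngine engine = BackupEngine.open(Env.getDefault(), options)) {
            engine.createNewBackup(db, true); // flush the memtable before backing up
        }
    }
}
{code}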

One remaining problem with the suggested approach is that even checkpoints 
should support scale-down, in case only a smaller number of instances is 
available for recovery.




  was:
There is currently basically no difference between savepoints and checkpoints 
in Flink and both are created through exactly the same process.

However, savepoints and checkpoints have a slightly different meaning which we 
should take into account to keep Flink efficient:

- Savepoints are (typically infrequently) triggered by the user to create a 
state from which the application can be restarted, e.g. because Flink, some 
code, or the parallelism needs to be changed.

- Checkpoints are (typically frequently) triggered by the System to allow for 
fast recovery in case of failure, but keeping the job/system unchanged.

This means that savepoints and checkpoints can have different properties in 
that:

- Savepoint should represent a state of the application, where characteristics 
of the job (e.g. parallelism) can be adjusted for the next restart. One example 
for things that savepoints need to be aware of are key-groups. Savepoints can 
potentially be a little more expensive than checkpoints, because they are 
usually created a lot less frequently through the user.

- Checkpoints are frequently triggered by the system to allow for fast failure 
recovery. However, failure recovery leaves all characteristics of the job 
unchanged. This checkpoints do not have to be aware of those, e.g. think again 
of key groups. Checkpoints should run faster than creating savepoints, in 
particular it would be nice to have incremental checkpoints.

For a first approach, I would suggest the following steps/changes:

- In checkpoint coordination: differentiate between triggering checkpoints 
and savepoints. Introduce properties for checkpoints that describe their set of 
abilities, e.g. "is-key-group-aware", "is-incremental".

- In state handle infrastructure: introduce state handles that reflect 
incremental checkpoints and drop full key-group awareness, i.e. covering 
folders instead of files and not having keygroup_id -> file/offset mapping, but 
keygroup_range -> folder?

- Backend side: We should start wit

[jira] [Created] (FLINK-5102) Connection establishment does not react to interrupt

2016-11-18 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-5102:
--

 Summary: Connection establishment does not react to interrupt
 Key: FLINK-5102
 URL: https://issues.apache.org/jira/browse/FLINK-5102
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.1.3
Reporter: Ufuk Celebi
Assignee: Ufuk Celebi
 Fix For: 1.2.0, 1.1.4


Connection establishment does not react to interrupts; task cancellation can 
get stuck in the following wait:

{code}
Task - Task '... (60/120)' did not react to cancelling signal, but is stuck in 
method:
java.lang.Object.$$YJP$$wait(Native Method)
java.lang.Object.wait(Object.java)
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:191)
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:131)
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:83)
org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:60)
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:118)
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:395)
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:414)
org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:152)
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:195)
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:67)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:638)
java.lang.Thread.run(Thread.java:745)
{code}
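
For context, a minimal, hypothetical sketch of a wait loop that stays 
responsive to interruption by propagating the InterruptedException to the 
caller instead of swallowing it; this is not the actual 
PartitionRequestClientFactory code:

{code}
// Hypothetical helper: waits until the connection is marked established or a
// timeout expires. Thread.interrupt() cancels the wait because the
// InterruptedException from Object.wait() is propagated, not caught and retried.
public final class ConnectionWaiter {

    private final Object lock = new Object();
    private boolean established;

    public void markEstablished() {
        synchronized (lock) {
            established = true;
            lock.notifyAll();
        }
    }

    public void awaitEstablished(long timeoutMillis) throws InterruptedException {
        final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        synchronized (lock) {
            while (!established) {
                long remainingMillis = (deadline - System.nanoTime()) / 1_000_000L;
                if (remainingMillis <= 0) {
                    throw new IllegalStateException("Connection was not established in time.");
                }
                lock.wait(remainingMillis);
            }
        }
    }
}
{code}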



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4288) Make it possible to unregister tables

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15677737#comment-15677737
 ] 

ASF GitHub Bot commented on FLINK-4288:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2511#discussion_r88737974
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
 ---
@@ -133,12 +134,24 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 registerTableInternal(name, tableTable)
   case e: StreamTableEnvironment =>
 val sTableTable = new TransStreamTable(table.getRelNode, true)
-tables.add(name, sTableTable)
+schema.addTable(name, sTableTable)
 }
 
   }
 
   /**
+* Unregisters a [[Table]] in the TableEnvironment's catalog.
+* Unregistered tables cannot be referenced in SQL queries anymore.
+*
+* @param name The name under which the table is registered.
+*/
+  def unregisterTable(name: String): Unit = {
+
+checkValidTableName(name)
--- End diff --

Do we need to check if the name is valid? Couldn't we just try to delete it?



> Make it possible to unregister tables
> -
>
> Key: FLINK-4288
> URL: https://issues.apache.org/jira/browse/FLINK-4288
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Table names can not be changed yet. After registration you can not modify the 
> table behind a table name. Maybe this behavior is too restrictive.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2511: [FLINK-4288] [table] Make it possible to unregiste...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2511#discussion_r88737982
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
 ---
@@ -133,12 +134,24 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 registerTableInternal(name, tableTable)
   case e: StreamTableEnvironment =>
 val sTableTable = new TransStreamTable(table.getRelNode, true)
-tables.add(name, sTableTable)
+schema.addTable(name, sTableTable)
 }
 
   }
 
   /**
+* Unregisters a [[Table]] in the TableEnvironment's catalog.
+* Unregistered tables cannot be referenced in SQL queries anymore.
+*
+* @param name The name under which the table is registered.
+*/
+  def unregisterTable(name: String): Unit = {
--- End diff --

Do we want to return a boolean success value or throw an exception if the 
table does not exist?
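
For illustration, the two options side by side on a hypothetical catalog class 
(not the PR's code), just to make the trade-off concrete:

{code}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical catalog contrasting the two API styles discussed above.
public class TableCatalog {

    private final Map<String, Object> tables = new ConcurrentHashMap<>();

    // Option 1: return a boolean and let the caller decide whether a missing
    // table is an error.
    public boolean tryUnregister(String name) {
        return tables.remove(name) != null;
    }

    // Option 2: throw, so that typos in table names fail loudly.
    public void unregister(String name) {
        if (tables.remove(name) == null) {
            throw new IllegalArgumentException("Table '" + name + "' is not registered.");
        }
    }
}
{code}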


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4288) Make it possible to unregister tables

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15677735#comment-15677735
 ] 

ASF GitHub Bot commented on FLINK-4288:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2511#discussion_r88737982
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
 ---
@@ -133,12 +134,24 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 registerTableInternal(name, tableTable)
   case e: StreamTableEnvironment =>
 val sTableTable = new TransStreamTable(table.getRelNode, true)
-tables.add(name, sTableTable)
+schema.addTable(name, sTableTable)
 }
 
   }
 
   /**
+* Unregisters a [[Table]] in the TableEnvironment's catalog.
+* Unregistered tables cannot be referenced in SQL queries anymore.
+*
+* @param name The name under which the table is registered.
+*/
+  def unregisterTable(name: String): Unit = {
--- End diff --

Do we want to return a boolean success value or throw an exception if the 
table does not exist?


> Make it possible to unregister tables
> -
>
> Key: FLINK-4288
> URL: https://issues.apache.org/jira/browse/FLINK-4288
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Table names can not be changed yet. After registration you can not modify the 
> table behind a table name. Maybe this behavior is too restrictive.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4288) Make it possible to unregister tables

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15677736#comment-15677736
 ] 

ASF GitHub Bot commented on FLINK-4288:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2511#discussion_r88737674
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkSchema.scala
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table
+
+import java.util
+
+import org.apache.calcite.jdbc.CalciteSchema
+import org.apache.calcite.jdbc.CalciteSchema.TableEntry
+import org.apache.calcite.schema
+import org.apache.calcite.schema.SchemaPlus
+
+/**
+  * Wraps [[CalciteSchema]] and allows for deleting tables.
+  */
+class FlinkSchema {
--- End diff --

I think the benefit of this wrapper is rather limited.
Couldn't we just call `schema.tableMap.remove()` in `TableEnvironment`?


> Make it possible to unregister tables
> -
>
> Key: FLINK-4288
> URL: https://issues.apache.org/jira/browse/FLINK-4288
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Table names can not be changed yet. After registration you can not modify the 
> table behind a table name. Maybe this behavior is too restrictive.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2511: [FLINK-4288] [table] Make it possible to unregiste...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2511#discussion_r88737974
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
 ---
@@ -133,12 +134,24 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 registerTableInternal(name, tableTable)
   case e: StreamTableEnvironment =>
 val sTableTable = new TransStreamTable(table.getRelNode, true)
-tables.add(name, sTableTable)
+schema.addTable(name, sTableTable)
 }
 
   }
 
   /**
+* Unregisters a [[Table]] in the TableEnvironment's catalog.
+* Unregistered tables cannot be referenced in SQL queries anymore.
+*
+* @param name The name under which the table is registered.
+*/
+  def unregisterTable(name: String): Unit = {
+
+checkValidTableName(name)
--- End diff --

Do we need to check if the name is valid? Couldn't we just try to delete it?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2511: [FLINK-4288] [table] Make it possible to unregiste...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2511#discussion_r88737674
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkSchema.scala
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table
+
+import java.util
+
+import org.apache.calcite.jdbc.CalciteSchema
+import org.apache.calcite.jdbc.CalciteSchema.TableEntry
+import org.apache.calcite.schema
+import org.apache.calcite.schema.SchemaPlus
+
+/**
+  * Wraps [[CalciteSchema]] and allows for deleting tables.
+  */
+class FlinkSchema {
--- End diff --

I think the benefit of this wrapper is rather limited.
Couldn't we just call `schema.tableMap.remove()` in `TableEnvironment`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4294) Allow access of composite type fields

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15677686#comment-15677686
 ] 

ASF GitHub Bot commented on FLINK-4294:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2319#discussion_r88683102
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/composite.scala
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.UnresolvedException
+import org.apache.flink.api.table.validate.{ExprValidationResult, 
ValidationFailure, ValidationSuccess}
+
+
+case class Flattening(child: Expression) extends UnaryExpression {
+
+  override def toString = s"$child.flatten()"
+
+  override private[flink] def resultType: TypeInformation[_] =
+throw UnresolvedException(s"Invalid call to on ${this.getClass}.")
--- End diff --

Please check exception message.


> Allow access of composite type fields
> -
>
> Key: FLINK-4294
> URL: https://issues.apache.org/jira/browse/FLINK-4294
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Currently all Flink CompositeTypes are treated as GenericRelDataTypes. It 
> would be better to access individual fields of composite types, too. e.g.
> {code}
> SELECT composite.name FROM composites
> SELECT tuple.f0 FROM tuples
> 'f0.getField(0)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4294) Allow access of composite type fields

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15677690#comment-15677690
 ] 

ASF GitHub Bot commented on FLINK-4294:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2319#discussion_r88682935
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
 ---
@@ -324,10 +335,19 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
 case _ ~ _ ~ operand ~ _ ~ unit ~ _ => TemporalCeil(unit, operand)
   }
 
+  lazy val prefixGet: PackratParser[Expression] =
+GET ~ "(" ~ composite ~ ","  ~ literalExpr ~ ")" ^^ {
+  case _ ~ _ ~ e ~ _ ~ index ~ _ =>
+GetCompositeField(e, index.asInstanceOf[Literal].value)
+  }
+
+  lazy val prefixFlattening: PackratParser[Expression] =
+GET ~ "(" ~> composite <~ ")" ^^ { e => Flattening(e) }
--- End diff --

`GET` -> `FLATTEN`?
Please add a test case for flatten in prefix notation.


> Allow access of composite type fields
> -
>
> Key: FLINK-4294
> URL: https://issues.apache.org/jira/browse/FLINK-4294
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Currently all Flink CompositeTypes are treated as GenericRelDataTypes. It 
> would be better to access individual fields of composite types, too. e.g.
> {code}
> SELECT composite.name FROM composites
> SELECT tuple.f0 FROM tuples
> 'f0.getField(0)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4294) Allow access of composite type fields

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15677684#comment-15677684
 ] 

ASF GitHub Bot commented on FLINK-4294:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2319#discussion_r88682137
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
 ---
@@ -79,14 +83,27 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) 
extends JavaTypeFactoryImp
   }
 
   private def createAdvancedType(typeInfo: TypeInformation[_]): 
RelDataType = typeInfo match {
-// TODO add specific RelDataTypes
-// for PrimitiveArrayTypeInfo, ObjectArrayTypeInfo, CompositeType
+case ct: CompositeType[_] =>
+  new CompositeRelDataType(ct, this)
+
+// TODO add specific RelDataTypes for PrimitiveArrayTypeInfo, 
ObjectArrayTypeInfo
 case ti: TypeInformation[_] =>
   new GenericRelDataType(typeInfo, 
getTypeSystem.asInstanceOf[FlinkTypeSystem])
 
 case ti@_ =>
   throw TableException(s"Unsupported type information: $ti")
   }
+
+  override def createTypeWithNullability(
+  relDataType: RelDataType,
+  nullable: Boolean)
+: RelDataType = relDataType match {
+case composite: CompositeRelDataType =>
+  // at the moment we do not care about nullability
--- End diff --

I see, thanks!


> Allow access of composite type fields
> -
>
> Key: FLINK-4294
> URL: https://issues.apache.org/jira/browse/FLINK-4294
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Currently all Flink CompositeTypes are treated as GenericRelDataTypes. It 
> would be better to access individual fields of composite types, too. e.g.
> {code}
> SELECT composite.name FROM composites
> SELECT tuple.f0 FROM tuples
> 'f0.getField(0)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4294) Allow access of composite type fields

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15677687#comment-15677687
 ] 

ASF GitHub Bot commented on FLINK-4294:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2319#discussion_r88679280
  
--- Diff: docs/dev/table_api.md ---
@@ -1656,6 +1656,29 @@ temporalOverlaps(TIMEPOINT, TEMPORAL, TIMEPOINT, 
TEMPORAL)
   
 
 
+
+  
+{% highlight java %}
+ANY.flatten()
+{% endhighlight %}
+  
+  
+Converts a Flink composite type (such as Tuple, POJO, etc.) and 
all of its subtypes into a flat representation where every subtype is a 
separate field.
--- End diff --

How are the fields named?


> Allow access of composite type fields
> -
>
> Key: FLINK-4294
> URL: https://issues.apache.org/jira/browse/FLINK-4294
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Currently all Flink CompositeTypes are treated as GenericRelDataTypes. It 
> would be better to access individual fields of composite types, too. e.g.
> {code}
> SELECT composite.name FROM composites
> SELECT tuple.f0 FROM tuples
> 'f0.getField(0)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4294) Allow access of composite type fields

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15677688#comment-15677688
 ] 

ASF GitHub Bot commented on FLINK-4294:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2319#discussion_r88699109
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
 ---
@@ -74,15 +75,52 @@ object RexNodeTranslator {
 
   /**
 * Parses all input expressions to [[UnresolvedAlias]].
-* And expands star to parent's full project list.
+* And expands star to parent's full project list and flattens 
composite types.
 */
-  def expandProjectList(exprs: Seq[Expression], parent: LogicalNode): 
Seq[NamedExpression] = {
+  def expandProjectList(
+  exprs: Seq[Expression],
+  parent: LogicalNode,
+  tableEnv: TableEnvironment)
+: Seq[NamedExpression] = {
+
 val projectList = new ListBuffer[NamedExpression]
 exprs.foreach {
+
   case n: UnresolvedFieldReference if n.name == "*" =>
 projectList ++= parent.output.map(UnresolvedAlias(_))
+
+  // flattening can only applied on field references
+  case Flattening(composite) if
--- End diff --

Add a case for `Flattening(_)` that catches Flattenings on non-field 
expressions and throw a `ValidationException`?


> Allow access of composite type fields
> -
>
> Key: FLINK-4294
> URL: https://issues.apache.org/jira/browse/FLINK-4294
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Currently all Flink CompositeTypes are treated as GenericRelDataTypes. It 
> would be better to access individual fields of composite types, too. e.g.
> {code}
> SELECT composite.name FROM composites
> SELECT tuple.f0 FROM tuples
> 'f0.getField(0)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4294) Allow access of composite type fields

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15677685#comment-15677685
 ] 

ASF GitHub Bot commented on FLINK-4294:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2319#discussion_r88679626
  
--- Diff: docs/dev/table_api.md ---
@@ -1656,6 +1656,29 @@ temporalOverlaps(TIMEPOINT, TEMPORAL, TIMEPOINT, 
TEMPORAL)
   
 
 
+
+  
+{% highlight java %}
+ANY.flatten()
+{% endhighlight %}
+  
+  
+Converts a Flink composite type (such as Tuple, POJO, etc.) and 
all of its subtypes into a flat representation where every subtype is a 
separate field.
+  
+
+
+
+  
+{% highlight java %}
+COMPOSITE.get(STRING)
--- End diff --

OK, I agree `.get()` has benefits too.


> Allow access of composite type fields
> -
>
> Key: FLINK-4294
> URL: https://issues.apache.org/jira/browse/FLINK-4294
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Currently all Flink CompositeTypes are treated as GenericRelDataTypes. It 
> would be better to access individual fields of composite types, too. e.g.
> {code}
> SELECT composite.name FROM composites
> SELECT tuple.f0 FROM tuples
> 'f0.getField(0)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4294) Allow access of composite type fields

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15677689#comment-15677689
 ] 

ASF GitHub Bot commented on FLINK-4294:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2319#discussion_r88684517
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/composite.scala
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.UnresolvedException
+import org.apache.flink.api.table.validate.{ExprValidationResult, 
ValidationFailure, ValidationSuccess}
+
+
+case class Flattening(child: Expression) extends UnaryExpression {
+
+  override def toString = s"$child.flatten()"
+
+  override private[flink] def resultType: TypeInformation[_] =
+throw UnresolvedException(s"Invalid call to on ${this.getClass}.")
+
+  override private[flink] def validateInput(): ExprValidationResult =
+ValidationFailure(s"Unresolved flattening of $child")
+}
+
+case class GetCompositeField(child: Expression, key: Any) extends 
UnaryExpression {
+
+  private var fieldIndex: Option[Int] = None
+
+  override def toString = s"$child.get($key)"
+
+  override private[flink] def validateInput(): ExprValidationResult = {
--- End diff --

Don't we need to recursively check if `child` is valid as well? Could be 
the result of an expression, right?


> Allow access of composite type fields
> -
>
> Key: FLINK-4294
> URL: https://issues.apache.org/jira/browse/FLINK-4294
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Currently all Flink CompositeTypes are treated as GenericRelDataTypes. It 
> would be better to access individual fields of composite types, too. e.g.
> {code}
> SELECT composite.name FROM composites
> SELECT tuple.f0 FROM tuples
> 'f0.getField(0)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4294) Allow access of composite type fields

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15677691#comment-15677691
 ] 

ASF GitHub Bot commented on FLINK-4294:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2319#discussion_r88699648
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/composite.scala
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.UnresolvedException
+import org.apache.flink.api.table.validate.{ExprValidationResult, 
ValidationFailure, ValidationSuccess}
+
+
+case class Flattening(child: Expression) extends UnaryExpression {
+
+  override def toString = s"$child.flatten()"
+
+  override private[flink] def resultType: TypeInformation[_] =
+throw UnresolvedException(s"Invalid call to on ${this.getClass}.")
+
+  override private[flink] def validateInput(): ExprValidationResult =
+ValidationFailure(s"Unresolved flattening of $child")
--- End diff --

Add a comment that `Flattening` is converted into `GetCompositeFields` 
before validation?


> Allow access of composite type fields
> -
>
> Key: FLINK-4294
> URL: https://issues.apache.org/jira/browse/FLINK-4294
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Currently all Flink CompositeTypes are treated as GenericRelDataTypes. It 
> would be better to access individual fields of composite types, too. e.g.
> {code}
> SELECT composite.name FROM composites
> SELECT tuple.f0 FROM tuples
> 'f0.getField(0)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2319: [FLINK-4294] [table] Allow access of composite typ...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2319#discussion_r88679280
  
--- Diff: docs/dev/table_api.md ---
@@ -1656,6 +1656,29 @@ temporalOverlaps(TIMEPOINT, TEMPORAL, TIMEPOINT, 
TEMPORAL)
   
 
 
+
+  
+{% highlight java %}
+ANY.flatten()
+{% endhighlight %}
+  
+  
+Converts a Flink composite type (such as Tuple, POJO, etc.) and 
all of its subtypes into a flat representation where every subtype is a 
separate field.
--- End diff --

How are the fields named?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2319: [FLINK-4294] [table] Allow access of composite typ...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2319#discussion_r88684517
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/composite.scala
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.UnresolvedException
+import org.apache.flink.api.table.validate.{ExprValidationResult, 
ValidationFailure, ValidationSuccess}
+
+
+case class Flattening(child: Expression) extends UnaryExpression {
+
+  override def toString = s"$child.flatten()"
+
+  override private[flink] def resultType: TypeInformation[_] =
+throw UnresolvedException(s"Invalid call to on ${this.getClass}.")
+
+  override private[flink] def validateInput(): ExprValidationResult =
+ValidationFailure(s"Unresolved flattening of $child")
+}
+
+case class GetCompositeField(child: Expression, key: Any) extends 
UnaryExpression {
+
+  private var fieldIndex: Option[Int] = None
+
+  override def toString = s"$child.get($key)"
+
+  override private[flink] def validateInput(): ExprValidationResult = {
--- End diff --

Don't we need to recursively check if `child` is valid as well? Could be 
the result of an expression, right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2319: [FLINK-4294] [table] Allow access of composite typ...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2319#discussion_r88679626
  
--- Diff: docs/dev/table_api.md ---
@@ -1656,6 +1656,29 @@ temporalOverlaps(TIMEPOINT, TEMPORAL, TIMEPOINT, 
TEMPORAL)
   
 
 
+
+  
+{% highlight java %}
+ANY.flatten()
+{% endhighlight %}
+  
+  
+Converts a Flink composite type (such as Tuple, POJO, etc.) and 
all of its subtypes into a flat representation where every subtype is a 
separate field.
+  
+
+
+
+  
+{% highlight java %}
+COMPOSITE.get(STRING)
--- End diff --

OK, I agree `.get()` has benefits too.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2319: [FLINK-4294] [table] Allow access of composite typ...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2319#discussion_r88683102
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/composite.scala
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.UnresolvedException
+import org.apache.flink.api.table.validate.{ExprValidationResult, 
ValidationFailure, ValidationSuccess}
+
+
+case class Flattening(child: Expression) extends UnaryExpression {
+
+  override def toString = s"$child.flatten()"
+
+  override private[flink] def resultType: TypeInformation[_] =
+throw UnresolvedException(s"Invalid call to on ${this.getClass}.")
--- End diff --

Please check exception message.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2319: [FLINK-4294] [table] Allow access of composite typ...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2319#discussion_r88699109
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
 ---
@@ -74,15 +75,52 @@ object RexNodeTranslator {
 
   /**
 * Parses all input expressions to [[UnresolvedAlias]].
-* And expands star to parent's full project list.
+* And expands star to parent's full project list and flattens 
composite types.
 */
-  def expandProjectList(exprs: Seq[Expression], parent: LogicalNode): 
Seq[NamedExpression] = {
+  def expandProjectList(
+  exprs: Seq[Expression],
+  parent: LogicalNode,
+  tableEnv: TableEnvironment)
+: Seq[NamedExpression] = {
+
 val projectList = new ListBuffer[NamedExpression]
 exprs.foreach {
+
   case n: UnresolvedFieldReference if n.name == "*" =>
 projectList ++= parent.output.map(UnresolvedAlias(_))
+
+  // flattening can only applied on field references
+  case Flattening(composite) if
--- End diff --

Add a case for `Flattening(_)` that catches Flattenings on non-field 
expressions and throw a `ValidationException`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2319: [FLINK-4294] [table] Allow access of composite typ...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2319#discussion_r88682935
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
 ---
@@ -324,10 +335,19 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
 case _ ~ _ ~ operand ~ _ ~ unit ~ _ => TemporalCeil(unit, operand)
   }
 
+  lazy val prefixGet: PackratParser[Expression] =
+GET ~ "(" ~ composite ~ ","  ~ literalExpr ~ ")" ^^ {
+  case _ ~ _ ~ e ~ _ ~ index ~ _ =>
+GetCompositeField(e, index.asInstanceOf[Literal].value)
+  }
+
+  lazy val prefixFlattening: PackratParser[Expression] =
+GET ~ "(" ~> composite <~ ")" ^^ { e => Flattening(e) }
--- End diff --

`GET` -> `FLATTEN`?
Please add a test case for flatten in prefix notation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2319: [FLINK-4294] [table] Allow access of composite typ...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2319#discussion_r88682137
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
 ---
@@ -79,14 +83,27 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) 
extends JavaTypeFactoryImp
   }
 
   private def createAdvancedType(typeInfo: TypeInformation[_]): 
RelDataType = typeInfo match {
-// TODO add specific RelDataTypes
-// for PrimitiveArrayTypeInfo, ObjectArrayTypeInfo, CompositeType
+case ct: CompositeType[_] =>
+  new CompositeRelDataType(ct, this)
+
+// TODO add specific RelDataTypes for PrimitiveArrayTypeInfo, 
ObjectArrayTypeInfo
 case ti: TypeInformation[_] =>
   new GenericRelDataType(typeInfo, 
getTypeSystem.asInstanceOf[FlinkTypeSystem])
 
 case ti@_ =>
   throw TableException(s"Unsupported type information: $ti")
   }
+
+  override def createTypeWithNullability(
+  relDataType: RelDataType,
+  nullable: Boolean)
+: RelDataType = relDataType match {
+case composite: CompositeRelDataType =>
+  // at the moment we do not care about nullability
--- End diff --

I see, thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2319: [FLINK-4294] [table] Allow access of composite typ...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2319#discussion_r88699648
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/composite.scala
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.UnresolvedException
+import org.apache.flink.api.table.validate.{ExprValidationResult, 
ValidationFailure, ValidationSuccess}
+
+
+case class Flattening(child: Expression) extends UnaryExpression {
+
+  override def toString = s"$child.flatten()"
+
+  override private[flink] def resultType: TypeInformation[_] =
+throw UnresolvedException(s"Invalid call to on ${this.getClass}.")
+
+  override private[flink] def validateInput(): ExprValidationResult =
+ValidationFailure(s"Unresolved flattening of $child")
--- End diff --

Add a comment that `Flattening` is converted into `GetCompositeFields` 
before validation?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1914) Wrong FS while starting YARN session without correct HADOOP_HOME

2016-11-18 Thread Malte Schwarzer (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15677519#comment-15677519
 ] 

Malte Schwarzer commented on FLINK-1914:


I'm having the same issue with Flink 1.1.3 and Hadoop 2.7.3, when I try to run 
a Flink job on YARN. HADOOP_HOME is not set.

{code}flink/bin/flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 4096 
flink/examples/batch/WordCount.jar hdfs://power1:55000/info.txt
{code}

{code}
2016-11-18 20:06:48,987 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner  
   - Setting up resources for TaskManagers
2016-11-18 20:06:49,989 ERROR org.apache.flink.yarn.YarnApplicationMasterRunner 
- YARN Application Master initialization failed
java.lang.IllegalArgumentException: Wrong FS: 
file:/home/hadoop/.flink/application_1479495922304_0001/flink-dist_2.11-1.1.3.jar,
 expected: hdfs://ibm-power-1.dima.tu-berlin.de:55000
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:643)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:191)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:102)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1124)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1120)
at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1120)
at org.apache.flink.yarn.Utils.registerLocalResource(Utils.java:135)
at 
org.apache.flink.yarn.YarnApplicationMasterRunner.createTaskManagerContext(YarnApplicationMasterRunner.java:543)
at 
org.apache.flink.yarn.YarnApplicationMasterRunner.runApplicationMaster(YarnApplicationMasterRunner.java:261)
at 
org.apache.flink.yarn.YarnApplicationMasterRunner$1.run(YarnApplicationMasterRunner.java:153)
at 
org.apache.flink.yarn.YarnApplicationMasterRunner$1.run(YarnApplicationMasterRunner.java:150)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:360)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1536)
at 
org.apache.flink.yarn.YarnApplicationMasterRunner.run(YarnApplicationMasterRunner.java:150)
at 
org.apache.flink.yarn.YarnApplicationMasterRunner.main(YarnApplicationMasterRunner.java:112)
{code}

> Wrong FS while starting YARN session without correct HADOOP_HOME
> 
>
> Key: FLINK-1914
> URL: https://issues.apache.org/jira/browse/FLINK-1914
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Reporter: Zoltán Zvara
>Priority: Trivial
>  Labels: yarn, yarn-client
>
> When a YARN session is invoked ({{yarn-session.sh}}) without a correct 
> {{HADOOP_HOME}}, the AM is still deployed (for example to {{0.0.0.0:8032}}), but 
> the deployed AM fails with an {{IllegalArgumentException}}:
> {code}
> java.lang.IllegalArgumentException: Wrong FS: 
> file:/home/.../flink-dist-0.9-SNAPSHOT.jar, expected: hdfs://localhost:9000
>   at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:642)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:181)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:92)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1106)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1102)
>   at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1102)
>   at org.apache.flink.yarn.Utils.registerLocalResource(Utils.java:105)
>   at 
> org.apache.flink.yarn.ApplicationMasterActor$$anonfun$org$apache$flink$yarn$ApplicationMasterActor$$startYarnSession$2.apply(ApplicationMasterActor.scala:436)
>   at 
> org.apache.flink.yarn.ApplicationMasterActor$$anonfun$org$apache$flink$yarn$ApplicationMasterActor$$startYarnSession$2.apply(ApplicationMasterActor.scala:371)
>   at scala.util.Try$.apply(Try.scala:161)
>   at 
> org.apache.flink.yarn.ApplicationMasterActor$class.org$apache$flink$yarn$ApplicationMasterActor$$startYarnSession(ApplicationMasterActor.scala:371)
>   at 
> org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:155)
>   at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>   at 
> org.apache.flink.

[jira] [Assigned] (FLINK-5101) Test CassandraConnectorITCase instable

2016-11-18 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-5101:
---

Assignee: Chesnay Schepler

> Test CassandraConnectorITCase instable
> --
>
> Key: FLINK-5101
> URL: https://issues.apache.org/jira/browse/FLINK-5101
> Project: Flink
>  Issue Type: Bug
>  Components: Cassandra Connector
>Reporter: Stefan Richter
>Assignee: Chesnay Schepler
>
> I observed this test fail on Travis (very rarely):
>  
>  Running 
> org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase
> Tests run: 7, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 80.843 sec 
> <<< FAILURE! - in 
> org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase
> testCassandraBatchFormats(org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase)
>   Time elapsed: 5.82 sec  <<< FAILURE!
> java.lang.AssertionError: expected:<40> but was:<20>
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:834)
>   at org.junit.Assert.assertEquals(Assert.java:645)
>   at org.junit.Assert.assertEquals(Assert.java:631)
>   at 
> org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase.testCassandraBatchFormats(CassandraConnectorITCase.java:442)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4936) Operator names for Gelly inputs

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15677434#comment-15677434
 ] 

ASF GitHub Bot commented on FLINK-4936:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/2832

[FLINK-4936] [gelly] Operator names for Gelly inputs

Provide descriptive operator names for Graph and GraphCsvReader.
Condense multiple type conversion maps into a single mapper.
Reuse objects in operations wrapping user-defined-functions.

Travis CI tests are currently broken due to an SSL certificate error at 
https://repo.maven.apache.org/maven2

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
4936_operator_names_for_gelly_inputs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2832.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2832


commit 3971d1414d4c4dcf372a7e117d16956aa8a9cce2
Author: Greg Hogan 
Date:   2016-10-26T19:18:50Z

[FLINK-4936] [gelly] Operator names for Gelly inputs

Provide descriptive operator names for Graph and GraphCsvReader.
Condense multiple type conversion maps into a single mapper.
Reuse objects in operations wrapping user-defined-functions.




> Operator names for Gelly inputs
> ---
>
> Key: FLINK-4936
> URL: https://issues.apache.org/jira/browse/FLINK-4936
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> Provide descriptive operator names for Gelly's {{Graph}} and 
> {{GraphCsvReader}}. Also, condense multiple type conversion maps into a 
> single mapper.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2832: [FLINK-4936] [gelly] Operator names for Gelly inpu...

2016-11-18 Thread greghogan
GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/2832

[FLINK-4936] [gelly] Operator names for Gelly inputs

Provide descriptive operator names for Graph and GraphCsvReader.
Condense multiple type conversion maps into a single mapper.
Reuse objects in operations wrapping user-defined-functions.

Travis CI tests are currently broken due to an SSL certificate error at 
https://repo.maven.apache.org/maven2

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
4936_operator_names_for_gelly_inputs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2832.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2832


commit 3971d1414d4c4dcf372a7e117d16956aa8a9cce2
Author: Greg Hogan 
Date:   2016-10-26T19:18:50Z

[FLINK-4936] [gelly] Operator names for Gelly inputs

Provide descriptive operator names for Graph and GraphCsvReader.
Condense multiple type conversion maps into a single mapper.
Reuse objects in operations wrapping user-defined-functions.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5002) Lack of synchronization in LocalBufferPool#getNumberOfUsedBuffers

2016-11-18 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15677332#comment-15677332
 ] 

Stephan Ewen commented on FLINK-5002:
-

The link is referring to opening a pull request. Can you post the proper commit 
link?

> Lack of synchronization in LocalBufferPool#getNumberOfUsedBuffers
> -
>
> Key: FLINK-5002
> URL: https://issues.apache.org/jira/browse/FLINK-5002
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Roman Maier
>Priority: Minor
>  Labels: easyfix, starter
>
> {code}
>   public int getNumberOfUsedBuffers() {
> return numberOfRequestedMemorySegments - availableMemorySegments.size();
>   }
> {code}
> Access to availableMemorySegments should be protected with proper 
> synchronization as other methods do.
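
For illustration only, a sketch of the intended fix (this is not the actual LocalBufferPool code, just the shape of guarding the derived count with the same lock the mutators use):

{code}
// Illustrative Scala sketch, not the real LocalBufferPool: the derived count
// reads the queue under the same monitor that the mutating methods use.
import java.util.ArrayDeque

class BufferPoolSketch {
  private val availableMemorySegments = new ArrayDeque[AnyRef]()
  private var numberOfRequestedMemorySegments = 0

  def requestMemorySegment(): AnyRef = availableMemorySegments.synchronized {
    numberOfRequestedMemorySegments += 1
    new Object() // placeholder for an actual memory segment
  }

  def recycle(segment: AnyRef): Unit = availableMemorySegments.synchronized {
    availableMemorySegments.add(segment)
  }

  // previously an unsynchronized read; now guarded like the other methods
  def getNumberOfUsedBuffers: Int = availableMemorySegments.synchronized {
    numberOfRequestedMemorySegments - availableMemorySegments.size()
  }
}
{code}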



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5101) Test CassandraConnectorITCase instable

2016-11-18 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-5101:
-

 Summary: Test CassandraConnectorITCase instable
 Key: FLINK-5101
 URL: https://issues.apache.org/jira/browse/FLINK-5101
 Project: Flink
  Issue Type: Bug
  Components: Cassandra Connector
Reporter: Stefan Richter


I observed this test fail on Travis (very rarely):
 
 Running 
org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase


Tests run: 7, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 80.843 sec <<< 
FAILURE! - in 
org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase
testCassandraBatchFormats(org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase)
  Time elapsed: 5.82 sec  <<< FAILURE!
java.lang.AssertionError: expected:<40> but was:<20>
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:834)
at org.junit.Assert.assertEquals(Assert.java:645)
at org.junit.Assert.assertEquals(Assert.java:631)
at 
org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase.testCassandraBatchFormats(CassandraConnectorITCase.java:442)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5100) Test testZooKeeperReelection is instable

2016-11-18 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-5100:
-

 Summary: Test testZooKeeperReelection is instable
 Key: FLINK-5100
 URL: https://issues.apache.org/jira/browse/FLINK-5100
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination
Reporter: Stefan Richter


I observed this test failing (very rarely) on Travis:
 
testZooKeeperReelection(org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest)
  Time elapsed: 303.321 sec  <<< FAILURE!
java.lang.AssertionError: null
at org.junit.Assert.fail(Assert.java:86)
at org.junit.Assert.assertTrue(Assert.java:41)
at org.junit.Assert.assertFalse(Assert.java:64)
at org.junit.Assert.assertFalse(Assert.java:74)
at 
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest.testZooKeeperReelection(ZooKeeperLeaderElectionTest.java:197)


Results :

Failed tests: 
  ZooKeeperLeaderElectionTest.testZooKeeperReelection:197 null



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5099) Test testCancelPartitionRequest is instable

2016-11-18 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-5099:
-

 Summary: Test testCancelPartitionRequest is instable
 Key: FLINK-5099
 URL: https://issues.apache.org/jira/browse/FLINK-5099
 Project: Flink
  Issue Type: Bug
  Components: Network
Reporter: Stefan Richter


I observed this test fail on Travis (very rarely):

testCancelPartitionRequest(org.apache.flink.runtime.io.network.netty.CancelPartitionRequestTest)
  Time elapsed: 168.756 sec  <<< ERROR!
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2367)
at 
java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
at 
java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
at 
java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
at java.lang.StringBuilder.append(StringBuilder.java:132)
at 
org.apache.flink.runtime.io.network.netty.CancelPartitionRequestTest.testCancelPartitionRequest(CancelPartitionRequestTest.java:94)

Results :

Tests in error: 
  CancelPartitionRequestTest.testCancelPartitionRequest:94 » OutOfMemory Java 
he...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3703) Add sequence matching semantics to discard matched events

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15677048#comment-15677048
 ] 

ASF GitHub Bot commented on FLINK-3703:
---

Github user LordFB commented on the issue:

https://github.com/apache/flink/pull/2367
  
Hi @mushketyk,

Too bad; with this feature missing it is something of a dealbreaker for Flink in my 
use case.

Yeah, it would be great if Till could take some action on this and the 
related PRs.


> Add sequence matching semantics to discard matched events
> -
>
> Key: FLINK-3703
> URL: https://issues.apache.org/jira/browse/FLINK-3703
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.0.0, 1.1.0
>Reporter: Till Rohrmann
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> There is no easy way to decide whether events can be part of multiple 
> matching sequences or not. Currently, the default is that an event can 
> participate in multiple matching sequences. E.g. if you have the pattern 
> {{Pattern.begin("a").followedBy("b")}} and the input event stream 
> {{Event("A"), Event("B"), Event("C")}}, then you will generate the following 
> matching sequences: {{Event("A"), Event("B")}}, {{Event("A"), Event("C")}} 
> and {{Event("B"), Event("C")}}. 
> It would be useful to allow the user to define where the matching algorithm 
> should continue after a matching sequence has been found. Possible option 
> values could be 
>  * {{from first}} - continue keeping all events for future matches (that is 
> the current behaviour) 
>  * {{after first}} -  continue after the first element (remove first matching 
> event and continue with the second event)
>  * {{after last}} - continue after the last element (effectively discarding 
> all elements of the matching sequence)
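
For illustration, a toy sketch of the three options on the two-step pattern from the example above (this is not the CEP implementation, just a model of the intended semantics):

{code}
// Toy model of the continuation strategies for a two-step pattern over an ordered
// event list; purely illustrative, not the flink-cep library.
object MatchContinuationSketch {
  sealed trait Strategy
  case object FromFirst  extends Strategy // keep all events (current behaviour)
  case object AfterFirst extends Strategy // drop the first event of each match
  case object AfterLast  extends Strategy // drop all events of each match

  def matches(events: List[String], s: Strategy): List[(String, String)] = events match {
    case a :: rest if rest.nonEmpty =>
      s match {
        case FromFirst  => rest.map(b => (a, b)) ::: matches(rest, s)
        case AfterFirst => (a, rest.head) :: matches(rest, s)
        case AfterLast  => (a, rest.head) :: matches(rest.tail, s)
      }
    case _ => Nil
  }

  def main(args: Array[String]): Unit = {
    val events = List("A", "B", "C")
    println(matches(events, FromFirst))  // List((A,B), (A,C), (B,C))
    println(matches(events, AfterFirst)) // List((A,B), (B,C))
    println(matches(events, AfterLast))  // List((A,B))
  }
}
{code}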



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2367: [FLINK-3703][cep] Add sequence matching semantics to disc...

2016-11-18 Thread LordFB
Github user LordFB commented on the issue:

https://github.com/apache/flink/pull/2367
  
Hi @mushketyk,

Too bad; with this feature missing it is something of a dealbreaker for Flink in my 
use case.

Yeah, it would be great if Till could take some action on this and the 
related PRs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2821) Change Akka configuration to allow accessing actors from different URLs

2016-11-18 Thread Philipp von dem Bussche (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15677009#comment-15677009
 ] 

Philipp von dem Bussche commented on FLINK-2821:


[~StephanEwen] I am not quite sure this is going to work. The orchestration 
framework I am using (Rancher) exposes a 10.x IP address which is not available on 
the host itself (the host only has the 172.x address from Docker). What I have seen 
with binding previously is that when the host binds to 172.x, it rejects requests 
against the 10.x address. If we expect that this won't happen when binding to 
0.0.0.0, then I am fine with the change :) 
If this is too theoretical, I am more than happy to do more testing if 
[~mxm] wants to make the change.

> Change Akka configuration to allow accessing actors from different URLs
> ---
>
> Key: FLINK-2821
> URL: https://issues.apache.org/jira/browse/FLINK-2821
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Reporter: Robert Metzger
>Assignee: Maximilian Michels
>
> Akka expects the actor's URL to be exactly matching.
> As pointed out here, cases where users were complaining about this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-trying-to-access-JM-through-proxy-td3018.html
>   - Proxy routing (as described here, send to the proxy URL, receiver 
> recognizes only original URL)
>   - Using hostname / IP interchangeably does not work (we solved this by 
> always putting IP addresses into URLs, never hostnames)
>   - Binding to multiple interfaces (any local 0.0.0.0) does not work. Still 
> no solution to that (but seems not too much of a restriction)
> I am aware that this is not possible due to Akka, so it is actually not a 
> Flink bug. But I think we should track the resolution of the issue here 
> anyways because its affecting our user's satisfaction.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4679) Add TumbleRow row-windows for streaming tables

2016-11-18 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676958#comment-15676958
 ] 

Fabian Hueske commented on FLINK-4679:
--

Hi [~jark], I think this approach could work for row windows which are 
count-based. Event-time count-windows would additionally require sorting the 
stream on event time before the window operation is applied.

For event-time time-window row-windows, we would need something more complex. I 
was thinking of a custom operator that collects records in a priority queue 
ordered by timestamp. Once a watermark is received for the upper bound of a 
window (the bound can be defined with preceding and following time, but initially we 
should start with preceding only, IMO), the priority queue is used to evaluate 
the window function and to purge records that are too old.
An event-time count-window row-window could use the same infrastructure (the 
priority queue would take care of the sorting) and use different logic to evaluate 
windows and drop records (based on count rather than time).

For processing-time windows, we could use a simple queue and evaluate it every time 
a new record is added. A processing-time count-window could be implemented similarly. 

So it might make sense to have everything implemented as custom operators, as 
it seems that we could reuse some code parts.
What do you think [~jark], [~twalthr]?
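
To make the buffering idea above a bit more concrete, here is a self-contained sketch of that logic (illustrative only, not a Flink operator; `evaluate` stands in for the window function and only a preceding bound is modelled):

{code}
import scala.collection.mutable

// Illustrative sketch of the priority-queue buffering described above.
class RowWindowBufferSketch[Row](precedingMillis: Long)(evaluate: Seq[(Long, Row)] => Unit) {

  // priority queue ordered so that the oldest record sits at the head
  private val buffer =
    mutable.PriorityQueue.empty[(Long, Row)](Ordering.by[(Long, Row), Long](_._1).reverse)

  def onElement(timestamp: Long, row: Row): Unit =
    buffer.enqueue((timestamp, row))

  // a watermark closes all windows whose upper bound is at or before it
  def onWatermark(watermark: Long): Unit = {
    val ready = buffer.toList.filter(_._1 <= watermark).sortBy(_._1)
    if (ready.nonEmpty) evaluate(ready)

    // purge records that can no longer contribute to any future window
    val cutoff = watermark - precedingMillis
    val kept = buffer.toList.filter(_._1 > cutoff)
    buffer.clear()
    kept.foreach(buffer.enqueue(_))
  }
}
{code}

An event-time count-based variant could reuse the same buffer and evaluate/purge by count instead of time, as noted above.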

> Add TumbleRow row-windows for streaming tables
> --
>
> Key: FLINK-4679
> URL: https://issues.apache.org/jira/browse/FLINK-4679
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Jark Wu
>
> Add TumbleRow row-windows for streaming tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  
> This task requires to implement a custom stream operator and integrate it 
> with checkpointing and timestamp / watermark logic.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2390) Replace iteration timeout with algorithm for detecting termination

2016-11-18 Thread Paris Carbone (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Paris Carbone updated FLINK-2390:
-
Assignee: (was: Paris Carbone)

> Replace iteration timeout with algorithm for detecting termination
> --
>
> Key: FLINK-2390
> URL: https://issues.apache.org/jira/browse/FLINK-2390
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Gyula Fora
> Fix For: 1.0.0
>
>
> Currently the user can set a timeout which will shut down the iteration 
> source/sink nodes if no new data is received during that time to allow 
> program termination in iterative streaming jobs.
> This method is used due to the non-trivial nature of termination in iterative 
> streaming jobs. While termination is not a main concern in long running 
> streaming jobs, this behaviour makes iterative tests non-deterministic and 
> they often fail on travis due to the timeout. Also setting a timeout can 
> cause jobs to terminate prematurely.
> I propose to remove iteration timeouts and replace them with the following 
> algorithm for detecting termination:
> -We first identify loop edges in the jobgraph (the channels from the 
> iteration sources to the head operators)
> -Once the head operators (the ones with loop input) finish with all their 
> non-loop inputs they broadcast a marker to their outputs.
> -Each operator will broadcast a marker once it received a marker from all its 
> non-finished inputs
> -Iteration sources are terminated when they receive 2 consecutive markers 
> without receiving any record in-between
> The idea behind the algorithm is to find out when no more outputs are 
> generated from the operators inside an iteration after their normal inputs 
> are finished.
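
For illustration, a minimal sketch of the termination condition at an iteration source (it deliberately ignores the distributed marker propagation described above and only models the local check):

{code}
// Illustrative sketch: a source may terminate once it receives two consecutive
// markers with no record arriving in between.
class IterationSourceTerminationCheck {
  private var seenMarkerBefore = false
  private var sawRecordSinceLastMarker = false

  def onRecord(): Unit =
    sawRecordSinceLastMarker = true

  /** Returns true if the iteration source may terminate after this marker. */
  def onMarker(): Boolean = {
    val terminate = seenMarkerBefore && !sawRecordSinceLastMarker
    seenMarkerBefore = true
    sawRecordSinceLastMarker = false
    terminate
  }
}
{code}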



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-2390) Replace iteration timeout with algorithm for detecting termination

2016-11-18 Thread Paris Carbone (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Paris Carbone reassigned FLINK-2390:


Assignee: Paris Carbone

> Replace iteration timeout with algorithm for detecting termination
> --
>
> Key: FLINK-2390
> URL: https://issues.apache.org/jira/browse/FLINK-2390
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Gyula Fora
>Assignee: Paris Carbone
> Fix For: 1.0.0
>
>
> Currently the user can set a timeout which will shut down the iteration 
> source/sink nodes if no new data is received during that time to allow 
> program termination in iterative streaming jobs.
> This method is used due to the non-trivial nature of termination in iterative 
> streaming jobs. While termination is not a main concern in long running 
> streaming jobs, this behaviour makes iterative tests non-deterministic and 
> they often fail on travis due to the timeout. Also setting a timeout can 
> cause jobs to terminate prematurely.
> I propose to remove iteration timeouts and replace them with the following 
> algorithm for detecting termination:
> -We first identify loop edges in the jobgraph (the channels from the 
> iteration sources to the head operators)
> -Once the head operators (the ones with loop input) finish with all their 
> non-loop inputs they broadcast a marker to their outputs.
> -Each operator will broadcast a marker once it received a marker from all its 
> non-finished inputs
> -Iteration sources are terminated when they receive 2 consecutive markers 
> without receiving any record in-between
> The idea behind the algorithm is to find out when no more outputs are 
> generated from the operators inside an iteration after their normal inputs 
> are finished.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676845#comment-15676845
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88638112
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Preconditions
+
+/**
+  * For Incremental intermediate aggregate Rows, merge every row into 
aggregate buffer.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  */
+class IncrementalAggregateReduceFunction(
+private val aggregates: Array[Aggregate[_]],
+private val groupKeysMapping: Array[(Int, Int)],
+private val intermediateRowArity: Int)extends ReduceFunction[Row] {
--- End diff --

add space before `extends`


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676846#comment-15676846
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88657597
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
 ---
@@ -135,50 +128,123 @@ class DataStreamAggregate(
   namedProperties)
 
 val prepareOpName = s"prepare select: ($aggString)"
-val mappedInput = inputDS
-  .map(aggregateResult._1)
-  .name(prepareOpName)
-
-val groupReduceFunction = aggregateResult._2
-val rowTypeInfo = new RowTypeInfo(fieldTypes)
-
-val result = {
-  // grouped / keyed aggregation
-  if (groupingKeys.length > 0) {
-val aggOpName = s"groupBy: (${groupingToString(inputType, 
grouping)}), " +
-  s"window: ($window), " +
-  s"select: ($aggString)"
-val aggregateFunction =
-  createWindowAggregationFunction(window, namedProperties, 
groupReduceFunction)
-
-val keyedStream = mappedInput.keyBy(groupingKeys: _*)
-
-val windowedStream = createKeyedWindowedStream(window, keyedStream)
-  .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
-
-windowedStream
-  .apply(aggregateFunction)
-  .returns(rowTypeInfo)
-  .name(aggOpName)
-  .asInstanceOf[DataStream[Any]]
+val keyedAggOpName = s"groupBy: (${groupingToString(inputType, 
grouping)}), " +
+  s"window: ($window), " +
+  s"select: ($aggString)"
+val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"
+
+val (aggFieldIndexes, aggregates) =
+  AggregateUtil.transformToAggregateFunctions(
+namedAggregates.map(_.getKey), inputType, grouping.length)
+
+val result: DataStream[Any] = {
+  // check whether all aggregates support partial aggregate
+  if (aggregates.forall(_.supportPartial)){
+// do Incremental Aggregation
+// add grouping fields, position keys in the input, and input type
+val (mapFunction,
+reduceFunction,
+groupingOffsetMapping,
+aggOffsetMapping,
+intermediateRowArity) = 
AggregateUtil.createOperatorFunctionsForIncrementalAggregates(
--- End diff --

Can we add a separate method to `AggregateUtil` that creates the preparing 
`MapFunction`?
This code is shared by all aggregations (batch and streaming, incremental and 
non-incremental), etc. 

It would be nice to have that extracted and the mapper applied outside of this 
large condition. It would also be great if you could refactor the DataSetAggregate code 
along the way.


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676839#comment-15676839
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88659308
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
 ---
@@ -135,50 +128,123 @@ class DataStreamAggregate(
   namedProperties)
 
 val prepareOpName = s"prepare select: ($aggString)"
-val mappedInput = inputDS
-  .map(aggregateResult._1)
-  .name(prepareOpName)
-
-val groupReduceFunction = aggregateResult._2
-val rowTypeInfo = new RowTypeInfo(fieldTypes)
-
-val result = {
-  // grouped / keyed aggregation
-  if (groupingKeys.length > 0) {
-val aggOpName = s"groupBy: (${groupingToString(inputType, 
grouping)}), " +
-  s"window: ($window), " +
-  s"select: ($aggString)"
-val aggregateFunction =
-  createWindowAggregationFunction(window, namedProperties, 
groupReduceFunction)
-
-val keyedStream = mappedInput.keyBy(groupingKeys: _*)
-
-val windowedStream = createKeyedWindowedStream(window, keyedStream)
-  .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
-
-windowedStream
-  .apply(aggregateFunction)
-  .returns(rowTypeInfo)
-  .name(aggOpName)
-  .asInstanceOf[DataStream[Any]]
+val keyedAggOpName = s"groupBy: (${groupingToString(inputType, 
grouping)}), " +
+  s"window: ($window), " +
+  s"select: ($aggString)"
+val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"
+
+val (aggFieldIndexes, aggregates) =
+  AggregateUtil.transformToAggregateFunctions(
+namedAggregates.map(_.getKey), inputType, grouping.length)
+
+val result: DataStream[Any] = {
+  // check whether all aggregates support partial aggregate
+  if (aggregates.forall(_.supportPartial)){
+// do Incremental Aggregation
+// add grouping fields, position keys in the input, and input type
+val (mapFunction,
+reduceFunction,
+groupingOffsetMapping,
+aggOffsetMapping,
+intermediateRowArity) = 
AggregateUtil.createOperatorFunctionsForIncrementalAggregates(
--- End diff --

This method could be split into three methods, one for the mapper, one for 
the reduce function, and one for the window function.
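
Roughly, the split could look like the following (signatures and types below are illustrative stand-ins only, not the actual flink-table classes):

```
// Illustrative factory split; all types are simplified stand-ins.
object IncrementalAggregateFactoriesSketch {
  trait PrepareMapper   { def map(input: Array[Any]): Array[Any] }
  trait PartialReducer  { def reduce(a: Array[Any], b: Array[Any]): Array[Any] }
  trait WindowEvaluator { def evaluate(partial: Array[Any]): Array[Any] }

  /** Builds the mapper that projects the grouping keys and prepares the intermediate row. */
  def createIncrementalAggregateMapFunction(grouping: Array[Int]): PrepareMapper =
    new PrepareMapper {
      def map(input: Array[Any]): Array[Any] = grouping.map(input(_)) // toy projection only
    }

  /** Builds the reduce function that merges two intermediate aggregate rows. */
  def createIncrementalAggregateReduceFunction(): PartialReducer =
    new PartialReducer {
      def reduce(a: Array[Any], b: Array[Any]): Array[Any] = a // toy: real code merges field by field
    }

  /** Builds the window function that turns the final intermediate row into the output row. */
  def createIncrementalAggregateWindowFunction(): WindowEvaluator =
    new WindowEvaluator {
      def evaluate(partial: Array[Any]): Array[Any] = partial // toy: real code emits the output row
    }
}
```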


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676838#comment-15676838
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88641722
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Preconditions
+
+/**
+  * For Incremental intermediate aggregate Rows, merge every row into 
aggregate buffer.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  */
+class IncrementalAggregateReduceFunction(
+private val aggregates: Array[Aggregate[_]],
+private val groupKeysMapping: Array[(Int, Int)],
+private val intermediateRowArity: Int)extends ReduceFunction[Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+  @transient var accumulatorRow:Row = _
+
+  /**
+* For Incremental intermediate aggregate Rows, merge value1 and value2
+* into aggregate buffer, return aggregate buffer.
+*
+* @param value1 The first value to combined.
+* @param value2 The second value to combined.
+* @return The combined value of both input values.
+*
+*/
+  override def reduce(value1: Row, value2: Row): Row = {
+
+if(null == accumulatorRow){
+  accumulatorRow = new Row(intermediateRowArity)
--- End diff --

Can you add a comment that this can be moved to `RichFunction.open()` once 
[FLINK-5094](https://issues.apache.org/jira/browse/FLINK-5094) is resolved?


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676844#comment-15676844
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88636193
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala
 ---
@@ -52,6 +52,6 @@ class AggregateTimeWindowFunction(
 collector.timeWindow = window
 
 // call wrapped reduce function with property collector
-groupReduceFunction.reduce(input, collector)
+super.apply(key,window,input,collector)
--- End diff --

Please add spaces between the function arguments.


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676842#comment-15676842
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88659144
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
 ---
@@ -135,50 +128,123 @@ class DataStreamAggregate(
   namedProperties)
 
 val prepareOpName = s"prepare select: ($aggString)"
-val mappedInput = inputDS
-  .map(aggregateResult._1)
-  .name(prepareOpName)
-
-val groupReduceFunction = aggregateResult._2
-val rowTypeInfo = new RowTypeInfo(fieldTypes)
-
-val result = {
-  // grouped / keyed aggregation
-  if (groupingKeys.length > 0) {
-val aggOpName = s"groupBy: (${groupingToString(inputType, 
grouping)}), " +
-  s"window: ($window), " +
-  s"select: ($aggString)"
-val aggregateFunction =
-  createWindowAggregationFunction(window, namedProperties, 
groupReduceFunction)
-
-val keyedStream = mappedInput.keyBy(groupingKeys: _*)
-
-val windowedStream = createKeyedWindowedStream(window, keyedStream)
-  .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
-
-windowedStream
-  .apply(aggregateFunction)
-  .returns(rowTypeInfo)
-  .name(aggOpName)
-  .asInstanceOf[DataStream[Any]]
+val keyedAggOpName = s"groupBy: (${groupingToString(inputType, 
grouping)}), " +
+  s"window: ($window), " +
+  s"select: ($aggString)"
+val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"
+
+val (aggFieldIndexes, aggregates) =
+  AggregateUtil.transformToAggregateFunctions(
+namedAggregates.map(_.getKey), inputType, grouping.length)
+
+val result: DataStream[Any] = {
+  // check whether all aggregates support partial aggregate
+  if (aggregates.forall(_.supportPartial)){
+// do Incremental Aggregation
+// add grouping fields, position keys in the input, and input type
+val (mapFunction,
+reduceFunction,
+groupingOffsetMapping,
+aggOffsetMapping,
+intermediateRowArity) = 
AggregateUtil.createOperatorFunctionsForIncrementalAggregates(
+  namedAggregates,
+  inputType,
+  getRowType,
+  grouping,
+  aggregates,
+  aggFieldIndexes)
+
+val mappedInput = inputDS
+  .map(mapFunction)
+  .name(prepareOpName)
+
+// grouped / keyed aggregation
+if (groupingKeys.length > 0) {
+
+  val winFunction =
+createWindowIncrementalAggregationFunction(
+  aggregates,
+  groupingOffsetMapping,
+  aggOffsetMapping,
+  getRowType.getFieldCount,
+  intermediateRowArity,
+  window,
+  namedProperties)
+
+  val keyedStream = mappedInput.keyBy(groupingKeys: _*)
+  val windowedStream = createKeyedWindowedStream(window, 
keyedStream)
+.asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
+
+  windowedStream
+.apply(reduceFunction, winFunction)
+.returns(rowTypeInfo)
+.name(keyedAggOpName)
+.asInstanceOf[DataStream[Any]]
+}
+// global / non-keyed aggregation
+else {
+  val winFunction =
+createAllWindowIncrementalAggregationFunction(
+  aggregates,
+  groupingOffsetMapping,
+  aggOffsetMapping,
+  getRowType.getFieldCount,
+  intermediateRowArity,
+  window,
+  namedProperties)
+
+  val windowedStream = createNonKeyedWindowedStream(window, 
mappedInput)
+.asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
+  windowedStream
+.apply(reduceFunction, winFunction)
+.returns(rowTypeInfo)
+.name(nonKeyedAggOpName)
+.asInstanceOf[DataStream[Any]]
+}
   }
-  // global / non-keyed aggregation
   else {
-val aggOpName = s"window: ($window), select: ($aggString)"
-val aggregateFunction =
-  createAllWindowAggregationFunction(window, namedProperties, 
groupReduceFunction)
-
-val windowedS

[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676848#comment-15676848
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88653721
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Preconditions
+
+/**
+  * For Incremental intermediate aggregate Rows, merge every row into 
aggregate buffer.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  */
+class IncrementalAggregateReduceFunction(
+private val aggregates: Array[Aggregate[_]],
+private val groupKeysMapping: Array[(Int, Int)],
+private val intermediateRowArity: Int)extends ReduceFunction[Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+  @transient var accumulatorRow:Row = _
+
+  /**
+* For Incremental intermediate aggregate Rows, merge value1 and value2
+* into aggregate buffer, return aggregate buffer.
+*
+* @param value1 The first value to combined.
+* @param value2 The second value to combined.
+* @return The combined value of both input values.
+*
+*/
+  override def reduce(value1: Row, value2: Row): Row = {
+
+if(null == accumulatorRow){
+  accumulatorRow = new Row(intermediateRowArity)
+}
+
+// Initiate intermediate aggregate value.
+aggregates.foreach(_.initiate(accumulatorRow))
--- End diff --

I think we can make this a bit more lightweight.
Instead of initializing `accumulatorRow` we could copy all fields of 
`value1`:
```
(0 until intermediateRowArity)
  .foreach(i => accumulatorRow.setField(i, value1.productElement(i)))
```

Then we only need to merge `value2` into `accumulatorRow` and do not need 
to copy the groupKeys (`groupKeysMapping` becomes obsolete).
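
A sketch of the resulting reduce() shape (illustrative only; `RowLike` and `AggregateLike` are simplified stand-ins for the flink-table `Row` and `Aggregate` types):

```
// Illustrative sketch of the suggested reduce(): seed the buffer from value1,
// then merge only value2; no re-initialization and no separate group key copying.
class RowLike(arity: Int) {
  private val fields = new Array[Any](arity)
  def setField(i: Int, value: Any): Unit = fields(i) = value
  def productElement(i: Int): Any = fields(i)
}

trait AggregateLike {
  /** Merges the intermediate aggregate fields of `from` into `into`, in place. */
  def merge(from: RowLike, into: RowLike): Unit
}

class IncrementalAggregateReduceSketch(
    aggregates: Array[AggregateLike],
    intermediateRowArity: Int) {

  @transient private var accumulatorRow: RowLike = _

  def reduce(value1: RowLike, value2: RowLike): RowLike = {
    if (accumulatorRow == null) {
      accumulatorRow = new RowLike(intermediateRowArity)
    }
    // copy all fields of value1 (group keys included) into the buffer
    (0 until intermediateRowArity)
      .foreach(i => accumulatorRow.setField(i, value1.productElement(i)))
    // then merge only value2 into the buffer
    aggregates.foreach(_.merge(value2, accumulatorRow))
    accumulatorRow
  }
}
```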


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676847#comment-15676847
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88658743
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
 ---
@@ -231,6 +297,64 @@ object DataStreamAggregate {
 
   }
 
+  private def createAllWindowIncrementalAggregationFunction(
--- End diff --

I think we can move all `create*Window*Function` methods (also the ones 
that have been here before) to `AggregateUtil`. 


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676840#comment-15676840
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88633784
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala
 ---
@@ -30,9 +30,9 @@ class AggregateMapFunction[IN, OUT](
 private val groupingKeys: Array[Int],
 @transient private val returnType: TypeInformation[OUT])
 extends RichMapFunction[IN, OUT] with ResultTypeQueryable[OUT] {
-  
--- End diff --

Please undo the changes on this file.


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676843#comment-15676843
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88635921
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
 ---
@@ -48,6 +48,6 @@ class AggregateAllTimeWindowFunction(
 collector.timeWindow = window
 
 // call wrapped reduce function with property collector
-groupReduceFunction.reduce(input, collector)
+super.apply(window,input,collector)
--- End diff --

please add space between function arguments.


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676841#comment-15676841
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88637570
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.table.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+/**
+  * It Evaluate final aggregate value.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  * @param aggregateMapping The index mapping between aggregate function 
list and aggregated value
+  * index in output Row.
+  */
+class IncrementalAggregateTimeWindowFunction(
+private val aggregates: Array[Aggregate[_ <: Any]],
+private val groupKeysMapping: Array[(Int, Int)],
+private val aggregateMapping: Array[(Int, Int)],
+private val finalRowArity: Int,
+private val windowStartPos: Option[Int],
+private val windowEndPos: Option[Int])
+  extends IncrementalAggregateWindowFunction[TimeWindow](
+aggregates,
+groupKeysMapping,
+aggregateMapping, finalRowArity) {
+
+  private var collector: TimeWindowPropertyCollector = _
+
+  override def open(parameters: Configuration): Unit = {
+collector = new TimeWindowPropertyCollector(windowStartPos, 
windowEndPos)
+super.open(parameters)
+  }
+
+  override def apply(
+key: Tuple,
+window: TimeWindow,
+records: Iterable[Row],
+out: Collector[Row]): Unit = {
+
+// set collector and window
+collector.wrappedCollector = out
+collector.timeWindow = window
+
+super.apply(key,window,records,collector)
--- End diff --

please add spaces


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88641722
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Preconditions
+
+/**
+  * For Incremental intermediate aggregate Rows, merge every row into 
aggregate buffer.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  */
+class IncrementalAggregateReduceFunction(
+private val aggregates: Array[Aggregate[_]],
+private val groupKeysMapping: Array[(Int, Int)],
+private val intermediateRowArity: Int)extends ReduceFunction[Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+  @transient var accumulatorRow:Row = _
+
+  /**
+* For Incremental intermediate aggregate Rows, merge value1 and value2
+* into aggregate buffer, return aggregate buffer.
+*
+* @param value1 The first value to combined.
+* @param value2 The second value to combined.
+* @return The combined value of both input values.
+*
+*/
+  override def reduce(value1: Row, value2: Row): Row = {
+
+if(null == accumulatorRow){
+  accumulatorRow = new Row(intermediateRowArity)
--- End diff --

Can you add a comment that this can be moved to `RichFunction.open()` once 
[FLINK-5094](https://issues.apache.org/jira/browse/FLINK-5094) is resolved?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88657597
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
 ---
@@ -135,50 +128,123 @@ class DataStreamAggregate(
   namedProperties)
 
 val prepareOpName = s"prepare select: ($aggString)"
-val mappedInput = inputDS
-  .map(aggregateResult._1)
-  .name(prepareOpName)
-
-val groupReduceFunction = aggregateResult._2
-val rowTypeInfo = new RowTypeInfo(fieldTypes)
-
-val result = {
-  // grouped / keyed aggregation
-  if (groupingKeys.length > 0) {
-val aggOpName = s"groupBy: (${groupingToString(inputType, 
grouping)}), " +
-  s"window: ($window), " +
-  s"select: ($aggString)"
-val aggregateFunction =
-  createWindowAggregationFunction(window, namedProperties, 
groupReduceFunction)
-
-val keyedStream = mappedInput.keyBy(groupingKeys: _*)
-
-val windowedStream = createKeyedWindowedStream(window, keyedStream)
-  .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
-
-windowedStream
-  .apply(aggregateFunction)
-  .returns(rowTypeInfo)
-  .name(aggOpName)
-  .asInstanceOf[DataStream[Any]]
+val keyedAggOpName = s"groupBy: (${groupingToString(inputType, 
grouping)}), " +
+  s"window: ($window), " +
+  s"select: ($aggString)"
+val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"
+
+val (aggFieldIndexes, aggregates) =
+  AggregateUtil.transformToAggregateFunctions(
+namedAggregates.map(_.getKey), inputType, grouping.length)
+
+val result: DataStream[Any] = {
+  // check whether all aggregates support partial aggregate
+  if (aggregates.forall(_.supportPartial)){
+// do Incremental Aggregation
+// add grouping fields, position keys in the input, and input type
+val (mapFunction,
+reduceFunction,
+groupingOffsetMapping,
+aggOffsetMapping,
+intermediateRowArity) = 
AggregateUtil.createOperatorFunctionsForIncrementalAggregates(
--- End diff --

Can we add a separate method to create the preparing `MapFunction` to 
`AggregateUtil`?
This is code that is shared for all aggregations (batch, streaming), 
(incremental, non-incremental), etc. 

Would be nice to have that extracted and the mapper applied outside of this 
large condition. Would be great if you could refactor the DataSetAggregate code 
on the way as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88638112
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Preconditions
+
+/**
+  * For Incremental intermediate aggregate Rows, merge every row into 
aggregate buffer.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  */
+class IncrementalAggregateReduceFunction(
+private val aggregates: Array[Aggregate[_]],
+private val groupKeysMapping: Array[(Int, Int)],
+private val intermediateRowArity: Int)extends ReduceFunction[Row] {
--- End diff --

add space before `extends`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88659144
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
 ---
@@ -135,50 +128,123 @@ class DataStreamAggregate(
   namedProperties)
 
 val prepareOpName = s"prepare select: ($aggString)"
-val mappedInput = inputDS
-  .map(aggregateResult._1)
-  .name(prepareOpName)
-
-val groupReduceFunction = aggregateResult._2
-val rowTypeInfo = new RowTypeInfo(fieldTypes)
-
-val result = {
-  // grouped / keyed aggregation
-  if (groupingKeys.length > 0) {
-val aggOpName = s"groupBy: (${groupingToString(inputType, 
grouping)}), " +
-  s"window: ($window), " +
-  s"select: ($aggString)"
-val aggregateFunction =
-  createWindowAggregationFunction(window, namedProperties, 
groupReduceFunction)
-
-val keyedStream = mappedInput.keyBy(groupingKeys: _*)
-
-val windowedStream = createKeyedWindowedStream(window, keyedStream)
-  .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
-
-windowedStream
-  .apply(aggregateFunction)
-  .returns(rowTypeInfo)
-  .name(aggOpName)
-  .asInstanceOf[DataStream[Any]]
+val keyedAggOpName = s"groupBy: (${groupingToString(inputType, 
grouping)}), " +
+  s"window: ($window), " +
+  s"select: ($aggString)"
+val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"
+
+val (aggFieldIndexes, aggregates) =
+  AggregateUtil.transformToAggregateFunctions(
+namedAggregates.map(_.getKey), inputType, grouping.length)
+
+val result: DataStream[Any] = {
+  // check whether all aggregates support partial aggregate
+  if (aggregates.forall(_.supportPartial)){
+// do Incremental Aggregation
+// add grouping fields, position keys in the input, and input type
+val (mapFunction,
+reduceFunction,
+groupingOffsetMapping,
+aggOffsetMapping,
+intermediateRowArity) = 
AggregateUtil.createOperatorFunctionsForIncrementalAggregates(
+  namedAggregates,
+  inputType,
+  getRowType,
+  grouping,
+  aggregates,
+  aggFieldIndexes)
+
+val mappedInput = inputDS
+  .map(mapFunction)
+  .name(prepareOpName)
+
+// grouped / keyed aggregation
+if (groupingKeys.length > 0) {
+
+  val winFunction =
+createWindowIncrementalAggregationFunction(
+  aggregates,
+  groupingOffsetMapping,
+  aggOffsetMapping,
+  getRowType.getFieldCount,
+  intermediateRowArity,
+  window,
+  namedProperties)
+
+  val keyedStream = mappedInput.keyBy(groupingKeys: _*)
+  val windowedStream = createKeyedWindowedStream(window, 
keyedStream)
+.asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
+
+  windowedStream
+.apply(reduceFunction, winFunction)
+.returns(rowTypeInfo)
+.name(keyedAggOpName)
+.asInstanceOf[DataStream[Any]]
+}
+// global / non-keyed aggregation
+else {
+  val winFunction =
+createAllWindowIncrementalAggregationFunction(
+  aggregates,
+  groupingOffsetMapping,
+  aggOffsetMapping,
+  getRowType.getFieldCount,
+  intermediateRowArity,
+  window,
+  namedProperties)
+
+  val windowedStream = createNonKeyedWindowedStream(window, 
mappedInput)
+.asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
+  windowedStream
+.apply(reduceFunction, winFunction)
+.returns(rowTypeInfo)
+.name(nonKeyedAggOpName)
+.asInstanceOf[DataStream[Any]]
+}
   }
-  // global / non-keyed aggregation
   else {
-val aggOpName = s"window: ($window), select: ($aggString)"
-val aggregateFunction =
-  createAllWindowAggregationFunction(window, namedProperties, 
groupReduceFunction)
-
-val windowedStream = createNonKeyedWindowedStream(window, 
mappedInput)
-  .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
-
-windowedStream
-  .apply(aggregateFunction)
-  .returns(rowTypeInfo)
- 

[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88633784
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala
 ---
@@ -30,9 +30,9 @@ class AggregateMapFunction[IN, OUT](
 private val groupingKeys: Array[Int],
 @transient private val returnType: TypeInformation[OUT])
 extends RichMapFunction[IN, OUT] with ResultTypeQueryable[OUT] {
-  
--- End diff --

Please undo the changes on this file.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88635921
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
 ---
@@ -48,6 +48,6 @@ class AggregateAllTimeWindowFunction(
 collector.timeWindow = window
 
 // call wrapped reduce function with property collector
-groupReduceFunction.reduce(input, collector)
+super.apply(window,input,collector)
--- End diff --

please add space between function arguments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88653721
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Preconditions
+
+/**
+  * For Incremental intermediate aggregate Rows, merge every row into 
aggregate buffer.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  */
+class IncrementalAggregateReduceFunction(
+private val aggregates: Array[Aggregate[_]],
+private val groupKeysMapping: Array[(Int, Int)],
+private val intermediateRowArity: Int)extends ReduceFunction[Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+  @transient var accumulatorRow:Row = _
+
+  /**
+* For Incremental intermediate aggregate Rows, merge value1 and value2
+* into aggregate buffer, return aggregate buffer.
+*
+* @param value1 The first value to combined.
+* @param value2 The second value to combined.
+* @return The combined value of both input values.
+*
+*/
+  override def reduce(value1: Row, value2: Row): Row = {
+
+if(null == accumulatorRow){
+  accumulatorRow = new Row(intermediateRowArity)
+}
+
+// Initiate intermediate aggregate value.
+aggregates.foreach(_.initiate(accumulatorRow))
--- End diff --

I think we can make this a bit more lightweight.
Instead of initializing `accumulatorRow` we could copy all fields of 
`value1`:
```
(0 until intermediateRowArity)
  .foreach(i => accumulatorRow.setField(i, value1.productElement(i)))
```

Then we only need to merge `value2` into `accumulatorRow` and do not need 
to copy the groupKeys (`groupKeysMapping` becomes obsolete).
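
A rough sketch of the resulting `reduce()`, reusing the fields already declared in this file; the `Aggregate.merge(value, buffer)` call is an assumption based on how the other aggregate runtime functions merge intermediate rows:
```
override def reduce(value1: Row, value2: Row): Row = {

  if (null == accumulatorRow) {
    accumulatorRow = new Row(intermediateRowArity)
  }

  // Copy all fields of value1 into the buffer. This also copies the group
  // keys, so no separate groupKeysMapping handling is needed here.
  (0 until intermediateRowArity)
    .foreach(i => accumulatorRow.setField(i, value1.productElement(i)))

  // Merge only value2 into the buffer.
  aggregates.foreach(_.merge(value2, accumulatorRow))

  accumulatorRow
}
```
With that, `groupKeysMapping` could also be dropped from the constructor.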


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88658743
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
 ---
@@ -231,6 +297,64 @@ object DataStreamAggregate {
 
   }
 
+  private def createAllWindowIncrementalAggregationFunction(
--- End diff --

I think we can move all `create*Window*Function` methods (also the ones 
that have been here before) to `AggregateUtil`. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88637570
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.table.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+/**
+  * It Evaluate final aggregate value.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  * @param aggregateMapping The index mapping between aggregate function 
list and aggregated value
+  * index in output Row.
+  */
+class IncrementalAggregateTimeWindowFunction(
+private val aggregates: Array[Aggregate[_ <: Any]],
+private val groupKeysMapping: Array[(Int, Int)],
+private val aggregateMapping: Array[(Int, Int)],
+private val finalRowArity: Int,
+private val windowStartPos: Option[Int],
+private val windowEndPos: Option[Int])
+  extends IncrementalAggregateWindowFunction[TimeWindow](
+aggregates,
+groupKeysMapping,
+aggregateMapping, finalRowArity) {
+
+  private var collector: TimeWindowPropertyCollector = _
+
+  override def open(parameters: Configuration): Unit = {
+collector = new TimeWindowPropertyCollector(windowStartPos, 
windowEndPos)
+super.open(parameters)
+  }
+
+  override def apply(
+key: Tuple,
+window: TimeWindow,
+records: Iterable[Row],
+out: Collector[Row]): Unit = {
+
+// set collector and window
+collector.wrappedCollector = out
+collector.timeWindow = window
+
+super.apply(key,window,records,collector)
--- End diff --

please add spaces


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88659308
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
 ---
@@ -135,50 +128,123 @@ class DataStreamAggregate(
   namedProperties)
 
 val prepareOpName = s"prepare select: ($aggString)"
-val mappedInput = inputDS
-  .map(aggregateResult._1)
-  .name(prepareOpName)
-
-val groupReduceFunction = aggregateResult._2
-val rowTypeInfo = new RowTypeInfo(fieldTypes)
-
-val result = {
-  // grouped / keyed aggregation
-  if (groupingKeys.length > 0) {
-val aggOpName = s"groupBy: (${groupingToString(inputType, 
grouping)}), " +
-  s"window: ($window), " +
-  s"select: ($aggString)"
-val aggregateFunction =
-  createWindowAggregationFunction(window, namedProperties, 
groupReduceFunction)
-
-val keyedStream = mappedInput.keyBy(groupingKeys: _*)
-
-val windowedStream = createKeyedWindowedStream(window, keyedStream)
-  .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
-
-windowedStream
-  .apply(aggregateFunction)
-  .returns(rowTypeInfo)
-  .name(aggOpName)
-  .asInstanceOf[DataStream[Any]]
+val keyedAggOpName = s"groupBy: (${groupingToString(inputType, 
grouping)}), " +
+  s"window: ($window), " +
+  s"select: ($aggString)"
+val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"
+
+val (aggFieldIndexes, aggregates) =
+  AggregateUtil.transformToAggregateFunctions(
+namedAggregates.map(_.getKey), inputType, grouping.length)
+
+val result: DataStream[Any] = {
+  // check whether all aggregates support partial aggregate
+  if (aggregates.forall(_.supportPartial)){
+// do Incremental Aggregation
+// add grouping fields, position keys in the input, and input type
+val (mapFunction,
+reduceFunction,
+groupingOffsetMapping,
+aggOffsetMapping,
+intermediateRowArity) = 
AggregateUtil.createOperatorFunctionsForIncrementalAggregates(
--- End diff --

This method could be split into three methods, one for the mapper, one for 
the reduce function, and one for the window function.
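
To illustrate, the call site could then look roughly like the sketch below. All three method names are hypothetical (they do not exist in `AggregateUtil` yet); the arguments reuse variables already present in this diff.
```
// Hypothetical factory methods; the names are placeholders for the proposed split.
val mapFunction = AggregateUtil.createPrepareMapFunction(
  namedAggregates, grouping, inputType)

val reduceFunction = AggregateUtil.createIncrementalAggregateReduceFunction(
  namedAggregates, inputType, getRowType, grouping)

val winFunction = AggregateUtil.createIncrementalAggregateWindowFunction(
  window, namedAggregates, inputType, getRowType, grouping, namedProperties)

// The preparing mapper can then be applied once, outside of the
// incremental / non-incremental condition.
val mappedInput = inputDS
  .map(mapFunction)
  .name(prepareOpName)
```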


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88636193
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala
 ---
@@ -52,6 +52,6 @@ class AggregateTimeWindowFunction(
 collector.timeWindow = window
 
 // call wrapped reduce function with property collector
-groupReduceFunction.reduce(input, collector)
+super.apply(key,window,input,collector)
--- End diff --

Please add spaces between function arguments


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5010) Decouple the death watch parameters from the Akka ask timeout

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676830#comment-15676830
 ] 

ASF GitHub Bot commented on FLINK-5010:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/2831

[FLINK-5010] [akka] Introduce default configuration values for Akka's 
deathwatch

Set the akka deathwatch interval to 10s, the akka deathwatch pause to 60s 
and the tcp
connection timeout to 20s per default.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
decoupleAskTimeoutFromDeathwatch

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2831.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2831


commit a27a484ff5e6542f6e4050cb6748964ba5d12f65
Author: Till Rohrmann 
Date:   2016-11-18T14:24:51Z

[FLINK-5010] [akka] Introduce default configuration values for Akka's 
deathwatch

Set the akka deathwatch interval to 10s, the akka deathwatch pause to 60s 
and the tcp
connection timeout to 20s per default.




> Decouple the death watch parameters from the Akka ask timeout
> -
>
> Key: FLINK-5010
> URL: https://issues.apache.org/jira/browse/FLINK-5010
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
> Fix For: 1.2.0, 1.1.4
>
>
> The Akka ask timeout should not influence the death watch interval or the 
> timeout. Thus, I propose to introduce default values for these configuration 
> options.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2831: [FLINK-5010] [akka] Introduce default configuratio...

2016-11-18 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/2831

[FLINK-5010] [akka] Introduce default configuration values for Akka's 
deathwatch

Set the akka deathwatch interval to 10s, the akka deathwatch pause to 60s 
and the tcp
connection timeout to 20s per default.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
decoupleAskTimeoutFromDeathwatch

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2831.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2831


commit a27a484ff5e6542f6e4050cb6748964ba5d12f65
Author: Till Rohrmann 
Date:   2016-11-18T14:24:51Z

[FLINK-5010] [akka] Introduce default configuration values for Akka's 
deathwatch

Set the akka deathwatch interval to 10s, the akka deathwatch pause to 60s 
and the tcp
connection timeout to 20s per default.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5002) Lack of synchronization in LocalBufferPool#getNumberOfUsedBuffers

2016-11-18 Thread Roman Maier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676805#comment-15676805
 ] 

Roman Maier commented on FLINK-5002:


Please check the implementation of this issue:
https://github.com/apache/flink/compare/master...MayerRoman:FLINK-5002?expand=1


> Lack of synchronization in LocalBufferPool#getNumberOfUsedBuffers
> -
>
> Key: FLINK-5002
> URL: https://issues.apache.org/jira/browse/FLINK-5002
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Roman Maier
>Priority: Minor
>  Labels: easyfix, starter
>
> {code}
>   public int getNumberOfUsedBuffers() {
> return numberOfRequestedMemorySegments - availableMemorySegments.size();
>   }
> {code}
> Access to availableMemorySegments should be protected with proper 
> synchronization as other methods do.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4591) Select star does not work with grouping

2016-11-18 Thread Jark Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-4591.
--
Resolution: Won't Fix

> Select star does not work with grouping
> ---
>
> Key: FLINK-4591
> URL: https://issues.apache.org/jira/browse/FLINK-4591
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Jark Wu
>
> It would be consistent if this would also work:
> {{table.groupBy( '* ).select( '* )}}
> Currently, the star only works in a plain select without grouping.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5094) Support RichReduceFunction and RichFoldFunction as incremental window aggregation functions

2016-11-18 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676772#comment-15676772
 ] 

Fabian Hueske commented on FLINK-5094:
--

Yes, the incremental aggregation functions are part of the state objects. I'm 
not familiar with the details; I just noticed that {{RichFunction.open()}} is not 
called. [~aljoscha] knows this part better than I do.
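
For reference, a simplified sketch of why {{open()}} is not called today: with 
incremental window aggregation the {{ReduceFunction}} is handed to the state 
backend inside a state descriptor instead of being executed as a regular operator 
UDF ({{reduceFunction}} and {{rowSerializer}} below are placeholders):

{code}
// Simplified sketch (not the exact WindowOperator code): the window operator
// registers the user's ReduceFunction as part of a ReducingStateDescriptor ...
val stateDesc = new ReducingStateDescriptor[Row](
  "window-contents", reduceFunction, rowSerializer)

// ... and HeapReducingState / RocksDBReducingState later call
// reduceFunction.reduce(a, b) directly, so the RichFunction life-cycle
// (open() / close()) never runs for it.
{code}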

> Support RichReduceFunction and RichFoldFunction as incremental window 
> aggregation functions
> ---
>
> Key: FLINK-5094
> URL: https://issues.apache.org/jira/browse/FLINK-5094
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming, Windowing Operators
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Fabian Hueske
>
> Support {{RichReduceFunction}} and {{RichFoldFunction}} as incremental window 
> aggregation functions in order to initialize the functions via {{open()}}.
> The main problem is that we do not want to provide the full power of 
> {{RichFunction}} for incremental aggregation functions, such as defining their 
> own operator state. This could be achieved by providing some kind of 
> {{RestrictedRuntimeContext}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5094) Support RichReduceFunction and RichFoldFunction as incremental window aggregation functions

2016-11-18 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676759#comment-15676759
 ] 

Jark Wu commented on FLINK-5094:


Hi [~fhueske], this may require modifying the implementations of {{FoldingState}} 
and {{ReducingState}}, i.e. {{HeapReducingState}} and {{RocksDBReducingState}}, right?

> Support RichReduceFunction and RichFoldFunction as incremental window 
> aggregation functions
> ---
>
> Key: FLINK-5094
> URL: https://issues.apache.org/jira/browse/FLINK-5094
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming, Windowing Operators
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Fabian Hueske
>
> Support {{RichReduceFunction}} and {{RichFoldFunction}} as incremental window 
> aggregation functions in order to initialize the functions via {{open()}}.
> The main problem is that we do not want to provide the full power of 
> {{RichFunction}} for incremental aggregation functions, such as defining their 
> own operator state. This could be achieved by providing some kind of 
> {{RestrictedRuntimeContext}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2830: [FLINK-5098] [akka] Detect unreachable remote acto...

2016-11-18 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/2830

[FLINK-5098] [akka] Detect unreachable remote actors to fail ask calls 
eagerly

This PR adds an additional Identify message to every sent ask message, which is 
used to detect whether the target actor is actually reachable. The Identify 
message makes it possible to detect unreachable ActorSystems, or actors that do 
not exist on an ActorSystem, without having to wait for a timeout. This then 
allows the ask operation to fail eagerly.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink eagerTimeout

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2830.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2830


commit 0e4f059ca2a1764082851c8e7538483180c3e40b
Author: Till Rohrmann 
Date:   2016-11-09T14:29:32Z

[FLINK-5098] [akka] Detect unreachable remote actors to fail ask calls 
eagerly

This PR adds an additional Identify message to every sent ask message, which is 
used to detect whether the target actor is actually reachable. The Identify 
message makes it possible to detect unreachable ActorSystems, or actors that do 
not exist on an ActorSystem, without having to wait for a timeout. This then 
allows the ask operation to fail eagerly.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5098) Detect network problems to eagerly time out ask operations

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676758#comment-15676758
 ] 

ASF GitHub Bot commented on FLINK-5098:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/2830

[FLINK-5098] [akka] Detect unreachable remote actors to fail ask calls 
eagerly

This PR adds an additional Identify message to every sent ask message, which is 
used to detect whether the target actor is actually reachable. The Identify 
message makes it possible to detect unreachable ActorSystems, or actors that do 
not exist on an ActorSystem, without having to wait for a timeout. This then 
allows the ask operation to fail eagerly.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink eagerTimeout

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2830.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2830


commit 0e4f059ca2a1764082851c8e7538483180c3e40b
Author: Till Rohrmann 
Date:   2016-11-09T14:29:32Z

[FLINK-5098] [akka] Detect unreachable remote actors to fail ask calls 
eagerly

This PR adds an additional Identify message to every sent ask message, which is 
used to detect whether the target actor is actually reachable. The Identify 
message makes it possible to detect unreachable ActorSystems, or actors that do 
not exist on an ActorSystem, without having to wait for a timeout. This then 
allows the ask operation to fail eagerly.




> Detect network problems to eagerly time out ask operations
> --
>
> Key: FLINK-5098
> URL: https://issues.apache.org/jira/browse/FLINK-5098
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0
>
>
> Akka's ask operations are given a timeout after which they should fail with 
> an {{AskTimeoutException}}. In some cases, however, it is possible to fail 
> early because one has detected that the remote host is not reachable or that 
> the actor does not exist on the remote {{ActorSystem}}.
> Usually failing early if one cannot hope for a successful message delivery is 
> a desirable behaviour since it speeds up recovery. 
> I propose to send Akka's {{Identify}} message with each ask request. The 
> Identify message makes it possible to detect unreachable or non-existing actors 
> and, thus, enables us to fail the ask operation early.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5098) Detect network problems to eagerly time out ask operations

2016-11-18 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-5098:


 Summary: Detect network problems to eagerly time out ask operations
 Key: FLINK-5098
 URL: https://issues.apache.org/jira/browse/FLINK-5098
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination
Affects Versions: 1.2.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.2.0


Akka's ask operations are given a timeout after which they should fail with an 
{{AskTimeoutException}}. In some cases, however, it is possible to fail early 
because one has detected that the remote host is not reachable or that the 
actor does not exist on the remote {{ActorSystem}}.

Usually failing early if one cannot hope for a successful message delivery is a 
desirable behaviour since it speeds up recovery. 

I propose to send Akka's {{Identify}} message with each ask request. The 
Identify message makes it possible to detect unreachable or non-existing actors 
and, thus, enables us to fail the ask operation early.
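
A minimal sketch of the idea (simplified, not the code of the actual change; the 
object and method names below are made up for illustration):

{code}
import akka.actor.{ActorIdentity, ActorSelection, Identify}
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.{ExecutionContext, Future, Promise}

object EagerAsk {

  // Sends the regular ask plus an Identify probe to the same target. If the
  // probe reports that the actor does not exist, the returned future fails
  // eagerly instead of waiting for the AskTimeoutException.
  def askWithIdentify(target: ActorSelection, message: Any)
      (implicit timeout: Timeout, ec: ExecutionContext): Future[Any] = {

    val result = Promise[Any]()

    // Regular ask: completes the promise on success or, eventually, on timeout.
    (target ? message).onComplete(result.tryComplete)

    // Identify probe: ActorIdentity(_, None) means the actor is not running
    // on the (reachable) remote ActorSystem, so we can fail right away.
    (target ? Identify(1)).foreach {
      case ActorIdentity(_, None) =>
        result.tryFailure(new Exception(s"Actor $target could not be identified."))
      case _ => // actor exists or unexpected reply: let the regular ask decide
    }

    result.future
  }
}
{code}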



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3617) NPE from CaseClassSerializer when dealing with null Option field

2016-11-18 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676714#comment-15676714
 ] 

Fabian Hueske commented on FLINK-3617:
--

Don't know. [~jgrier] who reported the issue might comment on this.
IMO, it's not urgent but a decision is required to resolve the PR that was 
contributed.

A solution to upgrade serializers would be great!

> NPE from CaseClassSerializer when dealing with null Option field
> 
>
> Key: FLINK-3617
> URL: https://issues.apache.org/jira/browse/FLINK-3617
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Jamie Grier
>
> This error occurs when serializing a Scala case class with an field of 
> Option[] type where the value is not Some or None, but null.
> If this is not supported we should have a good error message.
> java.lang.RuntimeException: ConsumerThread threw an exception: null
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:336)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:81)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:158)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:100)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:100)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
>   at 
> org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:107)
>   at 
> org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:42)
>   at 
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:84)
>   at 
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
>   ... 3 more



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4679) Add TumbleRow row-windows for streaming tables

2016-11-18 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676708#comment-15676708
 ] 

Jark Wu commented on FLINK-4679:


Hi [~fhueske] [~twalthr], if I understand correctly, the row-window will 
evaluate the aggregates every time a row arrives in the window. I think this is 
really like window early-firing, which is controlled by a Trigger. Could we 
implement a specific Trigger that fires on every element, so that no custom 
stream operator is needed? Have I missed anything?

A row-count row-window trigger could look like this:

{code}
class RowWindowCountTrigger[W <: Window](maxCount: Long) extends Trigger[Any, W] {

  val stateDesc = new ReducingStateDescriptor[JLong]("count", Sum, LongSerializer.INSTANCE)

  override def onElement(element: Any, timestamp: Long, window: W, ctx: TriggerContext)
    : TriggerResult = {

    val count: ReducingState[JLong] = ctx.getPartitionedState(stateDesc)
    count.add(1L)
    if (count.get >= maxCount) {
      count.clear()
      TriggerResult.FIRE_AND_PURGE
    } else {
      TriggerResult.FIRE
    }
  }

  override def onProcessingTime(time: Long, window: W, ctx: TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def onEventTime(time: Long, window: W, ctx: TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def clear(window: W, ctx: TriggerContext): Unit =
    ctx.getPartitionedState(stateDesc).clear()

  @SerialVersionUID(1L)
  object Sum extends ReduceFunction[JLong] {
    @throws[Exception]
    def reduce(value1: JLong, value2: JLong): JLong = value1 + value2
  }
}
{code}
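
If that direction works, wiring the trigger into the translated job might look 
roughly like this ({{mappedInput}}, {{groupingKeys}}, {{maxCount}} and 
{{windowFunction}} are placeholders, not names from FLIP-11):

{code}
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

// Hypothetical usage: GlobalWindows plus the trigger above would emulate a
// TumbleRow window over maxCount rows that also fires on every element.
val result = mappedInput
  .keyBy(groupingKeys: _*)
  .window(GlobalWindows.create())
  .trigger(new RowWindowCountTrigger[GlobalWindow](maxCount))
  .apply(windowFunction)
{code}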

> Add TumbleRow row-windows for streaming tables
> --
>
> Key: FLINK-4679
> URL: https://issues.apache.org/jira/browse/FLINK-4679
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Jark Wu
>
> Add TumbleRow row-windows for streaming tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  
> This task requires to implement a custom stream operator and integrate it 
> with checkpointing and timestamp / watermark logic.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5097) The TypeExtractor is missing input type information in some Graph methods

2016-11-18 Thread Vasia Kalavri (JIRA)
Vasia Kalavri created FLINK-5097:


 Summary: The TypeExtractor is missing input type information in 
some Graph methods
 Key: FLINK-5097
 URL: https://issues.apache.org/jira/browse/FLINK-5097
 Project: Flink
  Issue Type: Bug
  Components: Gelly
Reporter: Vasia Kalavri
Assignee: Vasia Kalavri


The TypeExtractor is called without information about the input type in 
{{mapVertices}}, {{mapEdges}}, and {{fromDataSet}}, although this information 
can be easily retrieved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3617) NPE from CaseClassSerializer when dealing with null Option field

2016-11-18 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676696#comment-15676696
 ] 

Stephan Ewen commented on FLINK-3617:
-

How pressing is that? Is it a serious problem to work around?
Having an upgrade story for serializers (I know some people are starting to 
design that) would let us do this more smoothly in the future.

> NPE from CaseClassSerializer when dealing with null Option field
> 
>
> Key: FLINK-3617
> URL: https://issues.apache.org/jira/browse/FLINK-3617
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Jamie Grier
>
> This error occurs when serializing a Scala case class with an field of 
> Option[] type where the value is not Some or None, but null.
> If this is not supported we should have a good error message.
> java.lang.RuntimeException: ConsumerThread threw an exception: null
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:336)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:81)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:158)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:100)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:100)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
>   at 
> org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:107)
>   at 
> org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:42)
>   at 
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:84)
>   at 
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
>   ... 3 more



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5090) Expose optionally detailed metrics about network queue lengths

2016-11-18 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676692#comment-15676692
 ] 

Stephan Ewen commented on FLINK-5090:
-

I have added min/max/avg across the channels for now. Having all channels 
creates a flood of metrics.
Do you think min/max/avg is okay, or are all channels needed?

> Expose optionally detailed metrics about network queue lengths
> --
>
> Key: FLINK-5090
> URL: https://issues.apache.org/jira/browse/FLINK-5090
> Project: Flink
>  Issue Type: New Feature
>  Components: Network
>Affects Versions: 1.1.3
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.2.0, 1.1.4
>
>
> For debugging purposes, it is important to have access to more detailed 
> metrics about the length of network input and output queues.





[jira] [Commented] (FLINK-2821) Change Akka configuration to allow accessing actors from different URLs

2016-11-18 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676689#comment-15676689
 ] 

Stephan Ewen commented on FLINK-2821:
-

Concerning HA setups: publishing the JobManager host name is necessary for 
discovery (ZooKeeper effectively becomes the naming service).
In Kubernetes and similar setups there is already dynamic naming, so publishing 
the host name is redundant there, but it should not hurt.

The only thing to think through is that we internally always replace host names 
with resolved IP addresses. That is important because Akka does an "exact string 
match" on the actor URLs, so we need a normalized format. We decided to go with 
IP addresses in the URL because forward name resolution works more reliably than 
reverse name lookups.
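As a side note, the normalization described above boils down to a forward DNS lookup 
before the actor URL is built. A minimal sketch of the idea (the URL shape is 
approximated and this class is not part of Flink):

{code}
import java.net.InetAddress;
import java.net.UnknownHostException;

/** Illustrative only: build an actor URL from the resolved IP so that two
 *  references to the same JobManager always yield the identical string. */
public class AddressNormalization {

	public static String normalizedJobManagerUrl(String host, int port) throws UnknownHostException {
		// Forward resolution: host name -> IP address.
		String ip = InetAddress.getByName(host).getHostAddress();
		// Akka compares actor URLs by exact string match, so the normalized
		// IP form must be used consistently on both sides.
		return "akka.tcp://flink@" + ip + ":" + port + "/user/jobmanager";
	}

	public static void main(String[] args) throws UnknownHostException {
		// Prints e.g. akka.tcp://flink@127.0.0.1:6123/user/jobmanager
		System.out.println(normalizedJobManagerUrl("localhost", 6123));
	}
}
{code}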

> Change Akka configuration to allow accessing actors from different URLs
> ---
>
> Key: FLINK-2821
> URL: https://issues.apache.org/jira/browse/FLINK-2821
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Reporter: Robert Metzger
>Assignee: Maximilian Michels
>
> Akka expects the actor's URL to match exactly.
> As pointed out here, cases where users were complaining about this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-trying-to-access-JM-through-proxy-td3018.html
>   - Proxy routing (as described here, send to the proxy URL, receiver 
> recognizes only the original URL)
>   - Using hostname / IP interchangeably does not work (we solved this by 
> always putting IP addresses into URLs, never hostnames)
>   - Binding to multiple interfaces (any local 0.0.0.0) does not work. Still 
> no solution to that (but it seems not too much of a restriction)
> I am aware that this is a limitation of Akka, so it is actually not a 
> Flink bug. But I think we should track the resolution of the issue here 
> anyway because it's affecting our users' satisfaction.





[jira] [Commented] (FLINK-2821) Change Akka configuration to allow accessing actors from different URLs

2016-11-18 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676678#comment-15676678
 ] 

Stephan Ewen commented on FLINK-2821:
-

+1 for getting rid of {{jobmanager.rpc.bind-address}} and 
{{jobmanager.rpc.bind-port}}, always binding to {{0.0.0.0}}, and having the 
bind port be the same as the advertised port.

[~melentye] and [~philipp.bussche] do you see any issues with that?

> Change Akka configuration to allow accessing actors from different URLs
> ---
>
> Key: FLINK-2821
> URL: https://issues.apache.org/jira/browse/FLINK-2821
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Reporter: Robert Metzger
>Assignee: Maximilian Michels
>
> Akka expects the actor's URL to match exactly.
> As pointed out here, cases where users were complaining about this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-trying-to-access-JM-through-proxy-td3018.html
>   - Proxy routing (as described here, send to the proxy URL, receiver 
> recognizes only the original URL)
>   - Using hostname / IP interchangeably does not work (we solved this by 
> always putting IP addresses into URLs, never hostnames)
>   - Binding to multiple interfaces (any local 0.0.0.0) does not work. Still 
> no solution to that (but it seems not too much of a restriction)
> I am aware that this is a limitation of Akka, so it is actually not a 
> Flink bug. But I think we should track the resolution of the issue here 
> anyway because it's affecting our users' satisfaction.





[jira] [Created] (FLINK-5096) Make the RollingSink rescalable.

2016-11-18 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-5096:
-

 Summary: Make the RollingSink rescalable.
 Key: FLINK-5096
 URL: https://issues.apache.org/jira/browse/FLINK-5096
 Project: Flink
  Issue Type: Improvement
  Components: filesystem-connector
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.2.0


Integrate the RollingSink with the new state abstractions so that its 
parallelism can change after restoring from a savepoint.





[jira] [Created] (FLINK-5095) Add explicit notifyOfAddedX methods to MetricReporter interface

2016-11-18 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-5095:
---

 Summary: Add explicit notifyOfAddedX methods to MetricReporter 
interface
 Key: FLINK-5095
 URL: https://issues.apache.org/jira/browse/FLINK-5095
 Project: Flink
  Issue Type: Improvement
  Components: Metrics
Affects Versions: 1.1.3
Reporter: Chesnay Schepler


I would like to start a discussion on the MetricReporter interface, 
specifically the methods that notify a reporter of added or removed metrics.

Currently, the methods are defined as follows:
{code}
void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
{code}

All metrics, regardless of their actual type, are passed to the reporter with 
these methods.

Since the different metric types have to be handled differently, every reporter 
is forced to do something like this:
{code}
if (metric instanceof Counter) {
    Counter c = (Counter) metric;
    // deal with counter
} else if (metric instanceof Gauge) {
    // deal with gauge
} else if (metric instanceof Histogram) {
    // deal with histogram
} else if (metric instanceof Meter) {
    // deal with meter
} else {
    // log something or throw an exception
}
{code}

This has a few issues:
* the instanceof checks and casts are unnecessary overhead
* it requires the implementer to be aware of every metric type
* it encourages throwing an exception in the final else block

We could remedy all of these by reworking the interface to contain explicit 
add/remove methods for every metric type. This would however be a breaking 
change and blow up the interface to 12 methods from the current 4. We could 
also add a RichMetricReporter interface with these methods, which would require 
relatively few changes but add additional complexity.

I was wondering what other people think about this.
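To make the proposal easier to discuss, here is one purely hypothetical shape such an 
extended interface could take; the method names are invented and this is not an 
existing Flink API.

{code}
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;

/** Hypothetical extension with explicit per-type callbacks (8 new methods on
 *  top of the existing 4 in MetricReporter, i.e. 12 in total). */
public interface RichMetricReporter extends MetricReporter {

	void notifyOfAddedCounter(Counter counter, String metricName, MetricGroup group);
	void notifyOfRemovedCounter(Counter counter, String metricName, MetricGroup group);

	void notifyOfAddedGauge(Gauge<?> gauge, String metricName, MetricGroup group);
	void notifyOfRemovedGauge(Gauge<?> gauge, String metricName, MetricGroup group);

	void notifyOfAddedHistogram(Histogram histogram, String metricName, MetricGroup group);
	void notifyOfRemovedHistogram(Histogram histogram, String metricName, MetricGroup group);

	void notifyOfAddedMeter(Meter meter, String metricName, MetricGroup group);
	void notifyOfRemovedMeter(Meter meter, String metricName, MetricGroup group);
}
{code}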





[jira] [Updated] (FLINK-5095) Add explicit notifyOfAddedX methods to MetricReporter interface

2016-11-18 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5095?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-5095:

Priority: Minor  (was: Major)

> Add explicit notifyOfAddedX methods to MetricReporter interface
> ---
>
> Key: FLINK-5095
> URL: https://issues.apache.org/jira/browse/FLINK-5095
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.1.3
>Reporter: Chesnay Schepler
>Priority: Minor
>
> I would like to start a discussion on the MetricReporter interface, 
> specifically the methods that notify a reporter of added or removed metrics.
> Currently, the methods are defined as follows:
> {code}
> void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
> void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup 
> group);
> {code}
> All metrics, regardless of their actual type, are passed to the reporter with 
> these methods.
> Since the different metric types have to be handled differently, every 
> reporter is forced to do something like this:
> {code}
> if (metric instanceof Counter) {
>     Counter c = (Counter) metric;
>     // deal with counter
> } else if (metric instanceof Gauge) {
>     // deal with gauge
> } else if (metric instanceof Histogram) {
>     // deal with histogram
> } else if (metric instanceof Meter) {
>     // deal with meter
> } else {
>     // log something or throw an exception
> }
> {code}
> This has a few issues:
> * the instanceof checks and casts are unnecessary overhead
> * it requires the implementer to be aware of every metric type
> * it encourages throwing an exception in the final else block
> We could remedy all of these by reworking the interface to contain explicit 
> add/remove methods for every metric type. This would however be a breaking 
> change and blow up the interface to 12 methods from the current 4. We could 
> also add a RichMetricReporter interface with these methods, which would 
> require relatively few changes but add additional complexity.
> I was wondering what other people think about this.





[GitHub] flink issue #2829: [hotfix] prevent RecordWriter#flush() to clear the serial...

2016-11-18 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/2829
  
I don't expect this to change any behaviour: clearing the serializer 
twice does not actually hurt, it is merely a waste of resources, so FLINK-4719 
should not be affected at all.




[jira] [Commented] (FLINK-4719) KryoSerializer random exception

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676660#comment-15676660
 ] 

ASF GitHub Bot commented on FLINK-4719:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/2829
  
I don't expect this to change any behaviour: clearing the serializer 
twice does not actually hurt, it is merely a waste of resources, so FLINK-4719 
should not be affected at all.


> KryoSerializer random exception
> ---
>
> Key: FLINK-4719
> URL: https://issues.apache.org/jira/browse/FLINK-4719
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.1.1
>Reporter: Flavio Pompermaier
>  Labels: kryo, serialization
>
> There's a random exception that somehow involves the KryoSerializer when 
> using POJOs in Flink jobs that read large volumes of data.
> It is usually thrown in several places, e.g. (the exceptions reported here 
> may refer to previous versions of Flink):
> {code}
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce 
> (GroupReduce at createResult(IndexMappingExecutor.java:42)) -> Map (Map at 
> main(Jsonizer.java:128))' , caused an error: Error obtaining the sorted 
> input: Thread 'SortMerger spilling thread' terminated due to an exception: 
> Unable to find class: java.ttil.HashSet
> at 
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
> at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: 
> Thread 'SortMerger spilling thread' terminated due to an exception: Unable to 
> find class: java.ttil.HashSet
> at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
> at 
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
> at 
> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
> at 
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
> ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' 
> terminated due to an exception: Unable to find class: java.ttil.HashSet
> at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: 
> java.ttil.HashSet
> at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
> at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> at 
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
> at 
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
> at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
> at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:252)
> at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
> at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
> at 
> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
> at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
> at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
> Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
> {code}
> {code}
> Caused by: java.io.IOException: Serializer consumed more bytes than the 
> record had. This indicates broken serialization. If

[jira] [Commented] (FLINK-4719) KryoSerializer random exception

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676633#comment-15676633
 ] 

ASF GitHub Bot commented on FLINK-4719:
---

Github user fpompermaier commented on the issue:

https://github.com/apache/flink/pull/2829
  
Could this be a possible fix for FLINK-4719?


> KryoSerializer random exception
> ---
>
> Key: FLINK-4719
> URL: https://issues.apache.org/jira/browse/FLINK-4719
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.1.1
>Reporter: Flavio Pompermaier
>  Labels: kryo, serialization
>
> There's a random exception that somehow involves the KryoSerializer when 
> using POJOs in Flink jobs that read large volumes of data.
> It is usually thrown in several places, e.g. (the exceptions reported here 
> may refer to previous versions of Flink):
> {code}
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce 
> (GroupReduce at createResult(IndexMappingExecutor.java:42)) -> Map (Map at 
> main(Jsonizer.java:128))' , caused an error: Error obtaining the sorted 
> input: Thread 'SortMerger spilling thread' terminated due to an exception: 
> Unable to find class: java.ttil.HashSet
> at 
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
> at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: 
> Thread 'SortMerger spilling thread' terminated due to an exception: Unable to 
> find class: java.ttil.HashSet
> at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
> at 
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
> at 
> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
> at 
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
> ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' 
> terminated due to an exception: Unable to find class: java.ttil.HashSet
> at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: 
> java.ttil.HashSet
> at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
> at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> at 
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
> at 
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
> at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
> at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:252)
> at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
> at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
> at 
> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
> at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
> at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
> Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
> {code}
> {code}
> Caused by: java.io.IOException: Serializer consumed more bytes than the 
> record had. This indicates broken serialization. If you are using custom 
> serialization types (Value or Writable), check their serialization methods. 
> If you are using a Kryo-se

[GitHub] flink issue #2829: Hotfix 2016 11 18

2016-11-18 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2829
  
Could you modify the PR title to something more descriptive?




[GitHub] flink issue #2829: Hotfix 2016 11 18

2016-11-18 Thread fpompermaier
Github user fpompermaier commented on the issue:

https://github.com/apache/flink/pull/2829
  
Could this be a possible fix for FLINK-4719?




[GitHub] flink pull request #2829: Hotfix 2016 11 18

2016-11-18 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/2829#discussion_r88649470
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
 ---
@@ -151,6 +176,15 @@ private SerializationResult getSerializationResult() {
return SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL;
}
 
+   /**
+* Gets the currently set target buffer and sets its size to the actual
+* number of written bytes.
+*
+* After calling this method, a new target buffer is required to 
continue
+* writing (see {@link #setNextBuffer(Buffer)}).
+*
+* @return the target buffer that was used
+*/
--- End diff --

I wasn't quite sure whether this is the intended behaviour for all (future) 
implementations of the interface - at the moment there is only this one, though, 
so I described its actual behaviour.




[GitHub] flink pull request #2829: Hotfix 2016 11 18

2016-11-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2829#discussion_r88649329
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
 ---
@@ -151,6 +176,15 @@ private SerializationResult getSerializationResult() {
return SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL;
}
 
+   /**
+* Gets the currently set target buffer and sets its size to the actual
+* number of written bytes.
+*
+* After calling this method, a new target buffer is required to 
continue
+* writing (see {@link #setNextBuffer(Buffer)}).
+*
+* @return the target buffer that was used
+*/
--- End diff --

Shouldn't these javadocs be added to the RecordSerializer interface 
instead? (the same applies to other implemented methods)




[GitHub] flink pull request #2829: Hotfix 2016 11 18

2016-11-18 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/2829

Hotfix 2016 11 18

Prevent RecordWriter#flush() from clearing the serializer twice.
Also add some documentation to RecordWriter, RecordSerializer and 
SpanningRecordSerializer.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink hotfix_2016-11-18

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2829.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2829


commit 272737006e150665111fdfcd1e3f2194bd7243af
Author: Nico Kruber 
Date:   2016-11-17T12:57:18Z

[hotfix] add javadocs to SpanningRecordSerializer and RecordSerializer

commit c8f4db9572521f3a1eba4bef33184178128917b7
Author: Nico Kruber 
Date:   2016-11-17T13:01:52Z

[hotfix] no need to clear the serializer twice in RecordWriter#flush()

commit 85ac76e097a482ef89d6c3d1a7e913b666de8159
Author: Nico Kruber 
Date:   2016-11-17T16:52:09Z

[hotfix] further javadoc tweaks in RecordWriter






[jira] [Updated] (FLINK-5094) Support RichReduceFunction and RichFoldFunction as incremental window aggregation functions

2016-11-18 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-5094:
-
Description: 
Support {{RichReduceFunction}} and {{RichFoldFunction}} as incremental window 
aggregation functions in order to initialize the functions via {{open()}}.

The main problem is that we do not want to provide the full power of 
{{RichFunction}} for incremental aggregation functions, such as defining their 
own operator state. This could be achieved by providing some kind of 
{{RestrictedRuntimeContext}}.

  was:Support {{RichReduceFunction}} and {{RichFoldFunction}} as incremental 
window aggregation functions in order to initialize the functions via 
{{open()}}.


> Support RichReduceFunction and RichFoldFunction as incremental window 
> aggregation functions
> ---
>
> Key: FLINK-5094
> URL: https://issues.apache.org/jira/browse/FLINK-5094
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming, Windowing Operators
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Fabian Hueske
>
> Support {{RichReduceFunction}} and {{RichFoldFunction}} as incremental window 
> aggregation functions in order to initialize the functions via {{open()}}.
> The main problem is that we do not want to provide the full power of 
> {{RichFunction}} for incremental aggregation functions, such as defining their 
> own operator state. This could be achieved by providing some kind of 
> {{RestrictedRuntimeContext}}.
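For context, the kind of usage this issue would enable looks roughly like the sketch 
below; the snippet is only illustrative, and with the versions discussed here the 
rich variant is rejected for incremental window aggregation.

{code}
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/** Illustrative only: a rich reduce function as the incremental window
 *  aggregate, so that open() can be used for initialization. */
public class RichIncrementalAggregation {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		env.fromElements(Tuple2.of("a", 1L), Tuple2.of("a", 2L), Tuple2.of("b", 3L))
			.keyBy(0)
			.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
			.reduce(new RichReduceFunction<Tuple2<String, Long>>() {
				@Override
				public void open(Configuration parameters) {
					// the initialization hook this issue wants to make usable
				}

				@Override
				public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
					return Tuple2.of(a.f0, a.f1 + b.f1);
				}
			})
			.print();

		env.execute("rich incremental aggregation sketch");
	}
}
{code}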





[jira] [Created] (FLINK-5094) Support RichReduceFunction and RichFoldFunction as incremental window aggregation functions

2016-11-18 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-5094:


 Summary: Support RichReduceFunction and RichFoldFunction as 
incremental window aggregation functions
 Key: FLINK-5094
 URL: https://issues.apache.org/jira/browse/FLINK-5094
 Project: Flink
  Issue Type: Improvement
  Components: Windowing Operators
Affects Versions: 1.1.3, 1.2.0
Reporter: Fabian Hueske


Support {{RichReduceFunction}} and {{RichFoldFunction}} as incremental window 
aggregation functions in order to initialize the functions via {{open()}}.





[jira] [Updated] (FLINK-5094) Support RichReduceFunction and RichFoldFunction as incremental window aggregation functions

2016-11-18 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-5094:
-
Component/s: Streaming

> Support RichReduceFunction and RichFoldFunction as incremental window 
> aggregation functions
> ---
>
> Key: FLINK-5094
> URL: https://issues.apache.org/jira/browse/FLINK-5094
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming, Windowing Operators
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Fabian Hueske
>
> Support {{RichReduceFunction}} and {{RichFoldFunction}} as incremental window 
> aggregation functions in order to initialize the functions via {{open()}}.





[jira] [Commented] (FLINK-2609) Automatic type registration is only called from the batch execution environment

2016-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676468#comment-15676468
 ] 

ASF GitHub Bot commented on FLINK-2609:
---

Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/1833


> Automatic type registration is only called from the batch execution 
> environment
> ---
>
> Key: FLINK-2609
> URL: https://issues.apache.org/jira/browse/FLINK-2609
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 0.10.0
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>
> Kryo types in the streaming API are quite expensive to serialize because they 
> are not automatically registered with Kryo.





[GitHub] flink pull request #1833: [FLINK-2609] [streaming] auto-register types

2016-11-18 Thread zentol
Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/1833




[jira] [Closed] (FLINK-5054) Make the BucketingSink rescalable.

2016-11-18 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-5054.
---
Resolution: Fixed

Fixed in 13ebb36bb6c7fbe591d9e7834a2fc34d8469bc00

> Make the BucketingSink rescalable.
> --
>
> Key: FLINK-5054
> URL: https://issues.apache.org/jira/browse/FLINK-5054
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.2.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Aims at integrating the BucketingSink with the rescalable state 
> abstractions so that the parallelism can change when 
> restoring from a savepoint without sacrificing the provided guarantees.




