[jira] [Commented] (FLINK-4565) Support for SQL IN operator

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904650#comment-15904650
 ] 

ASF GitHub Bot commented on FLINK-4565:
---

Github user DmytroShkvyra closed the pull request at:

https://github.com/apache/flink/pull/3502


> Support for SQL IN operator
> ---
>
> Key: FLINK-4565
> URL: https://issues.apache.org/jira/browse/FLINK-4565
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Dmytro Shkvyra
>
> It seems that Flink SQL supports the uncorrelated sub-query IN operator. But 
> it should also be available in the Table API and tested.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4565) Support for SQL IN operator

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904651#comment-15904651
 ] 

ASF GitHub Bot commented on FLINK-4565:
---

GitHub user DmytroShkvyra reopened a pull request:

https://github.com/apache/flink/pull/3502

[FLINK-4565] Support for SQL IN operator

[FLINK-4565] Support for SQL IN operator
This PR is part of the work on the SQL IN operator in the Table API; it
implements IN for literals.
Two cases are covered: fewer than 20 literals and more than 20 literals.

I also have some questions:
- Is converting all numeric types to BigDecimal OK? I decided to do so to
simplify the use of a HashSet.
- The validation isn't really good: it forces the operator to be used with
literals of the same type. Should I rework it, or maybe just add more cases?
expressionDsl.scala:
entry point for the IN operator in the Scala API
ScalarOperators.scala:
1) In castNumeric, all numeric types are upcast to BigDecimal for use in the
HashSet; other types are left unchanged.
2) valuesInitialization is used for two cases: more than 20 operands (we use a
HashSet, initialized in the constructor, described below) and fewer than 20
operands (we initialize the operands in the method body and use them in the
conjunction).
3) comparison also covers the two cases described above: in the first case we
use a callback to declare and initialize the HashSet with all operands;
otherwise we simply put all operands into the conjunction.
4) The final code is built up from these code snippets.
CodeGenerator.scala:
passes the arguments and the callback to declare and initialize the HashSet
FunctionCatalog.scala:
registers "in" as a method
InITCase:
some use cases
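
For context, a minimal, hedged usage sketch (not taken from the PR) of how the
proposed `in` expression might be called from the Table API's Scala DSL; the
input data and field names are made up for illustration:
{code}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object InOperatorSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Hypothetical input table with an id and a currency field.
    val orders = env.fromElements((1, "EUR"), (2, "USD"), (3, "GBP"))
      .toTable(tEnv).as('id, 'currency)

    // With fewer than 20 literals the generated code is a chain of equality
    // comparisons; with more literals a HashSet is generated instead.
    val filtered = orders.where('currency.in("EUR", "USD"))

    filtered.toDataStream[Row].print()
    env.execute()
  }
}
{code}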

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/DmytroShkvyra/flink FLINK-4565-NV

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3502.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3502


commit f7960d66a0f885a5b032345427c5380f268cc60e
Author: DmytroShkvyra 
Date:   2017-03-09T19:37:46Z

[FLINK-4565] Support for SQL IN operator




> Support for SQL IN operator
> ---
>
> Key: FLINK-4565
> URL: https://issues.apache.org/jira/browse/FLINK-4565
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Dmytro Shkvyra
>
> It seems that Flink SQL supports the uncorrelated sub-query IN operator. But 
> it should also be available in the Table API and tested.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-6015) "Row too short" when reading CSV line with empty last field (i.e. ending in comma)

2017-03-09 Thread Luke Hutchison (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Hutchison closed FLINK-6015.
-
Resolution: Fixed

Fixed in FLINK-5907

> "Row too short" when reading CSV line with empty last field (i.e. ending in 
> comma)
> --
>
> Key: FLINK-6015
> URL: https://issues.apache.org/jira/browse/FLINK-6015
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats
>Affects Versions: 1.2.0
> Environment: Linux
>Reporter: Luke Hutchison
>
> When using env.readCsvFile(filename), if a line in the CSV file has an empty 
> last field, the line ends with a comma. This triggers an exception in 
> GenericCsvInput.parseRecord():
>   // check valid start position
>   if (startPos >= limit) {
>       if (lenient) {
>           return false;
>       } else {
>           throw new ParseException("Row too short: " + new String(bytes, offset, numBytes));
>       }
>   }
> Setting the parser to lenient would cause the last field to be left as null, 
> rather than setting its value to "".
> The parser should accept empty values for the last field on a row.
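
For reference, a minimal reproduction sketch (batch Scala API; the file path
and field types are assumptions for illustration, not from the ticket):
{code}
import org.apache.flink.api.scala._

object EmptyLastFieldRepro {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // A CSV line such as "foo,42," (trailing comma, i.e. an empty last field)
    // triggers the "Row too short" ParseException described above unless the
    // parser accepts an empty value for the final field.
    val rows = env.readCsvFile[(String, Int, String)]("/tmp/input.csv")
    rows.print()
  }
}
{code}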



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6015) "Row too short" when reading CSV line with empty last field (i.e. ending in comma)

2017-03-09 Thread Luke Hutchison (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904615#comment-15904615
 ] 

Luke Hutchison commented on FLINK-6015:
---

Yes, this looks like the same bug. Thanks.

> "Row too short" when reading CSV line with empty last field (i.e. ending in 
> comma)
> --
>
> Key: FLINK-6015
> URL: https://issues.apache.org/jira/browse/FLINK-6015
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats
>Affects Versions: 1.2.0
> Environment: Linux
>Reporter: Luke Hutchison
>
> When using env.readCsvFile(filename), if a line in the CSV file has an empty 
> last field, the line ends with a comma. This triggers an exception in 
> GenericCsvInput.parseRecord():
>   // check valid start position
>   if (startPos >= limit) {
>       if (lenient) {
>           return false;
>       } else {
>           throw new ParseException("Row too short: " + new String(bytes, offset, numBytes));
>       }
>   }
> Setting the parser to lenient would cause the last field to be left as null, 
> rather than setting its value to "".
> The parser should accept empty values for the last field on a row.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904580#comment-15904580
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r105337481
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventtimeOverProcessFunction.scala
 ---
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.{ProcessFunction}
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils.base.StringSerializer
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.core.memory.{DataInputViewStreamWrapper, 
DataOutputViewStreamWrapper}
+import org.apache.flink.runtime.state.{FunctionInitializationContext, 
FunctionSnapshotContext}
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the field indexes which the aggregate functions use
+  * @param forwardedFieldCount the input fields count
+  * @param interMediateType the intermediate row type which the state saves
+  * @param keySelector the keyselector
+  * @param keyType the key type
+  *
+  */
+class UnboundedEventtimeOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val interMediateType: TypeInformation[Row],
+private val keySelector: KeySelector[Row, Tuple],
+private val keyType: TypeInformation[Tuple])
+  extends ProcessFunction[Row, Row]
+  with CheckpointedFunction{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var state: MapState[TimeWindow, Row] = _
+  private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = 
aggregates.zipWithIndex
+
+  /** Sorted list per key for choose the recent result and the records 
need retraction **/
+  private val timeSectionsMap: java.util.HashMap[Tuple, 
java.util.LinkedList[TimeWindow]] =
--- End diff --

Yes, there may be a memory problem if the watermark interval is too long,
but if we use the state backend, the cost of serializing the whole list on
every update is too large. As you say, if there were a sorted state backend
that provided quick search and update, that would be great for me.


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
>
> The 

[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-09 Thread hongyuhong
Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r105337481
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventtimeOverProcessFunction.scala
 ---
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.{ProcessFunction}
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils.base.StringSerializer
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.core.memory.{DataInputViewStreamWrapper, 
DataOutputViewStreamWrapper}
+import org.apache.flink.runtime.state.{FunctionInitializationContext, 
FunctionSnapshotContext}
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the field indexes which the aggregate functions use
+  * @param forwardedFieldCount the input fields count
+  * @param interMediateType the intermediate row type which the state saves
+  * @param keySelector the keyselector
+  * @param keyType the key type
+  *
+  */
+class UnboundedEventtimeOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val interMediateType: TypeInformation[Row],
+private val keySelector: KeySelector[Row, Tuple],
+private val keyType: TypeInformation[Tuple])
+  extends ProcessFunction[Row, Row]
+  with CheckpointedFunction{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var state: MapState[TimeWindow, Row] = _
+  private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = 
aggregates.zipWithIndex
+
+  /** Sorted list per key for choose the recent result and the records 
need retraction **/
+  private val timeSectionsMap: java.util.HashMap[Tuple, 
java.util.LinkedList[TimeWindow]] =
--- End diff --

Yes, there may be a memory problem if the watermark interval is too long,
but if we use the state backend, the cost of serializing the whole list on
every update is too large. As you say, if there were a sorted state backend
that provided quick search and update, that would be great for me.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3505: [backport-1.2] [FLINK-6006] [kafka] Always use com...

2017-03-09 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/3505

[backport-1.2] [FLINK-6006]  [kafka] Always use complete restored state in 
FlinkKafkaConsumer

(This PR is the fix of FLINK-6006 for Flink 1.2)

Previously, the Kafka consumer queried the partition list on restore and
then used it to filter out restored state for partitions that do not
exist in the list.

If the returned partition list is incomplete (i.e. missing partitions
that existed before, perhaps due to temporary ZK / broker downtime),
the state of the missing partitions is dropped and can no longer be
recovered.

This PR fixes the problem by always restoring the complete state,
without any filtering. We simply let the consumer fail if the partitions
assigned to the consuming threads / Kafka clients are unreachable when
the consumer starts running.
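
To illustrate the behavioural change conceptually (plain Scala, not the
connector's actual code; all names below are made up for illustration):
{code}
// Conceptual sketch of "filter restored state against a freshly discovered
// partition list" (old) vs. "keep the complete restored state" (new).
case class KafkaTopicPartition(topic: String, partition: Int)

object RestoreSketch {
  // Old behaviour: offsets of partitions missing from the (possibly
  // incomplete) discovered list are silently dropped.
  def restoreOld(
      restored: Map[KafkaTopicPartition, Long],
      discoveredNow: Set[KafkaTopicPartition]): Map[KafkaTopicPartition, Long] =
    restored.filter { case (tp, _) => discoveredNow.contains(tp) }

  // New behaviour: the complete restored state is used as-is; unreachable
  // partitions surface as failures once consumption starts.
  def restoreNew(
      restored: Map[KafkaTopicPartition, Long]): Map[KafkaTopicPartition, Long] =
    restored
}
{code}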


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-6006

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3505.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3505


commit d38e3fd4b123cacf4fd6ee3c1baab77e9b8593a0
Author: Tzu-Li (Gordon) Tai 
Date:   2017-03-10T06:10:56Z

[FLINK-6006] [kafka] Always use complete restored state in 
FlinkKafkaConsumer

Previously, the Kafka consumer queried the partition list on restore and
then used it to filter out restored state for partitions that do not
exist in the list.

If the returned partition list is incomplete (i.e. missing partitions
that existed before, perhaps due to temporary ZK / broker downtime),
the state of the missing partitions is dropped and can no longer be
recovered.

This commit fixes the problem by always restoring the complete state,
without any filtering. We simply let the consumer fail if the partitions
assigned to the consuming threads / Kafka clients are unreachable when
the consumer starts running.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6015) "Row too short" when reading CSV line with empty last field (i.e. ending in comma)

2017-03-09 Thread Kurt Young (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904496#comment-15904496
 ] 

Kurt Young commented on FLINK-6015:
---

Hi, I think this issue has been addressed by 
https://issues.apache.org/jira/browse/FLINK-5907

> "Row too short" when reading CSV line with empty last field (i.e. ending in 
> comma)
> --
>
> Key: FLINK-6015
> URL: https://issues.apache.org/jira/browse/FLINK-6015
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats
>Affects Versions: 1.2.0
> Environment: Linux
>Reporter: Luke Hutchison
>
> When using env.readCsvFile(filename), if a line in the CSV file has an empty 
> last field, the line ends with a comma. This triggers an exception in 
> GenericCsvInput.parseRecord():
>   // check valid start position
>   if (startPos >= limit) {
>       if (lenient) {
>           return false;
>       } else {
>           throw new ParseException("Row too short: " + new String(bytes, offset, numBytes));
>       }
>   }
> Setting the parser to lenient would cause the last field to be left as null, 
> rather than setting its value to "".
> The parser should accept empty values for the last field on a row.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6018) Minor improvements about `AbstractKeyedStateBackend#getPartitionedState` method

2017-03-09 Thread sunjincheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-6018:
---
Description: 
The code snippet currently in the `AbstractKeyedStateBackend # 
getPartitionedState` method, as follows:
{code}
line 352: // TODO: This is wrong, it should throw an exception that the initialization has not properly happened
line 353: if (!stateDescriptor.isSerializerInitialized()) {
line 354:     stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
line 355: }
{code}
Method `isSerializerInitialized`:
{code}
public boolean isSerializerInitialized() {
return serializer != null;
}
{code}
Method `initializeSerializerUnlessSet`:
{code}
public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
if (serializer == null) { 
if (typeInfo != null) {
serializer = 
typeInfo.createSerializer(executionConfig);
} else {
throw new IllegalStateException(
"Cannot initialize serializer 
after TypeInformation was dropped during serialization");
}
}
}
{code}
That is, in the `initializeSerializerUnlessSet` method, the `serializer` is
already checked via `serializer == null`. So I hope to improve this code a
little, in one of the following ways:
Approach 1:
Following the `TODO` comment, throw an exception:
{code}
if (!stateDescriptor.isSerializerInitialized()) {
    throw new IllegalStateException("The serializer of the descriptor has not been initialized!");
}
{code}
Approach 2:
Initialize unconditionally and remove the `if (!stateDescriptor.isSerializerInitialized()) {` check:
{code}
stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
{code}
Additionally, if we use approach 2, I suggest that `AbstractKeyedStateBackend`
add a `private final ExecutionConfig executionConfig` field; then we can change
the code to:
{code}
stateDescriptor.initializeSerializerUnlessSet(executionConfig);
{code}
Are the above suggestions reasonable to you?
Anybody's feedback and corrections are welcome.

  was:
The code snippet currently in the `AbstractKeyedStateBackend # 
getPartitionedState` method, as follows:
{code}
line 352: // TODO: This is wrong, it should throw an exception that the 
initialization has not properly happened
line 353: if (!stateDescriptor.isSerializerInitialized()) {
line 354:stateDescriptor.initializeSerializerUnlessSet(new 
ExecutionConfig());
line 354 }
{code}
I hope this code has a little improvement to the following:
approach 1: 
According to the `TODO` information  we throw an exception
{code}
if (!stateDescriptor.isSerializerInitialized()) {
throw new IllegalStateException("The serializer of the 
descriptor has not been initialized!"); 
}
{code}
approach 2:
Try to initialize and remove `if (!stateDescriptor.isSerializerInitialized()) 
{` logic.
{code}
stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
{code}
Meanwhile, If we use the approach 2, I suggest that `AbstractKeyedStateBackend` 
add a `private final ExecutionConfig executionConfig` property. then we can 
change the code like this:
{code}
stateDescriptor.initializeSerializerUnlessSet(executionConfig);
{code}
Are the above suggestions reasonable for you? 
Welcome anybody's feedback and corrections.


> Minor improvements about `AbstractKeyedStateBackend#getPartitionedState` 
> method
> ---
>
> Key: FLINK-6018
> URL: https://issues.apache.org/jira/browse/FLINK-6018
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> The code snippet currently in the `AbstractKeyedStateBackend # 
> getPartitionedState` method, as follows:
> {code}
> line 352: // TODO: This is wrong, it should throw an exception that the 
> initialization has not properly happened
> line 353: if (!stateDescriptor.isSerializerInitialized()) {
> line 354:stateDescriptor.initializeSerializerUnlessSet(new 
> ExecutionConfig());
> line 354 }
> {code}
> Method `isSerializerInitialized`:
> {code}
> public boolean isSerializerInitialized() {
>   return serializer != null;
>   }
> {code}
> Method `initializeSerializerUnlessSet`:
> {code}
> public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
>   if (serializer == null) { 
>   if (typeInfo != null) {
>   serializer = 
> typeInfo.createSerializer(executionConfig);
>   } else {
>

[jira] [Updated] (FLINK-6018) Minor improvements about `AbstractKeyedStateBackend#getPartitionedState` method

2017-03-09 Thread sunjincheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-6018:
---
Description: 
The code snippet currently in the `AbstractKeyedStateBackend # 
getPartitionedState` method, as follows:
{code}
line 352: // TODO: This is wrong, it should throw an exception that the 
initialization has not properly happened
line 353: if (!stateDescriptor.isSerializerInitialized()) {
line 354:stateDescriptor.initializeSerializerUnlessSet(new 
ExecutionConfig());
line 354 }
{code}
I hope this code has a little improvement to the following:
approach 1: 
According to the `TODO` information  we throw an exception
{code}
if (!stateDescriptor.isSerializerInitialized()) {
throw new IllegalStateException("The serializer of the 
descriptor has not been initialized!"); 
}
{code}
approach 2:
Try to initialize and remove `if (!stateDescriptor.isSerializerInitialized()) 
{` logic.
{code}
stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
{code}
Meanwhile, If we use the approach 2, I suggest that `AbstractKeyedStateBackend` 
add a `private final ExecutionConfig executionConfig` property. then we can 
change the code like this:
{code}
stateDescriptor.initializeSerializerUnlessSet(executionConfig);
{code}
Are the above suggestions reasonable for you? 
Welcome anybody's feedback and corrections.

  was:
The code snippet currently in the `AbstractKeyedStateBackend # 
getPartitionedState` method, as follows:
{code}
// TODO: This is wrong, it should throw an exception that the initialization 
has not properly happened
if (!stateDescriptor.isSerializerInitialized()) {
stateDescriptor.initializeSerializerUnlessSet(new 
ExecutionConfig());
}
{code}
I hope this code has a little improvement to the following:
approach 1: 
According to the `TODO` information  we throw an exception
{code}
if (!stateDescriptor.isSerializerInitialized()) {
throw new IllegalStateException("The serializer of the 
descriptor has not been initialized!"); 
}
{code}
approach 2:
Try to initialize and remove `if (!stateDescriptor.isSerializerInitialized()) 
{` logic.
{code}
stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
{code}
Meanwhile, If we use the approach 2, I suggest that `AbstractKeyedStateBackend` 
add a `private final ExecutionConfig executionConfig` property. then we can 
change the code like this:
{code}
stateDescriptor.initializeSerializerUnlessSet(executionConfig);
{code}
Are the above suggestions reasonable for you? 
Welcome anybody's feedback and corrections.


> Minor improvements about `AbstractKeyedStateBackend#getPartitionedState` 
> method
> ---
>
> Key: FLINK-6018
> URL: https://issues.apache.org/jira/browse/FLINK-6018
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> The code snippet currently in the `AbstractKeyedStateBackend # 
> getPartitionedState` method, as follows:
> {code}
> line 352: // TODO: This is wrong, it should throw an exception that the 
> initialization has not properly happened
> line 353: if (!stateDescriptor.isSerializerInitialized()) {
> line 354:stateDescriptor.initializeSerializerUnlessSet(new 
> ExecutionConfig());
> line 354 }
> {code}
> I hope this code has a little improvement to the following:
> approach 1: 
> According to the `TODO` information  we throw an exception
> {code}
> if (!stateDescriptor.isSerializerInitialized()) {
>   throw new IllegalStateException("The serializer of the 
> descriptor has not been initialized!"); 
> }
> {code}
> approach 2:
> Try to initialize and remove `if (!stateDescriptor.isSerializerInitialized()) 
> {` logic.
> {code}
> stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
> {code}
> Meanwhile, If we use the approach 2, I suggest that 
> `AbstractKeyedStateBackend` add a `private final ExecutionConfig 
> executionConfig` property. then we can change the code like this:
> {code}
> stateDescriptor.initializeSerializerUnlessSet(executionConfig);
> {code}
> Are the above suggestions reasonable for you? 
> Welcome anybody's feedback and corrections.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6018) Minor improvements about `AbstractKeyedStateBackend#getPartitionedState` method

2017-03-09 Thread sunjincheng (JIRA)
sunjincheng created FLINK-6018:
--

 Summary: Minor improvements about 
`AbstractKeyedStateBackend#getPartitionedState` method
 Key: FLINK-6018
 URL: https://issues.apache.org/jira/browse/FLINK-6018
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API, State Backends, Checkpointing
Reporter: sunjincheng
Assignee: sunjincheng


The code snippet currently in the `AbstractKeyedStateBackend # 
getPartitionedState` method, as follows:
{code}
// TODO: This is wrong, it should throw an exception that the initialization has not properly happened
if (!stateDescriptor.isSerializerInitialized()) {
    stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
}
{code}
I hope to improve this code a little, in one of the following ways:
Approach 1:
Following the `TODO` comment, throw an exception:
{code}
if (!stateDescriptor.isSerializerInitialized()) {
    throw new IllegalStateException("The serializer of the descriptor has not been initialized!");
}
{code}
Approach 2:
Initialize unconditionally and remove the `if (!stateDescriptor.isSerializerInitialized()) {` check:
{code}
stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
{code}
Additionally, if we use approach 2, I suggest that `AbstractKeyedStateBackend`
add a `private final ExecutionConfig executionConfig` field; then we can change
the code to:
{code}
stateDescriptor.initializeSerializerUnlessSet(executionConfig);
{code}
Are the above suggestions reasonable to you?
Anybody's feedback and corrections are welcome.
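
For illustration, a hedged sketch (in Scala, although the actual backend class
is Java) of how approach 2 with a constructor-provided ExecutionConfig might
look; the class and method shapes here are simplified assumptions, not the real
AbstractKeyedStateBackend code:
{code}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.state.{State, StateDescriptor}

// Simplified stand-in for a keyed state backend, illustration only.
abstract class SketchKeyedStateBackend(private val executionConfig: ExecutionConfig) {

  protected def createState[S <: State, V](descriptor: StateDescriptor[S, V]): S

  def getPartitionedState[S <: State, V](descriptor: StateDescriptor[S, V]): S = {
    // Approach 2: always initialize the serializer from the backend's own
    // ExecutionConfig instead of a freshly created one (or, per approach 1,
    // throw if it has not been initialized yet).
    descriptor.initializeSerializerUnlessSet(executionConfig)
    createState(descriptor)
  }
}
{code}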



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6017) CSV reader does not support quoted double quotes

2017-03-09 Thread Luke Hutchison (JIRA)
Luke Hutchison created FLINK-6017:
-

 Summary: CSV reader does not support quoted double quotes
 Key: FLINK-6017
 URL: https://issues.apache.org/jira/browse/FLINK-6017
 Project: Flink
  Issue Type: Bug
  Components: Batch Connectors and Input/Output Formats
 Environment: Linux
Reporter: Luke Hutchison


The RFC for the CSV format specifies that double quotes are valid in quoted 
strings in CSV, by doubling the quote character:

https://tools.ietf.org/html/rfc4180

However, when Flink parses a CSV file containing quoted quotes, such as:

bob,"The name is ""Bob"""

you get this exception:

org.apache.flink.api.common.io.ParseException: Line could not be parsed: 
'bob,"The name is ""Bob"""'
ParserError UNQUOTED_CHARS_AFTER_QUOTED_STRING 
Expect field types: class java.lang.String, class java.lang.String 
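
For reference, a tiny plain-Scala illustration (not Flink's parser) of the
RFC 4180 rule that a doubled quote inside a quoted field encodes a literal
quote character:
{code}
object QuotedQuoteExample {
  def main(args: Array[String]): Unit = {
    // The raw quoted field as it appears in the CSV line above.
    val rawField = "\"The name is \"\"Bob\"\"\""
    // Strip the surrounding quotes and un-double the inner quotes.
    val parsed = rawField.substring(1, rawField.length - 1).replace("\"\"", "\"")
    println(parsed) // prints: The name is "Bob"
  }
}
{code}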



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3504: [FLINK-6010] Documentation: correct IntelliJ IDEA ...

2017-03-09 Thread phoenixjiangnan
GitHub user phoenixjiangnan opened a pull request:

https://github.com/apache/flink/pull/3504

[FLINK-6010] Documentation: correct IntelliJ IDEA Plugins path in 'In…

…stalling the Scala plugin' section


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/phoenixjiangnan/flink FLINK-6010

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3504.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3504


commit 0aa3dccdcd80c5c58161296718974f695cdea2c9
Author: Bowen Li 
Date:   2017-03-09T21:55:07Z

[FLINK-6010] Documentation: correct IntelliJ IDEA Plugins path in 
'Installing the Scala plugin' section




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6010) Documentation: correct IntelliJ IDEA Plugins path in 'Installing the Scala plugin' section

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904413#comment-15904413
 ] 

ASF GitHub Bot commented on FLINK-6010:
---

GitHub user phoenixjiangnan opened a pull request:

https://github.com/apache/flink/pull/3504

[FLINK-6010] Documentation: correct IntelliJ IDEA Plugins path in 'In…

…stalling the Scala plugin' section


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/phoenixjiangnan/flink FLINK-6010

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3504.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3504


commit 0aa3dccdcd80c5c58161296718974f695cdea2c9
Author: Bowen Li 
Date:   2017-03-09T21:55:07Z

[FLINK-6010] Documentation: correct IntelliJ IDEA Plugins path in 
'Installing the Scala plugin' section




> Documentation: correct IntelliJ IDEA Plugins path in 'Installing the Scala 
> plugin' section
> --
>
> Key: FLINK-6010
> URL: https://issues.apache.org/jira/browse/FLINK-6010
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.2.0
>Reporter: Bowen Li
> Fix For: 1.2.1
>
>
> In 
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/internals/ide_setup.html#installing-the-scala-plugin,
>  the instructions for getting to the 'Plugins' page in IntelliJ IDEA are wrong. They seem 
> to describe a much older version of the IntelliJ IDE.
> The correct path now is: IntelliJ IDEA -> Preferences -> Plugins -> Install 
> JetBrains plugin 
> I'll submit a PR to fix this



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5995) Get a Exception when creating the ListStateDescriptor with a TypeInformation

2017-03-09 Thread sunjincheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904396#comment-15904396
 ] 

sunjincheng commented on FLINK-5995:


Hi [~srichter], [~aljoscha], thanks for your suggestions.
I made a simple implementation and opened the PR; I would be grateful if you 
could review it. Feel free to tell me if there is anything incorrect.

Best,
SunJincheng

> Get a Exception when creating the ListStateDescriptor with a TypeInformation 
> -
>
> Key: FLINK-5995
> URL: https://issues.apache.org/jira/browse/FLINK-5995
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> When using OperatorState and creating the ListStateDescriptor with a 
> TypeInformation, I got an exception. The exception info is:
> {code}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:915)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:858)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:858)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalStateException: Serializer not yet initialized.
>   at 
> org.apache.flink.api.common.state.StateDescriptor.getSerializer(StateDescriptor.java:169)
>   at 
> org.apache.flink.api.common.state.ListStateDescriptor.getElementSerializer(ListStateDescriptor.java:93)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getOperatorState(DefaultOperatorStateBackend.java:110)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getOperatorState(DefaultOperatorStateBackend.java:91)
>   at 
> org.apache.flink.table.runtime.aggregate.UnboundedNonPartitionedProcessingOverProcessFunction.initializeState(UnboundedNonPartitionedProcessingOverProcessFunction.scala:104)
>   at 
> org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:106)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:242)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:681)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:669)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> So I'll add a `stateDescriptor.initializeSerializerUnlessSet()` call in the 
> `getOperatorState` method. I'd appreciate it if anyone could give me some advice.
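
A minimal sketch of the pattern that triggers the exception and of the proposed
workaround (assumed to run inside, e.g., `initializeState`; the tuple type and
state name are illustrative only):
{code}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.scala._

object ListStateDescriptorSketch {
  def main(args: Array[String]): Unit = {
    // Descriptor built from TypeInformation: its serializer is not initialized yet.
    val descriptor = new ListStateDescriptor[(Int, String)](
      "my-operator-state", createTypeInformation[(Int, String)])

    // Passing it to getOperatorState(...) without initialization leads to
    // "Serializer not yet initialized." as in the stack trace above.
    // Workaround / proposed fix: initialize the serializer first.
    descriptor.initializeSerializerUnlessSet(new ExecutionConfig())
  }
}
{code}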



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5995) Get a Exception when creating the ListStateDescriptor with a TypeInformation

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904386#comment-15904386
 ] 

ASF GitHub Bot commented on FLINK-5995:
---

GitHub user sunjincheng121 opened a pull request:

https://github.com/apache/flink/pull/3503

[FLINK-5995][checkpoints] fix Get a Exception when creating the ListS…

…tateDescriptor with a TypeInformation

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [×] General
  - The pull request references the related JIRA issue ("[FLINK-5995] Fix 
Get a Exception when creating the ListStateDescriptor with a TypeInformation")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sunjincheng121/flink FLINK-5995-PR

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3503.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3503






> Get a Exception when creating the ListStateDescriptor with a TypeInformation 
> -
>
> Key: FLINK-5995
> URL: https://issues.apache.org/jira/browse/FLINK-5995
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> When using OperatorState and creating the ListStateDescriptor with a 
> TypeInformation, I got an exception. The exception info is:
> {code}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:915)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:858)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:858)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalStateException: Serializer not yet initialized.
>   at 
> org.apache.flink.api.common.state.StateDescriptor.getSerializer(StateDescriptor.java:169)
>   at 
> org.apache.flink.api.common.state.ListStateDescriptor.getElementSerializer(ListStateDescriptor.java:93)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getOperatorState(DefaultOperatorStateBackend.java:110)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getOperatorState(DefaultOperatorStateBackend.java:91)
>   at 
> org.apache.flink.table.runtime.aggregate.UnboundedNonPartitionedProcessingOverProcessFunction.initializeState(UnboundedNonPartitionedProcessingOverProcessFunction.scala:104)
>   at 
> org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.Abstr

[GitHub] flink pull request #3503: [FLINK-5995][checkpoints] fix Get a Exception when...

2017-03-09 Thread sunjincheng121
GitHub user sunjincheng121 opened a pull request:

https://github.com/apache/flink/pull/3503

[FLINK-5995][checkpoints] fix Get a Exception when creating the ListS…

…tateDescriptor with a TypeInformation

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [×] General
  - The pull request references the related JIRA issue ("[FLINK-5995] Fix 
Get a Exception when creating the ListStateDescriptor with a TypeInformation")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sunjincheng121/flink FLINK-5995-PR

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3503.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3503






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-6016) Newlines should be valid in quoted strings in CSV

2017-03-09 Thread Luke Hutchison (JIRA)
Luke Hutchison created FLINK-6016:
-

 Summary: Newlines should be valid in quoted strings in CSV
 Key: FLINK-6016
 URL: https://issues.apache.org/jira/browse/FLINK-6016
 Project: Flink
  Issue Type: Bug
  Components: Batch Connectors and Input/Output Formats
Affects Versions: 1.2.0
Reporter: Luke Hutchison


The RFC for the CSV format specifies that newlines are valid in quoted strings 
in CSV:

https://tools.ietf.org/html/rfc4180

However, when Flink parses a CSV file containing a newline, such as:

"3
4",5

you get this exception:

Line could not be parsed: '"3'
ParserError UNTERMINATED_QUOTED_STRING 
Expect field types: class java.lang.String, class java.lang.String 
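
For reference, a small plain-Scala illustration (not Flink's parser) that the
newline sits inside a quoted field, so the two physical lines form a single
record with two fields; doubled-quote escaping is ignored here for brevity:
{code}
object NewlineInQuotedField {
  // Minimal quote-aware field splitter for one record, illustration only.
  def splitRecord(record: String): List[String] = {
    val fields = scala.collection.mutable.ListBuffer.empty[String]
    val current = new StringBuilder
    var inQuotes = false
    record.foreach {
      case '"'                  => inQuotes = !inQuotes
      case ',' if !inQuotes     => fields += current.toString(); current.clear()
      case c                    => current.append(c)
    }
    fields += current.toString()
    fields.toList
  }

  def main(args: Array[String]): Unit = {
    // One record with two fields: "3\n4" and "5".
    println(splitRecord("\"3\n4\",5"))
  }
}
{code}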



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904367#comment-15904367
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r105323083
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UnboundedRowtimeOverTest.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala._
+import 
org.apache.flink.table.api.scala.stream.utils.StreamTestData.Small4Tuple
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, 
StreamTestData, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+
+import scala.collection.mutable
+
+class UnboundedRowtimeOverTest extends StreamingWithStateTestBase {
+
+  /** test sliding event-time unbounded window with partition by **/
+  @Test
+  def testWithPartition(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT a, b, SUM(a) over (partition by b order by 
rowtime() range between " +
+  "unbounded preceding and current row) from T1"
+
+val t1 = StreamTestData.getSmall3TupleDataStream(env)
+  .assignTimestampsAndWatermarks(new 
AssignerWithPeriodicWatermarks[(Int, Long, String)] {
+
+def getCurrentWatermark: Watermark = new Watermark(130L)
+
+def extractTimestamp(element: (Int, Long, String), 
previousElementTimestamp: Long): Long =
+  140
+  }).toTable(tEnv).as('a, 'b, 'c)
+tEnv.registerTable("T1", t1)
+
+val result = tEnv.sql(sqlQuery).toDataStream[Row]
+result.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected1 = mutable.MutableList(
+  "1,1,1", "2,2,2", "3,2,5")
+val expected2 = mutable.MutableList(
+  "1,1,1", "2,2,5", "3,2,3")
+assertTrue(expected1.equals(StreamITCase.testResults.sorted) ||
+  expected2.equals(StreamITCase.testResults.sorted))
+  }
+
+  /** test sliding event-time unbounded window without partitiion by **/
+  @Test
+  def testWithoutPartition(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT SUM(a) " +
+  "over (order by rowtime() range between unbounded preceding and 
current row) from T1"
+
+val t1 = StreamTestData.getSmall3TupleDataStream(env)
+  .assignTimestampsAndWatermarks(new 
AssignerWithPeriodicWatermarks[(Int, Long, String)] {
+
+def getCurrentWatermark: Watermark = new Watermark(130L)
+
+def extractTimestamp(element: (Int, Long, String), 
previousElementTimestamp: Long): Long =
+  140
+  }).toTable(tEnv).as('a, 'b, 'c)
+tEnv.registerTable("T1", t1)
+
+val result = tEnv.sql(sqlQuery).toDataStream[Row]
+result.addSink(new StreamITCase.StringSi

[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904363#comment-15904363
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r105323567
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventtimeOverProcessFunction.scala
 ---
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.{ProcessFunction}
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils.base.StringSerializer
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.core.memory.{DataInputViewStreamWrapper, 
DataOutputViewStreamWrapper}
+import org.apache.flink.runtime.state.{FunctionInitializationContext, 
FunctionSnapshotContext}
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the field indexes which the aggregate functions use
+  * @param forwardedFieldCount the input fields count
+  * @param interMediateType the intermediate row type which the state saves
+  * @param keySelector the keyselector
+  * @param keyType the key type
+  *
+  */
+class UnboundedEventtimeOverProcessFunction(
--- End diff --

I suggest that change `Eventtime` to `EventTime`, What do you think?


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates event time mode.
> - bounded PRECEDING is not supported (see FLINK-5655)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Tra

[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904370#comment-15904370
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r105324505
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventtimeOverProcessFunction.scala
 ---
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.{ProcessFunction}
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils.base.StringSerializer
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.core.memory.{DataInputViewStreamWrapper, 
DataOutputViewStreamWrapper}
+import org.apache.flink.runtime.state.{FunctionInitializationContext, 
FunctionSnapshotContext}
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the field index which the aggregate functions use
+  * @param forwardedFieldCount the input fields count
+  * @param interMediateType the intermediate row type in which the state is saved
+  * @param keySelector the key selector
+  * @param keyType the key type
+  *
+  */
+class UnboundedEventtimeOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val interMediateType: TypeInformation[Row],
+private val keySelector: KeySelector[Row, Tuple],
+private val keyType: TypeInformation[Tuple])
+  extends ProcessFunction[Row, Row]
+  with CheckpointedFunction{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var state: MapState[TimeWindow, Row] = _
+  private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = 
aggregates.zipWithIndex
+
+  /** Sorted list per key for choosing the most recent result and the records that need retraction **/
+  private val timeSectionsMap: java.util.HashMap[Tuple, 
java.util.LinkedList[TimeWindow]] =
+new java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]]
+
+  /** For store timeSectionsMap **/
+  private var timeSectionsState: ListState[String] = _
+  private var inputKeySerializer: TypeSerializer[Tuple] = _
+  private var timeSerializer: TypeSerializer[TimeWindow] = _
+
+  override def open(config: Configuration) {
+output = new Row(forwardedFieldCount + aggregates.length)
+val valueSerializer: TypeSerializer[Row] =
+  
interMediateType.createSerializer(getRuntimeContext.getExecutionConfig)
+timeSerializer = new TimeWindow.Serializer
+val stateDescriptor: MapStateDescriptor[TimeWindow, Row] =
+  new MapStateDescriptor[TimeWindow, Row]("rowtimeovers

[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904365#comment-15904365
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r105322751
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UnboundedRowtimeOverTest.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala._
+import 
org.apache.flink.table.api.scala.stream.utils.StreamTestData.Small4Tuple
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, 
StreamTestData, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+
+import scala.collection.mutable
+
+class UnboundedRowtimeOverTest extends StreamingWithStateTestBase {
+
+  /** test sliding event-time unbounded window with partition by **/
+  @Test
+  def testWithPartition(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT a, b, SUM(a) over (partition by b order by 
rowtime() range between " +
--- End diff --

I suggest partitioning by the `c` field. Just a suggestion.
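
For illustration, a minimal sketch of that variant (reusing the `T1` table registered in the test above; the selected columns and the val name are just assumptions):

```
// Hypothetical variant of the query under review: partition by the String field c
// instead of b, everything else unchanged.
val sqlQueryPartitionByC =
  "SELECT a, c, SUM(a) over (partition by c order by rowtime() range between " +
    "unbounded preceding and current row) from T1"
```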


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates event time mode.
> - bounded PRECEDING is not supported (see FLINK-5655)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904366#comment-15904366
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r105323410
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -80,6 +81,38 @@ object AggregateUtil {
   }
 
   /**
+* Create a [[ProcessFunction]] to evaluate the final aggregate value.
+*
+* @param namedAggregates List of calls to aggregate functions and 
their output field names
+* @param inputType Input row type
+* @return [[UnboundedProcessingOverProcessFunction]]
+*/
+  private[flink] def CreateUnboundedEventtimeOverProcessFunction(
--- End diff --

I suggest changing `Eventtime` to `EventTime`. What do you think?


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates event time mode.
> - bounded PRECEDING is not supported (see FLINK-5655)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904371#comment-15904371
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r105322914
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UnboundedRowtimeOverTest.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala._
+import 
org.apache.flink.table.api.scala.stream.utils.StreamTestData.Small4Tuple
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, 
StreamTestData, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+
+import scala.collection.mutable
+
+class UnboundedRowtimeOverTest extends StreamingWithStateTestBase {
+
+  /** test sliding event-time unbounded window with partition by **/
+  @Test
+  def testWithPartition(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT a, b, SUM(a) over (partition by b order by 
rowtime() range between " +
+  "unbounded preceding and current row) from T1"
+
+val t1 = StreamTestData.getSmall3TupleDataStream(env)
+  .assignTimestampsAndWatermarks(new 
AssignerWithPeriodicWatermarks[(Int, Long, String)] {
+
+def getCurrentWatermark: Watermark = new Watermark(130L)
+
+def extractTimestamp(element: (Int, Long, String), 
previousElementTimestamp: Long): Long =
+  140
+  }).toTable(tEnv).as('a, 'b, 'c)
+tEnv.registerTable("T1", t1)
+
+val result = tEnv.sql(sqlQuery).toDataStream[Row]
+result.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected1 = mutable.MutableList(
+  "1,1,1", "2,2,2", "3,2,5")
+val expected2 = mutable.MutableList(
+  "1,1,1", "2,2,5", "3,2,3")
+assertTrue(expected1.equals(StreamITCase.testResults.sorted) ||
+  expected2.equals(StreamITCase.testResults.sorted))
+  }
+
+  /** test sliding event-time unbounded window without partition by **/
+  @Test
+  def testWithoutPartition(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT SUM(a) " +
+  "over (order by rowtime() range between unbounded preceding and 
current row) from T1"
+
+val t1 = StreamTestData.getSmall3TupleDataStream(env)
+  .assignTimestampsAndWatermarks(new 
AssignerWithPeriodicWatermarks[(Int, Long, String)] {
+
+def getCurrentWatermark: Watermark = new Watermark(130L)
+
+def extractTimestamp(element: (Int, Long, String), 
previousElementTimestamp: Long): Long =
+  140
+  }).toTable(tEnv).as('a, 'b, 'c)
+tEnv.registerTable("T1", t1)
+
+val result = tEnv.sql(sqlQuery).toDataStream[Row]
+result.addSink(new StreamITCase.StringSi

[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904369#comment-15904369
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r105322654
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UnboundedRowtimeOverTest.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala._
+import 
org.apache.flink.table.api.scala.stream.utils.StreamTestData.Small4Tuple
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, 
StreamTestData, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+
+import scala.collection.mutable
+
+class UnboundedRowtimeOverTest extends StreamingWithStateTestBase {
+
+  /** test sliding event-time unbounded window with partition by **/
+  @Test
+  def testWithPartition(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT a, b, SUM(a) over (partition by b order by 
rowtime() range between " +
+  "unbounded preceding and current row) from T1"
+
+val t1 = StreamTestData.getSmall3TupleDataStream(env)
--- End diff --

This test data is a bit simple; I recommend enriching the test data, for example:
  ```
data.+=((1, 1L, "Hi"))
data.+=((2, 2L, "Hello"))
data.+=((3, 5L, "Hello"))
data.+=((1, 3L, "Hello"))
data.+=((3, 7L, "Hello world"))
data.+=((4, 9L, "Hello world"))
data.+=((5, 8L, "Hello world"))
```
What do you think?


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates event time mode.
> - bounded PRECEDING is not supported (see FLINK-5655)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataSt

[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904368#comment-15904368
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r105325003
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventtimeOverProcessFunction.scala
 ---
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.{ProcessFunction}
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils.base.StringSerializer
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.core.memory.{DataInputViewStreamWrapper, 
DataOutputViewStreamWrapper}
+import org.apache.flink.runtime.state.{FunctionInitializationContext, 
FunctionSnapshotContext}
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the field index which the aggregate functions use
+  * @param forwardedFieldCount the input fields count
+  * @param interMediateType the intermediate row type in which the state is saved
+  * @param keySelector the key selector
+  * @param keyType the key type
+  *
+  */
+class UnboundedEventtimeOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val interMediateType: TypeInformation[Row],
+private val keySelector: KeySelector[Row, Tuple],
+private val keyType: TypeInformation[Tuple])
+  extends ProcessFunction[Row, Row]
+  with CheckpointedFunction{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var state: MapState[TimeWindow, Row] = _
+  private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = 
aggregates.zipWithIndex
+
+  /** Sorted list per key for choosing the most recent result and the records that need retraction **/
+  private val timeSectionsMap: java.util.HashMap[Tuple, 
java.util.LinkedList[TimeWindow]] =
--- End diff --

It's not a good idea to use an in-memory data structure here, because I worry about 
OOM problems in big-data situations; I suggest using the state backend. 
Unfortunately the state backends do not currently provide sorted state, so maybe we 
can think about other ways. I'm not sure yet; I need to think about it and then give you 
feedback. What do you think?
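
One possible direction, only as a sketch and not the PR's code: keep the per-key time sections in managed keyed state, so that the configured state backend (e.g. RocksDB) owns them instead of a heap `HashMap`. The class and state names below are made up for illustration, and ordering still has to be re-established when the list is read back, because `ListState` gives no sort guarantee.

```
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

// Hypothetical skeleton (not the class under review): the time sections are kept in
// keyed ListState, registered in open(), so they are scoped per key and live in the
// state backend rather than in a java.util.HashMap on the heap.
class StateBackedTimeSections extends ProcessFunction[Row, Row] {

  private var timeSections: ListState[TimeWindow] = _

  override def open(config: Configuration): Unit = {
    val descriptor = new ListStateDescriptor[TimeWindow](
      "timeSections", new TimeWindow.Serializer)
    timeSections = getRuntimeContext.getListState(descriptor)
  }

  override def processElement(
      input: Row,
      ctx: ProcessFunction[Row, Row]#Context,
      out: Collector[Row]): Unit = {
    // state access is automatically scoped to the current key of the keyed stream;
    // assumes event-time timestamps are set, so ctx.timestamp() is non-null
    timeSections.add(new TimeWindow(ctx.timestamp(), ctx.timestamp() + 1))
    out.collect(input)
  }
}
```

This only moves the storage into the state backend; finding the insertion point in sorted order would still require reading back and sorting the stored windows, or a different state layout.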


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Huesk

[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904364#comment-15904364
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r105322866
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UnboundedRowtimeOverTest.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala._
+import 
org.apache.flink.table.api.scala.stream.utils.StreamTestData.Small4Tuple
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, 
StreamTestData, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+
+import scala.collection.mutable
+
+class UnboundedRowtimeOverTest extends StreamingWithStateTestBase {
+
+  /** test sliding event-time unbounded window with partition by **/
+  @Test
+  def testWithPartition(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT a, b, SUM(a) over (partition by b order by 
rowtime() range between " +
+  "unbounded preceding and current row) from T1"
+
+val t1 = StreamTestData.getSmall3TupleDataStream(env)
+  .assignTimestampsAndWatermarks(new 
AssignerWithPeriodicWatermarks[(Int, Long, String)] {
+
+def getCurrentWatermark: Watermark = new Watermark(130L)
--- End diff --

Why use a fixed value to produce the watermark? Could the watermark be generated 
based on the data instead?
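
For example, a data-driven assigner could look like the following sketch (not part of the PR; the class name and the one-second lag are arbitrary choices, and the timestamp is taken from the `Long` field of the test tuple):

```
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Hypothetical assigner: the event timestamp is the Long field of the tuple and the
// watermark trails the maximum timestamp seen so far, instead of being a constant.
class DataDrivenWatermarkAssigner
  extends AssignerWithPeriodicWatermarks[(Int, Long, String)] {

  private var maxTimestamp: Long = 0L

  def extractTimestamp(
      element: (Int, Long, String),
      previousElementTimestamp: Long): Long = {
    maxTimestamp = math.max(maxTimestamp, element._2)
    element._2
  }

  def getCurrentWatermark: Watermark = new Watermark(maxTimestamp - 1000L)
}
```

In the test it would replace the anonymous assigner, e.g. `.assignTimestampsAndWatermarks(new DataDrivenWatermarkAssigner)`.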


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates event time mode.
> - bounded PRECEDING is not supported (see FLINK-5655)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProj

[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-09 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r105323567
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventtimeOverProcessFunction.scala
 ---
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.{ProcessFunction}
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils.base.StringSerializer
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.core.memory.{DataInputViewStreamWrapper, 
DataOutputViewStreamWrapper}
+import org.apache.flink.runtime.state.{FunctionInitializationContext, 
FunctionSnapshotContext}
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the field index which the aggregate functions use
+  * @param forwardedFieldCount the input fields count
+  * @param interMediateType the intermediate row type in which the state is saved
+  * @param keySelector the key selector
+  * @param keyType the key type
+  *
+  */
+class UnboundedEventtimeOverProcessFunction(
--- End diff --

I suggest changing `Eventtime` to `EventTime`. What do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-09 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r105322751
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UnboundedRowtimeOverTest.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala._
+import 
org.apache.flink.table.api.scala.stream.utils.StreamTestData.Small4Tuple
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, 
StreamTestData, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+
+import scala.collection.mutable
+
+class UnboundedRowtimeOverTest extends StreamingWithStateTestBase {
+
+  /** test sliding event-time unbounded window with partition by **/
+  @Test
+  def testWithPartition(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT a, b, SUM(a) over (partition by b order by 
rowtime() range between " +
--- End diff --

I suggest partitioning by the `c` field. Just a suggestion.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-09 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r105322914
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UnboundedRowtimeOverTest.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala._
+import 
org.apache.flink.table.api.scala.stream.utils.StreamTestData.Small4Tuple
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, 
StreamTestData, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+
+import scala.collection.mutable
+
+class UnboundedRowtimeOverTest extends StreamingWithStateTestBase {
+
+  /** test sliding event-time unbounded window with partition by **/
+  @Test
+  def testWithPartition(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT a, b, SUM(a) over (partition by b order by 
rowtime() range between " +
+  "unbounded preceding and current row) from T1"
+
+val t1 = StreamTestData.getSmall3TupleDataStream(env)
+  .assignTimestampsAndWatermarks(new 
AssignerWithPeriodicWatermarks[(Int, Long, String)] {
+
+def getCurrentWatermark: Watermark = new Watermark(130L)
+
+def extractTimestamp(element: (Int, Long, String), 
previousElementTimestamp: Long): Long =
+  140
+  }).toTable(tEnv).as('a, 'b, 'c)
+tEnv.registerTable("T1", t1)
+
+val result = tEnv.sql(sqlQuery).toDataStream[Row]
+result.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected1 = mutable.MutableList(
+  "1,1,1", "2,2,2", "3,2,5")
+val expected2 = mutable.MutableList(
+  "1,1,1", "2,2,5", "3,2,3")
+assertTrue(expected1.equals(StreamITCase.testResults.sorted) ||
+  expected2.equals(StreamITCase.testResults.sorted))
+  }
+
+  /** test sliding event-time unbounded window without partition by **/
+  @Test
+  def testWithoutPartition(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT SUM(a) " +
+  "over (order by rowtime() range between unbounded preceding and 
current row) from T1"
+
+val t1 = StreamTestData.getSmall3TupleDataStream(env)
+  .assignTimestampsAndWatermarks(new 
AssignerWithPeriodicWatermarks[(Int, Long, String)] {
+
+def getCurrentWatermark: Watermark = new Watermark(130L)
+
+def extractTimestamp(element: (Int, Long, String), 
previousElementTimestamp: Long): Long =
+  140
+  }).toTable(tEnv).as('a, 'b, 'c)
+tEnv.registerTable("T1", t1)
+
+val result = tEnv.sql(sqlQuery).toDataStream[Row]
+result.addSink(new StreamITCase.StringSink)
+env.execute()
+
+assertEquals(Some("6"), 
StreamITCase.testResults.sorted.get(StreamITCase.testResults.size - 1))
--- End diff --

Can you also verify each output row, rather than only the last one?
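
For instance, a stricter assertion might look like this sketch (the concrete expected values are only placeholders, since they depend on how rows with equal rowtimes are treated):

```
// Hypothetical check: compare every emitted running sum, not just the final one.
// Replace the placeholder values once the semantics for equal rowtimes are settled.
val expected = mutable.MutableList("1", "3", "6")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
```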


---
If your project is set up for it, you can

[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-09 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r105322654
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UnboundedRowtimeOverTest.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala._
+import 
org.apache.flink.table.api.scala.stream.utils.StreamTestData.Small4Tuple
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, 
StreamTestData, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+
+import scala.collection.mutable
+
+class UnboundedRowtimeOverTest extends StreamingWithStateTestBase {
+
+  /** test sliding event-time unbounded window with partition by **/
+  @Test
+  def testWithPartition(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT a, b, SUM(a) over (partition by b order by 
rowtime() range between " +
+  "unbounded preceding and current row) from T1"
+
+val t1 = StreamTestData.getSmall3TupleDataStream(env)
--- End diff --

This test data is a bit simple; I recommend enriching the test data, for example:
  ```
data.+=((1, 1L, "Hi"))
data.+=((2, 2L, "Hello"))
data.+=((3, 5L, "Hello"))
data.+=((1, 3L, "Hello"))
data.+=((3, 7L, "Hello world"))
data.+=((4, 9L, "Hello world"))
data.+=((5, 8L, "Hello world"))
```
What do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-09 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r105324505
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventtimeOverProcessFunction.scala
 ---
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.{ProcessFunction}
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils.base.StringSerializer
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.core.memory.{DataInputViewStreamWrapper, 
DataOutputViewStreamWrapper}
+import org.apache.flink.runtime.state.{FunctionInitializationContext, 
FunctionSnapshotContext}
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the field index which the aggregate functions use
+  * @param forwardedFieldCount the input fields count
+  * @param interMediateType the intermediate row type in which the state is saved
+  * @param keySelector the key selector
+  * @param keyType the key type
+  *
+  */
+class UnboundedEventtimeOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val interMediateType: TypeInformation[Row],
+private val keySelector: KeySelector[Row, Tuple],
+private val keyType: TypeInformation[Tuple])
+  extends ProcessFunction[Row, Row]
+  with CheckpointedFunction{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var state: MapState[TimeWindow, Row] = _
+  private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = 
aggregates.zipWithIndex
+
+  /** Sorted list per key for choosing the most recent result and the records that need retraction **/
+  private val timeSectionsMap: java.util.HashMap[Tuple, 
java.util.LinkedList[TimeWindow]] =
+new java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]]
+
+  /** For store timeSectionsMap **/
+  private var timeSectionsState: ListState[String] = _
+  private var inputKeySerializer: TypeSerializer[Tuple] = _
+  private var timeSerializer: TypeSerializer[TimeWindow] = _
+
+  override def open(config: Configuration) {
+output = new Row(forwardedFieldCount + aggregates.length)
+val valueSerializer: TypeSerializer[Row] =
+  
interMediateType.createSerializer(getRuntimeContext.getExecutionConfig)
+timeSerializer = new TimeWindow.Serializer
+val stateDescriptor: MapStateDescriptor[TimeWindow, Row] =
+  new MapStateDescriptor[TimeWindow, Row]("rowtimeoverstate", 
timeSerializer, valueSerializer)
+inputKeySerializer = 
keyType.createSerializer(getRuntimeContext.getExecutionConfig)
+state = getRuntimeContext.getMapState[TimeWindow, Row](stateDescriptor)
+  }
+
+  override def proce

[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-09 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r105322866
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UnboundedRowtimeOverTest.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala._
+import 
org.apache.flink.table.api.scala.stream.utils.StreamTestData.Small4Tuple
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, 
StreamTestData, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+
+import scala.collection.mutable
+
+class UnboundedRowtimeOverTest extends StreamingWithStateTestBase {
+
+  /** test sliding event-time unbounded window with partition by **/
+  @Test
+  def testWithPartition(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT a, b, SUM(a) over (partition by b order by 
rowtime() range between " +
+  "unbounded preceding and current row) from T1"
+
+val t1 = StreamTestData.getSmall3TupleDataStream(env)
+  .assignTimestampsAndWatermarks(new 
AssignerWithPeriodicWatermarks[(Int, Long, String)] {
+
+def getCurrentWatermark: Watermark = new Watermark(130L)
--- End diff --

Why use a fixed value to produce the watermark? Could the watermark be generated 
based on the data instead?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-09 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r105325003
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventtimeOverProcessFunction.scala
 ---
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.{ProcessFunction}
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils.base.StringSerializer
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.core.memory.{DataInputViewStreamWrapper, 
DataOutputViewStreamWrapper}
+import org.apache.flink.runtime.state.{FunctionInitializationContext, 
FunctionSnapshotContext}
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the field index which the aggregate functions use
+  * @param forwardedFieldCount the input fields count
+  * @param interMediateType the intermediate row type in which the state is saved
+  * @param keySelector the key selector
+  * @param keyType the key type
+  *
+  */
+class UnboundedEventtimeOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val interMediateType: TypeInformation[Row],
+private val keySelector: KeySelector[Row, Tuple],
+private val keyType: TypeInformation[Tuple])
+  extends ProcessFunction[Row, Row]
+  with CheckpointedFunction{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var state: MapState[TimeWindow, Row] = _
+  private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = 
aggregates.zipWithIndex
+
+  /** Sorted list per key for choosing the most recent result and the records that need retraction **/
+  private val timeSectionsMap: java.util.HashMap[Tuple, 
java.util.LinkedList[TimeWindow]] =
--- End diff --

It's not a good idea to use an in-memory data structure here, because I worry about 
OOM problems in big-data situations; I suggest using the state backend. 
Unfortunately the state backends do not currently provide sorted state, so maybe we 
can think about other ways. I'm not sure yet; I need to think about it and then give you 
feedback. What do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-09 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r105323083
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UnboundedRowtimeOverTest.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.StreamTestData.Small4Tuple
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+
+import scala.collection.mutable
+
+class UnboundedRowtimeOverTest extends StreamingWithStateTestBase {
+
+  /** test sliding event-time unbounded window with partition by **/
+  @Test
+  def testWithPartition(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT a, b, SUM(a) over (partition by b order by 
rowtime() range between " +
+  "unbounded preceding and current row) from T1"
+
+val t1 = StreamTestData.getSmall3TupleDataStream(env)
+  .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(Int, Long, String)] {
+
+def getCurrentWatermark: Watermark = new Watermark(130L)
+
+def extractTimestamp(element: (Int, Long, String), previousElementTimestamp: Long): Long =
+  140
+  }).toTable(tEnv).as('a, 'b, 'c)
+tEnv.registerTable("T1", t1)
+
+val result = tEnv.sql(sqlQuery).toDataStream[Row]
+result.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected1 = mutable.MutableList(
+  "1,1,1", "2,2,2", "3,2,5")
+val expected2 = mutable.MutableList(
+  "1,1,1", "2,2,5", "3,2,3")
+assertTrue(expected1.equals(StreamITCase.testResults.sorted) ||
+  expected2.equals(StreamITCase.testResults.sorted))
+  }
+
+  /** test sliding event-time unbounded window without partition by **/
+  @Test
+  def testWithoutPartition(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.testResults = mutable.MutableList()
+
+val sqlQuery = "SELECT SUM(a) " +
+  "over (order by rowtime() range between unbounded preceding and 
current row) from T1"
+
+val t1 = StreamTestData.getSmall3TupleDataStream(env)
+  .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(Int, Long, String)] {
+
+def getCurrentWatermark: Watermark = new Watermark(130L)
+
+def extractTimestamp(element: (Int, Long, String), previousElementTimestamp: Long): Long =
+  140
+  }).toTable(tEnv).as('a, 'b, 'c)
+tEnv.registerTable("T1", t1)
+
+val result = tEnv.sql(sqlQuery).toDataStream[Row]
+result.addSink(new StreamITCase.StringSink)
+env.execute()
+
+assertEquals(Some("6"), 
StreamITCase.testResults.sorted.get(StreamITCase.testResults.size - 1))
+  }
+
+  /** test sliding event-time unbounded window with later record **/
+  @Test
+  def test

[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-09 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r105323410
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -80,6 +81,38 @@ object AggregateUtil {
   }
 
   /**
+* Create a [[ProcessFunction]] to evaluate the final aggregate value.
+*
+* @param namedAggregates List of calls to aggregate functions and 
their output field names
+* @param inputType Input row type
+* @return [[UnboundedProcessingOverProcessFunction]]
+*/
+  private[flink] def CreateUnboundedEventtimeOverProcessFunction(
--- End diff --

I suggest changing `Eventtime` to `EventTime`. What do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-6015) "Row too short" when reading CSV line with empty last field (i.e. ending in comma)

2017-03-09 Thread Luke Hutchison (JIRA)
Luke Hutchison created FLINK-6015:
-

 Summary: "Row too short" when reading CSV line with empty last 
field (i.e. ending in comma)
 Key: FLINK-6015
 URL: https://issues.apache.org/jira/browse/FLINK-6015
 Project: Flink
  Issue Type: Bug
  Components: Batch Connectors and Input/Output Formats
Affects Versions: 1.2.0
 Environment: Linux
Reporter: Luke Hutchison


When using env.readCsvFile(filename), if a line in the CSV file has an empty 
last field, the line ends with a comma. This triggers an exception in 
GenericCsvInputFormat.parseRecord():

// check valid start position
if (startPos >= limit) {
    if (lenient) {
        return false;
    } else {
        throw new ParseException("Row too short: " + new String(bytes, offset, numBytes));
    }
}

Setting the parser to lenient would cause the last field to be left as null, 
rather than setting its value to "".

The parser should accept empty values for the last field on a row.
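
For illustration, a minimal reproduction sketch against the batch Scala API (the 
file path and the tuple type are assumptions):

```
import org.apache.flink.api.scala._

object RowTooShortRepro {
  def main(args: Array[String]): Unit = {
    // assume /tmp/trailing_comma.csv contains a single line such as:  foo,
    val env = ExecutionEnvironment.getExecutionEnvironment
    val rows = env.readCsvFile[(String, String)]("/tmp/trailing_comma.csv")
    // fails with ParseException: "Row too short: foo," as described above
    rows.print()
  }
}
```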



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5861) TaskManager's components support updating JobManagerConnection

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904310#comment-15904310
 ] 

ASF GitHub Bot commented on FLINK-5861:
---

Github user ifndef-SleePy commented on the issue:

https://github.com/apache/flink/pull/3395
  
Hi Till, I almost missed these comments! I didn't see them until a few minutes 
ago.

I fully understand your concern. Just let me explain a bit more about your 
comments.
1. I agree with most of your suggestions, such as the null checks, the formatting 
problems and TestLogger.
2. Currently the synchronization problem will not happen. I think replacing the 
field value is safe; that's an atomic operation. Correct me if I'm wrong.
3. This PR will not work without the other reconciliation PRs, since nobody will 
notify these listeners. Actually we implemented reconnection between TM and JM, 
and it will work together with that code. The reason I opened this PR without the 
other reconciliation PRs is that I think this part is independent of the others, 
and I believe filing a huge PR is terrible for both reviewer and writer. However, 
this single PR made things confusing. Sorry about that. 
4. Actually I'm not sure the listener pattern is the best way to do this, but I 
think it's the simplest way and requires the fewest modifications to the current 
implementation. If the TM reconnects to a new JM, how else can we update the 
JobMasterGateway held by the components? I can't figure out a better way short of 
reimplementing these components.

Anyway, thank you for reviewing and commenting so much! I agree with you 
that we should close this PR for the moment. After reaching an agreement on the 
main reconciliation PRs, we can talk about what this PR tries to implement. 


> TaskManager's components support updating JobManagerConnection
> --
>
> Key: FLINK-5861
> URL: https://issues.apache.org/jira/browse/FLINK-5861
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, JobManager
>Reporter: Biao Liu
>Assignee: Biao Liu
> Fix For: 1.3.0
>
>
> Some components in TaskManager, such as TaskManagerActions, 
> CheckpointResponder, ResultPartitionConsumableNotifier, 
> PartitionProducerStateChecker, need to support updating JobManagerConnection. 
> So when JobManager fails and recovers, the tasks who keep old 
> JobManagerConnection can be notified to update JobManagerConnection. The 
> tasks can continue doing their jobs without failure.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5861) TaskManager's components support updating JobManagerConnection

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904311#comment-15904311
 ] 

ASF GitHub Bot commented on FLINK-5861:
---

Github user ifndef-SleePy closed the pull request at:

https://github.com/apache/flink/pull/3395


> TaskManager's components support updating JobManagerConnection
> --
>
> Key: FLINK-5861
> URL: https://issues.apache.org/jira/browse/FLINK-5861
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, JobManager
>Reporter: Biao Liu
>Assignee: Biao Liu
> Fix For: 1.3.0
>
>
> Some components in TaskManager, such as TaskManagerActions, 
> CheckpointResponder, ResultPartitionConsumableNotifier, 
> PartitionProducerStateChecker, need to support updating JobManagerConnection. 
> So when JobManager fails and recovers, the tasks who keep old 
> JobManagerConnection can be notified to update JobManagerConnection. The 
> tasks can continue doing their jobs without failure.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3395: [FLINK-5861] Components of TaskManager support updating J...

2017-03-09 Thread ifndef-SleePy
Github user ifndef-SleePy commented on the issue:

https://github.com/apache/flink/pull/3395
  
Hi Till, I almost missed these comments! I didn't see them until a few minutes 
ago.

I fully understand your concern. Just let me explain a bit more about your 
comments.
1. I agree with most of your suggestions, such as the null checks, the formatting 
problems and TestLogger.
2. Currently the synchronization problem will not happen. I think replacing the 
field value is safe; that's an atomic operation. Correct me if I'm wrong.
3. This PR will not work without the other reconciliation PRs, since nobody will 
notify these listeners. Actually we implemented reconnection between TM and JM, 
and it will work together with that code. The reason I opened this PR without the 
other reconciliation PRs is that I think this part is independent of the others, 
and I believe filing a huge PR is terrible for both reviewer and writer. However, 
this single PR made things confusing. Sorry about that. 
4. Actually I'm not sure the listener pattern is the best way to do this, but I 
think it's the simplest way and requires the fewest modifications to the current 
implementation. If the TM reconnects to a new JM, how else can we update the 
JobMasterGateway held by the components? I can't figure out a better way short of 
reimplementing these components.

Anyway, thank you for reviewing and commenting so much! I agree with you 
that we should close this PR for the moment. After reaching an agreement on the 
main reconciliation PRs, we can talk about what this PR tries to implement. 
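
For what it is worth, a stripped-down sketch of the listener pattern discussed in 
point 4, combined with the atomic reference replacement from point 2 (all names 
are hypothetical; this is not the PR's code):

```
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical stand-in for the real gateway/connection type.
trait JobManagerConnectionListener {
  def notifyJobManagerConnectionChanged(newConnection: AnyRef): Unit
}

class JobManagerConnectionHolder {
  // a plain reference write is atomic on the JVM; volatile additionally makes
  // the new value visible to the task threads that read it
  @volatile private var connection: AnyRef = _

  private val listeners = new CopyOnWriteArrayList[JobManagerConnectionListener]()

  def register(listener: JobManagerConnectionListener): Unit = listeners.add(listener)

  def currentConnection: AnyRef = connection

  def updateConnection(newConnection: AnyRef): Unit = {
    connection = newConnection
    val it = listeners.iterator()
    while (it.hasNext) {
      it.next().notifyJobManagerConnectionChanged(newConnection)
    }
  }
}
```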


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3395: [FLINK-5861] Components of TaskManager support upd...

2017-03-09 Thread ifndef-SleePy
Github user ifndef-SleePy closed the pull request at:

https://github.com/apache/flink/pull/3395


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-3849) Add FilterableTableSource interface and translation rule

2017-03-09 Thread Kurt Young (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young reassigned FLINK-3849:
-

Assignee: Kurt Young

> Add FilterableTableSource interface and translation rule
> 
>
> Key: FLINK-3849
> URL: https://issues.apache.org/jira/browse/FLINK-3849
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Kurt Young
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations 
> which support filter push-down.
> The interface could look as follows
> {code}
> trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a 
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak 
> the cost model as well to push the optimizer in the right direction.
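
A rough sketch of how a source might implement the proposed contract; the class 
name and the canEvaluate helper are made up for illustration, and returning null 
to signal "fully pushed down" is just one possible convention:

```
import org.apache.flink.table.expressions.Expression

// The trait as proposed above.
trait FilterableTableSource {
  /** Receives the predicate; returns the part that could not be pushed down. */
  def setPredicate(predicate: Expression): Expression
}

class MyFilterablePartitionSource extends FilterableTableSource {

  private var pushedPredicate: Option[Expression] = None

  override def setPredicate(predicate: Expression): Expression = {
    if (canEvaluate(predicate)) {
      pushedPredicate = Some(predicate) // applied while scanning the input
      null                              // nothing left for the planner to evaluate
    } else {
      predicate                         // hand the whole predicate back to the planner
    }
  }

  // Placeholder: a real source would inspect the expression tree here.
  private def canEvaluate(predicate: Expression): Boolean = false
}
```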



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3485: FLINK-5976 [tests] Deduplicate Tokenizer in tests

2017-03-09 Thread liuyuzhong7
Github user liuyuzhong7 commented on the issue:

https://github.com/apache/flink/pull/3485
  
@StephanEwen  
FlatMapFunction "Tokenizer" has move to  
org.apache.flink.test.testfunctions.  
Help me to review this. Thanks.  
And collect other reusable functions to org.apache.flink.test.testfunctions 
later.  


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5976) Refactoring duplicate Tokenizer in flink-test

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904280#comment-15904280
 ] 

ASF GitHub Bot commented on FLINK-5976:
---

Github user liuyuzhong7 commented on the issue:

https://github.com/apache/flink/pull/3485
  
@StephanEwen  
FlatMapFunction "Tokenizer" has move to  
org.apache.flink.test.testfunctions.  
Help me to review this. Thanks.  
And collect other reusable functions to org.apache.flink.test.testfunctions 
later.  


> Refactoring duplicate Tokenizer in flink-test
> -
>
> Key: FLINK-5976
> URL: https://issues.apache.org/jira/browse/FLINK-5976
> Project: Flink
>  Issue Type: Improvement
>  Components: Examples
>Affects Versions: 1.2.0
>Reporter: liuyuzhong7
>Priority: Minor
>  Labels: test
> Fix For: 1.2.0
>
>
> There is some duplicate code like this in flink-test; I think refactoring it 
> will be better. 
> ```
> public final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
>   @Override
>   public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
>       // normalize and split the line
>       String[] tokens = value.toLowerCase().split("\\W+");
>       // emit the pairs
>       for (String token : tokens) {
>           if (token.length() > 0) {
>               out.collect(new Tuple2<String, Integer>(token, 1));
>           }
>       }
>   }
> }
> ```



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-03-09 Thread Kurt Young (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904271#comment-15904271
 ] 

Kurt Young commented on FLINK-5859:
---

Hi [~fhueske], I can continue working on {{FilterableTableSource}}. Should I 
open another PR based on the former changes?

> support partition pruning on Table API & SQL
> 
>
> Key: FLINK-5859
> URL: https://issues.apache.org/jira/browse/FLINK-5859
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: godfrey he
>
> Many data sources are partitionable storage, e.g. HDFS, Druid. And many 
> queries just need to read a small subset of the total data. We can use 
> partition information to prune or skip over files irrelevant to the user’s 
> queries. Both query optimization time and execution time can be reduced 
> obviously, especially for a large partitioned table.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6014) Allow the registration of state objects in checkpoints

2017-03-09 Thread Xiaogang Shi (JIRA)
Xiaogang Shi created FLINK-6014:
---

 Summary: Allow the registration of state objects in checkpoints
 Key: FLINK-6014
 URL: https://issues.apache.org/jira/browse/FLINK-6014
 Project: Flink
  Issue Type: Sub-task
  Components: State Backends, Checkpointing
Reporter: Xiaogang Shi
Assignee: Xiaogang Shi


This issue is the very first step towards incremental checkpointing. We 
introduce a new state handle named {{CompositeStateHandle}} to be the base of 
the snapshots taken by task components. Known implementations may include 
{{KeyedStateHandle}} (for {{KeyedStateBackend}}s), {{SubtaskState}} (for 
subtasks, i.e. splits of a {{JobVertex}}) and {{TaskState}} (for {{JobVertex}}s).

Each {{CompositeStateHandle}} is composed of a collection of {{StateObject}}s. 
It should register all its state objects in the {{StateRegistry}} when its 
checkpoint is added to the {{CompletedCheckpointStore}} (i.e., when a pending 
checkpoint completes or a completed checkpoint is reloaded during recovery).

When a completed checkpoint is moved out of the {{CompletedCheckpointStore}}, 
we should not simply discard all state objects in the checkpoint. With the 
introduction of incremental checkpointing, a {{StateObject}} may be referenced 
by different checkpoints. We should first unregister the checkpoint's state 
objects in the {{StateRegistry}}; only those state objects that are no longer 
referenced by any checkpoint can be deleted.
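
A tiny sketch of the reference counting this implies; all names and signatures 
below are illustrative stand-ins, not the proposed Flink interfaces:

```
import scala.collection.mutable

// Stand-in for a handle to state on stable storage.
trait SketchStateObject {
  def discard(): Unit
}

class SketchStateRegistry {

  private val referenceCounts = mutable.Map[SketchStateObject, Int]().withDefaultValue(0)

  /** Called when a checkpoint referencing `obj` is added to the completed checkpoint store. */
  def register(obj: SketchStateObject): Unit = {
    referenceCounts(obj) += 1
  }

  /** Called when a checkpoint referencing `obj` is removed; deletes only unreferenced state. */
  def unregister(obj: SketchStateObject): Unit = {
    val remaining = referenceCounts(obj) - 1
    if (remaining <= 0) {
      referenceCounts.remove(obj)
      obj.discard()
    } else {
      referenceCounts(obj) = remaining
    }
  }
}
```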



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5976) Refactoring duplicate Tokenizer in flink-test

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904258#comment-15904258
 ] 

ASF GitHub Bot commented on FLINK-5976:
---

Github user liuyuzhong7 commented on the issue:

https://github.com/apache/flink/pull/3485
  
OK, got it. So how about just doing it now in flink-tests, as in this pull 
request?


> Refactoring duplicate Tokenizer in flink-test
> -
>
> Key: FLINK-5976
> URL: https://issues.apache.org/jira/browse/FLINK-5976
> Project: Flink
>  Issue Type: Improvement
>  Components: Examples
>Affects Versions: 1.2.0
>Reporter: liuyuzhong7
>Priority: Minor
>  Labels: test
> Fix For: 1.2.0
>
>
> There is some duplicate code like this in flink-test; I think refactoring it 
> will be better. 
> ```
> public final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
>   @Override
>   public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
>       // normalize and split the line
>       String[] tokens = value.toLowerCase().split("\\W+");
>       // emit the pairs
>       for (String token : tokens) {
>           if (token.length() > 0) {
>               out.collect(new Tuple2<String, Integer>(token, 1));
>           }
>       }
>   }
> }
> ```



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3485: FLINK-5976 [tests] Deduplicate Tokenizer in tests

2017-03-09 Thread liuyuzhong7
Github user liuyuzhong7 commented on the issue:

https://github.com/apache/flink/pull/3485
  
OK, got it. So how about just doing it now in flink-tests, as in this pull 
request?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3406: [flink-5568] [Table API & SQL]Introduce interface for cat...

2017-03-09 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3406
  
Hi @fhueske, I like your proposal to move the annotation from 
`TableSource` to `TableSourceConverter`. Let's do it this way. 
BTW, I noticed that you offered three possible methods on the 
`TableSourceConverter`; I can only imagine that `def requiredProperties: Array[String]` 
is necessary for now. It can help with validating the converter and with deciding 
which converter we should use when multiple converters have the same 
`TableType`. 
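
Roughly the shape being discussed, as a sketch; the trait name, the stub 
catalog-table type and both signatures are assumptions rather than the final 
interface:

```
// Placeholder for whatever the external catalog table representation ends up being.
case class ExternalTableStub(tableType: String, properties: Map[String, String])

trait TableSourceConverterSketch[T] {

  /** Property keys that must be present for this converter to be applicable. */
  def requiredProperties: Array[String]

  /** Builds the table source from the external catalog table's properties. */
  def fromExternalCatalogTable(table: ExternalTableStub): T
}
```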



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-09 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r105316858
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/ExternalCatalogCompatible.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.annotation;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.table.catalog.TableSourceConverter;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * A tableSource with this annotation represents it is compatible with external catalog, that is,
+ * an instance of this tableSource can be converted to or converted from external catalog table
+ * instance.
+ * The annotation contains the following information:
+ * <ul>
+ *   <li> external catalog table type name for this kind of tableSource </li>
+ *   <li> external catalog table <-> tableSource converter class </li>
+ * </ul>
+ */
+@Documented
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Public
+public @interface ExternalCatalogCompatible {
--- End diff --

This suggestion is pretty good, thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-6013) Add Datadog HTTP metrics reporter

2017-03-09 Thread Bowen Li (JIRA)
Bowen Li created FLINK-6013:
---

 Summary: Add Datadog HTTP metrics reporter
 Key: FLINK-6013
 URL: https://issues.apache.org/jira/browse/FLINK-6013
 Project: Flink
  Issue Type: Improvement
  Components: Metrics
Affects Versions: 1.2.0
Reporter: Bowen Li
Priority: Critical
 Fix For: 1.2.1


We at OfferUp use Datadog a lot for metrics and dashboards, and I believe a lot 
of other companies do too.

Flink right now only has a StatsD metrics reporter, and users have to set up the 
Datadog Agent in order to receive metrics from StatsD and transport them to 
Datadog. We don't like this approach.

We would prefer a Datadog metrics reporter that contacts the Datadog HTTP 
endpoint directly.

I'll take this ticket myself.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3461: [FLINK-5954] Always assign names to the window in the Str...

2017-03-09 Thread sunjincheng121
Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/3461
  
Hi @haohui, thanks for your explanation; I'll see the details in 
FLINK-5954.
Thanks,
SunJincheng


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5954) Always assign names to the window in the Stream SQL API

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904189#comment-15904189
 ] 

ASF GitHub Bot commented on FLINK-5954:
---

Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/3461
  
Hi @haohui, thanks for your explanation; I'll see the details in 
FLINK-5954.
Thanks,
SunJincheng


> Always assign names to the window in the Stream SQL API
> ---
>
> Key: FLINK-5954
> URL: https://issues.apache.org/jira/browse/FLINK-5954
> Project: Flink
>  Issue Type: Improvement
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> CALCITE-1603 and CALCITE-1615 bring in support for {{TUMBLE}}, {{HOP}}, 
> {{SESSION}} grouped windows, as well as the corresponding auxiliary functions 
> that allow users to query the start and the end of the windows (e.g., 
> {{TUMBLE_START()}} and {{TUMBLE_END()}}; see 
> http://calcite.apache.org/docs/stream.html for more details).
> The goal of this jira is to add support for these auxiliary functions in 
> Flink. Flink already has runtime support for them, as these functions are 
> essentially mapped to the {{WindowStart}} and {{WindowEnd}} classes.
> To implement this feature, the transformation needs to 
> recognize these functions and map them to the {{WindowStart}} and 
> {{WindowEnd}} classes.
> The problem is that both classes can only refer to the windows using an alias. 
> Therefore this jira proposes to assign a unique name to each window to 
> enable the transformation.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904089#comment-15904089
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user addisonj commented on the issue:

https://github.com/apache/flink/pull/1668
  
Very interested in this work. It sounds like there are a few loose ends and 
then some cleanup before it might be ready for merge. @senorcarbone or 
@StephanEwen, is there anything that could be picked up by someone else? Would 
love to help wherever possible.


> Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
> ---
>
> Key: FLINK-3257
> URL: https://issues.apache.org/jira/browse/FLINK-3257
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Paris Carbone
>Assignee: Paris Carbone
>
> The current snapshotting algorithm cannot support cycles in the execution 
> graph. An alternative scheme can potentially include records in-transit 
> through the back-edges of a cyclic execution graph (ABS [1]) to achieve the 
> same guarantees.
> One straightforward implementation of ABS for cyclic graphs can work along 
> the following lines:
> 1) Upon triggering a barrier in an IterationHead from the TaskManager, block 
> output and start upstream backup of all records forwarded from the 
> respective IterationSink.
> 2) The IterationSink should eventually forward the current snapshotting epoch 
> barrier to the IterationSource.
> 3) Upon receiving a barrier from the IterationSink, the IterationSource 
> should finalize the snapshot, unblock its output and emit all records 
> in-transit in FIFO order and continue the usual execution.
> --
> Upon restart the IterationSource should emit all records from the injected 
> snapshot first and then continue its usual execution.
> Several optimisations and slight variations can potentially be achieved, but 
> this can be the initial implementation.
> [1] http://arxiv.org/abs/1506.08603
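
A very condensed sketch of steps 1)–3) above; the class and method names are 
hypothetical and this is not the code in the PR:

```
import scala.collection.mutable

// One IterationHead/IterationSink pair during a snapshot, following the steps above.
class IterationHeadSketch {

  private val backEdgeLog = mutable.Queue[Any]()
  private var snapshotting = false

  // Step 1: barrier triggered from the TaskManager.
  def onCheckpointBarrier(): Unit = {
    snapshotting = true   // block regular output (not modelled here)
    backEdgeLog.clear()   // start upstream backup of back-edge records
  }

  // Records forwarded by the corresponding IterationSink over the back edge.
  def onBackEdgeRecord(record: Any): Unit = {
    if (snapshotting) backEdgeLog.enqueue(record) else emit(record)
  }

  // Step 3: the barrier of this epoch has come back from the IterationSink.
  def onBarrierFromIterationSink(): Unit = {
    snapshot(backEdgeLog)                                    // in-transit records join the snapshot
    while (backEdgeLog.nonEmpty) emit(backEdgeLog.dequeue()) // unblock and emit in FIFO order
    snapshotting = false
  }

  private def emit(record: Any): Unit = ()
  private def snapshot(inTransit: Iterable[Any]): Unit = ()
}
```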



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...

2017-03-09 Thread addisonj
Github user addisonj commented on the issue:

https://github.com/apache/flink/pull/1668
  
Very interested in this work. It sounds like there are a few loose ends and 
then some cleanup before it might be ready for merge. @senorcarbone or 
@StephanEwen, is there anything that could be picked up by someone else? Would 
love to help wherever possible.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3461: [FLINK-5954] Always assign names to the window in ...

2017-03-09 Thread haohui
GitHub user haohui reopened a pull request:

https://github.com/apache/flink/pull/3461

[FLINK-5954] Always assign names to the window in the Stream SQL API.

Please see jira for more details. 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/haohui/flink FLINK-5954

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3461.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3461


commit 064b827127b15a1397c216aae6611d575a75556b
Author: Haohui Mai 
Date:   2017-03-09T21:57:49Z

[FLINK-5954] Always assign names to the window in the Stream SQL API.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5954) Always assign names to the window in the Stream SQL API

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904050#comment-15904050
 ] 

ASF GitHub Bot commented on FLINK-5954:
---

GitHub user haohui reopened a pull request:

https://github.com/apache/flink/pull/3461

[FLINK-5954] Always assign names to the window in the Stream SQL API.

Please see jira for more details. 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/haohui/flink FLINK-5954

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3461.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3461


commit 064b827127b15a1397c216aae6611d575a75556b
Author: Haohui Mai 
Date:   2017-03-09T21:57:49Z

[FLINK-5954] Always assign names to the window in the Stream SQL API.




> Always assign names to the window in the Stream SQL API
> ---
>
> Key: FLINK-5954
> URL: https://issues.apache.org/jira/browse/FLINK-5954
> Project: Flink
>  Issue Type: Improvement
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> CALCITE-1603 and CALCITE-1615 bring in support for {{TUMBLE}}, {{HOP}}, 
> {{SESSION}} grouped windows, as well as the corresponding auxiliary functions 
> that allow users to query the start and the end of the windows (e.g., 
> {{TUMBLE_START()}} and {{TUMBLE_END()}}; see 
> http://calcite.apache.org/docs/stream.html for more details).
> The goal of this jira is to add support for these auxiliary functions in 
> Flink. Flink already has runtime support for them, as these functions are 
> essentially mapped to the {{WindowStart}} and {{WindowEnd}} classes.
> To implement this feature, the transformation needs to 
> recognize these functions and map them to the {{WindowStart}} and 
> {{WindowEnd}} classes.
> The problem is that both classes can only refer to the windows using an alias. 
> Therefore this jira proposes to assign a unique name to each window to 
> enable the transformation.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5954) Always assign names to the window in the Stream SQL API

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904048#comment-15904048
 ] 

ASF GitHub Bot commented on FLINK-5954:
---

Github user haohui commented on the issue:

https://github.com/apache/flink/pull/3461
  
Sorry for the late response. This is a prerequisite step for FLINK-6012 -- 
translating the group auxiliary functions (e.g., `TUMBLE_START`) to the 
corresponding Flink expressions (e.g., `WindowStart`).

A more detailed description is on the FLINK-5954.


> Always assign names to the window in the Stream SQL API
> ---
>
> Key: FLINK-5954
> URL: https://issues.apache.org/jira/browse/FLINK-5954
> Project: Flink
>  Issue Type: Improvement
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> CALCITE-1603 and CALCITE-1615 bring in support for {{TUMBLE}}, {{HOP}}, 
> {{SESSION}} grouped windows, as well as the corresponding auxiliary functions 
> that allow users to query the start and the end of the windows (e.g., 
> {{TUMBLE_START()}} and {{TUMBLE_END()}}; see 
> http://calcite.apache.org/docs/stream.html for more details).
> The goal of this jira is to add support for these auxiliary functions in 
> Flink. Flink already has runtime support for them, as these functions are 
> essentially mapped to the {{WindowStart}} and {{WindowEnd}} classes.
> To implement this feature, the transformation needs to 
> recognize these functions and map them to the {{WindowStart}} and 
> {{WindowEnd}} classes.
> The problem is that both classes can only refer to the windows using an alias. 
> Therefore this jira proposes to assign a unique name to each window to 
> enable the transformation.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3461: [FLINK-5954] Always assign names to the window in the Str...

2017-03-09 Thread haohui
Github user haohui commented on the issue:

https://github.com/apache/flink/pull/3461
  
Sorry for the late response. This is a prerequisite step for FLINK-6012 -- 
translating the group auxiliary functions (e.g., `TUMBLE_START`) to the 
corresponding Flink expressions (e.g., `WindowStart`).

A more detailed description is on the FLINK-5954.
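
To make the target concrete, the kind of query this chain of issues aims at looks 
roughly like the following; this is a sketch only, the table T1 and its 
event-time attribute are assumed to be registered as in the tests elsewhere in 
this thread, and the exact rowtime syntax may differ:

```
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
// a table T1(a, b) with an event-time attribute is assumed to be registered here

val sqlQuery =
  """
    |SELECT
    |  b,
    |  TUMBLE_START(rowtime(), INTERVAL '1' HOUR) AS wStart,
    |  TUMBLE_END(rowtime(), INTERVAL '1' HOUR)   AS wEnd,
    |  SUM(a)
    |FROM T1
    |GROUP BY b, TUMBLE(rowtime(), INTERVAL '1' HOUR)
  """.stripMargin

val result = tEnv.sql(sqlQuery).toDataStream[Row]
```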


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5954) Always assign names to the window in the Stream SQL API

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904042#comment-15904042
 ] 

ASF GitHub Bot commented on FLINK-5954:
---

Github user haohui closed the pull request at:

https://github.com/apache/flink/pull/3461


> Always assign names to the window in the Stream SQL API
> ---
>
> Key: FLINK-5954
> URL: https://issues.apache.org/jira/browse/FLINK-5954
> Project: Flink
>  Issue Type: Improvement
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> CALCITE-1603 and CALCITE-1615 bring in support for {{TUMBLE}}, {{HOP}}, 
> {{SESSION}} grouped windows, as well as the corresponding auxiliary functions 
> that allow users to query the start and the end of the windows (e.g., 
> {{TUMBLE_START()}} and {{TUMBLE_END()}}; see 
> http://calcite.apache.org/docs/stream.html for more details).
> The goal of this jira is to add support for these auxiliary functions in 
> Flink. Flink already has runtime support for them, as these functions are 
> essentially mapped to the {{WindowStart}} and {{WindowEnd}} classes.
> To implement this feature, the transformation needs to 
> recognize these functions and map them to the {{WindowStart}} and 
> {{WindowEnd}} classes.
> The problem is that both classes can only refer to the windows using an alias. 
> Therefore this jira proposes to assign a unique name to each window to 
> enable the transformation.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3461: [FLINK-5954] Always assign names to the window in ...

2017-03-09 Thread haohui
Github user haohui closed the pull request at:

https://github.com/apache/flink/pull/3461


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-6012) Support WindowStart / WindowEnd functions in stream SQL

2017-03-09 Thread Haohui Mai (JIRA)
Haohui Mai created FLINK-6012:
-

 Summary: Support WindowStart / WindowEnd functions in stream SQL
 Key: FLINK-6012
 URL: https://issues.apache.org/jira/browse/FLINK-6012
 Project: Flink
  Issue Type: Sub-task
Reporter: Haohui Mai
Assignee: Haohui Mai


This jira proposes to add support for {{TUMBLE_START()}} / {{TUMBLE_END()}} / 
{{HOP_START()}} / {{HOP_END()}} / {{SESSION_START()}} / {{SESSION_END()}} in 
the planner in Flink.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6011) Support TUMBLE, HOP, SESSION window in streaming SQL

2017-03-09 Thread Haohui Mai (JIRA)
Haohui Mai created FLINK-6011:
-

 Summary: Support TUMBLE, HOP, SESSION window in streaming SQL
 Key: FLINK-6011
 URL: https://issues.apache.org/jira/browse/FLINK-6011
 Project: Flink
  Issue Type: Sub-task
Reporter: Haohui Mai
Assignee: Haohui Mai


CALCITE-1603 and CALCITE-1615 introduce support for the {{TUMBLE}} / 
{{HOP}} / {{SESSION}} windows in the parser.

This jira tracks the effort of adding the corresponding support to the 
planners / optimizers in Flink.




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (FLINK-5829) Bump Calcite version to 1.12 once available

2017-03-09 Thread Haohui Mai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haohui Mai reassigned FLINK-5829:
-

Assignee: Haohui Mai

> Bump Calcite version to 1.12 once available
> ---
>
> Key: FLINK-5829
> URL: https://issues.apache.org/jira/browse/FLINK-5829
> Project: Flink
>  Issue Type: Task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Haohui Mai
>
> Once Calcite 1.12 is release we should update to remove some copied classes. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6010) Documentation: correct IntelliJ IDEA Plugins path in 'Installing the Scala plugin' section

2017-03-09 Thread Bowen Li (JIRA)
Bowen Li created FLINK-6010:
---

 Summary: Documentation: correct IntelliJ IDEA Plugins path in 
'Installing the Scala plugin' section
 Key: FLINK-6010
 URL: https://issues.apache.org/jira/browse/FLINK-6010
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.2.0
Reporter: Bowen Li
 Fix For: 1.2.1


In 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/internals/ide_setup.html#installing-the-scala-plugin,
the description of how to get to the 'Plugins' page in IntelliJ IDEA is wrong. It 
seems to describe a much older version of IntelliJ IDEA.

The correct path now is: IntelliJ IDEA -> Preferences -> Plugins -> Install 
JetBrains plugin

I'll submit a PR to fix this.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6009) Deprecate DataSetUtils#checksumHashCode

2017-03-09 Thread Greg Hogan (JIRA)
Greg Hogan created FLINK-6009:
-

 Summary: Deprecate DataSetUtils#checksumHashCode
 Key: FLINK-6009
 URL: https://issues.apache.org/jira/browse/FLINK-6009
 Project: Flink
  Issue Type: Improvement
  Components: Java API
Affects Versions: 1.3.0
Reporter: Greg Hogan
Assignee: Greg Hogan
Priority: Trivial
 Fix For: 1.3.0


This is likely only used by Gelly and we have a more featureful implementation 
allowing for multiple outputs and setting the job name. Deprecation will allow 
this to be removed in Flink 2.0.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3502: [FLINK-4565] Support for SQL IN operator

2017-03-09 Thread DmytroShkvyra
GitHub user DmytroShkvyra opened a pull request:

https://github.com/apache/flink/pull/3502

[FLINK-4565] Support for SQL IN operator

[FLINK-4565] Support for SQL IN operator
This PR is a part of work on SQL IN operator in Table API, which implements 
IN for literals.
Two cases are covered: fewer than and more than 20 literals.

Also I have some questions:
- converting all numeric types to BigDecimal isn't ok? I decided to make so 
to simplify use of hashset.
- validation isn't really good. It forces to use operator with same type 
literals. Should I rework it or maybe just add more cases?
expressionDsl.scala:
entry point for IN operator in scala API
ScalarOperators.scala:
1) All numeric types are upcasting to BigDecimal for using in hashset, 
other types are unchanged in castNumeric
2) valuesInitialization used for 2 cases: when we have more then 20 
operands (then we use the hashset, initialized in the constructor, described below) and 
less then 20 operands (then we initialize operands in method's body and use 
them in conjunction)
3) comparison also covers described above cases. In first case we use 
callback to declare and initialize hashset with all operands. Otherwise we just 
put all operands in conjunction.
4) Final code is built up with these code snippets.
CodeGenerator.scala:
passes arguments and callback to declare and init the hashset
FunctionCatalog.scala:
registers "in" as method
InITCase:
some use cases

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/DmytroShkvyra/flink FLINK-4565-NV

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3502.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3502


commit f7960d66a0f885a5b032345427c5380f268cc60e
Author: DmytroShkvyra 
Date:   2017-03-09T19:37:46Z

[FLINK-4565] Support for SQL IN operator




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4565) Support for SQL IN operator

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903759#comment-15903759
 ] 

ASF GitHub Bot commented on FLINK-4565:
---

GitHub user DmytroShkvyra opened a pull request:

https://github.com/apache/flink/pull/3502

[FLINK-4565] Support for SQL IN operator

[FLINK-4565] Support for SQL IN operator
This PR is a part of work on SQL IN operator in Table API, which implements 
IN for literals.
Two cases are covered: fewer than and more than 20 literals.

Also I have some questions:
- converting all numeric types to BigDecimal isn't ok? I decided to make so 
to simplify use of hashset.
- validation isn't really good. It forces to use operator with same type 
literals. Should I rework it or maybe just add more cases?
expressionDsl.scala:
entry point for IN operator in scala API
ScalarOperators.scala:
1) All numeric types are upcasting to BigDecimal for using in hashset, 
other types are unchanged in castNumeric
2) valuesInitialization used for 2 cases: when we have more then 20 
operands (then we use the hashset, initialized in the constructor, described below) and 
less then 20 operands (then we initialize operands in method's body and use 
them in conjunction)
3) comparison also covers described above cases. In first case we use 
callback to declare and initialize hashset with all operands. Otherwise we just 
put all operands in conjunction.
4) Final code is built up with these code snippets.
CodeGenerator.scala:
passes arguments and callback to declare and init the hashset
FunctionCatalog.scala:
registers "in" as method
InITCase:
some use cases

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/DmytroShkvyra/flink FLINK-4565-NV

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3502.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3502


commit f7960d66a0f885a5b032345427c5380f268cc60e
Author: DmytroShkvyra 
Date:   2017-03-09T19:37:46Z

[FLINK-4565] Support for SQL IN operator




> Support for SQL IN operator
> ---
>
> Key: FLINK-4565
> URL: https://issues.apache.org/jira/browse/FLINK-4565
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Dmytro Shkvyra
>
> It seems that Flink SQL supports the uncorrelated sub-query IN operator. But 
> it should also be available in the Table API and tested.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-03-09 Thread DmytroShkvyra
Github user DmytroShkvyra commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r105244663
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
 ---
@@ -1101,6 +1101,45 @@ class ScalarFunctionsTest extends ExpressionTestBase 
{
   "true")
   }
 
+  @Test
+  def testInExpressions(): Unit = {
+testTableApi(
--- End diff --

@twalthr Unfortunately, adding support for POJOs and tuples as standard data 
types in the Table API is too complicated a task and out of scope here. Adding 
new composite types to the API is broader than adding the IN operator; it would 
impact the whole process of validation and execution of SQL statements. It would 
be better to create an epic for introducing new types.
I have researched it and I think it would be better to commit the IN operator 
functionality without support for tuples and POJOs. Otherwise, it would become 
an infinite task.
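
For readers of this thread, the literal-only form that the PR targets looks 
roughly like this in the expression DSL (a hedged sketch; the exact entry point 
added in expressionDsl.scala may differ):

```
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._

// 'c is assumed to be a String field of the input table
def filterGreetings(t: Table): Table =
  t.where('c.in("Hello", "Hello world", "Hi"))
```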


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4565) Support for SQL IN operator

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903654#comment-15903654
 ] 

ASF GitHub Bot commented on FLINK-4565:
---

Github user DmytroShkvyra commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r105244663
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
 ---
@@ -1101,6 +1101,45 @@ class ScalarFunctionsTest extends ExpressionTestBase 
{
   "true")
   }
 
+  @Test
+  def testInExpressions(): Unit = {
+testTableApi(
--- End diff --

@twalthr Unfortunately, adding support for POJOs and tuples as standard data 
types in the Table API is too complicated a task and out of scope here. Adding 
new composite types to the API is broader than adding the IN operator; it would 
impact the whole process of validation and execution of SQL statements. It would 
be better to create an epic for introducing new types.
I have researched it and I think it would be better to commit the IN operator 
functionality without support for tuples and POJOs. Otherwise, it would become 
an infinite task.


> Support for SQL IN operator
> ---
>
> Key: FLINK-4565
> URL: https://issues.apache.org/jira/browse/FLINK-4565
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Dmytro Shkvyra
>
> It seems that Flink SQL supports the uncorrelated sub-query IN operator. But 
> it should also be available in the Table API and tested.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3501: [FLINK-5874] Restrict key types in the DataStream API.

2017-03-09 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3501
  
This should work and is well tested, good job. Had a bunch of minor 
comments, but nothing critical.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903645#comment-15903645
 ] 

ASF GitHub Bot commented on FLINK-5874:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3501
  
This should work and is well tested, good job. Had a bunch of minor 
comments, but nothing critical.


> Reject arrays as keys in DataStream API to avoid inconsistent hashing
> -
>
> Key: FLINK-5874
> URL: https://issues.apache.org/jira/browse/FLINK-5874
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Robert Metzger
>Assignee: Kostas Kloudas
>Priority: Blocker
>
> This issue has been reported on the mailing list twice:
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html
> The problem is the following: We are using just Key[].hashCode() to compute 
> the hash when shuffling data. Java's default hashCode() implementation 
> doesn't take the array's contents into account, but only the memory address.
> This leads to different hash codes on the sender and receiver side.
> In Flink 1.1 this means that the data is shuffled randomly and not keyed, and 
> in Flink 1.2 the key-group code detects a violation of the hashing.
> The proper fix of the problem would be to rely on Flink's {{TypeComparator}} 
> class, which has a type-specific hashing function. But introducing this 
> change would break compatibility with existing code.
> I'll file a JIRA for the 2.0 changes for that fix.
> For 1.2.1 and 1.3.0 we should at least reject arrays as keys.
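
The identity-based hashing at the core of this can be demonstrated with a few 
lines of plain Scala (illustration only, not Flink code):

```
val a = Array(1, 2, 3)
val b = Array(1, 2, 3)

// Object.hashCode() on arrays ignores the contents, so equal contents hash differently:
println(a.hashCode == b.hashCode) // almost always false

// A content-based hash is stable across sender and receiver:
println(java.util.Arrays.hashCode(a) == java.util.Arrays.hashCode(b)) // true
```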



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903632#comment-15903632
 ] 

ASF GitHub Bot commented on FLINK-5874:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105240510
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 ---
@@ -114,9 +121,53 @@ public KeyedStream(DataStream dataStream, 
KeySelector keySelector, Ty
dataStream.getTransformation(),
new KeyGroupStreamPartitioner<>(keySelector, 
StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)));
this.keySelector = keySelector;
-   this.keyType = keyType;
+   this.keyType = validateKeyType(keyType);
}
-   
+
+	private TypeInformation<KEY> validateKeyType(TypeInformation<KEY> keyType) {
+		Stack<TypeInformation<?>> stack = new Stack<>();
+		stack.push(keyType);
+
+		while (!stack.isEmpty()) {
+			TypeInformation<?> typeInfo = stack.pop();
+
+			if (!validateKeyTypeIsHashable(typeInfo)) {
+				throw new InvalidProgramException("This type (" + keyType + ") cannot be used as key.");
+			}
+
+			if (typeInfo instanceof TupleTypeInfoBase) {
+				for (int i = 0; i < typeInfo.getArity(); i++) {
+					stack.push(((TupleTypeInfoBase<?>) typeInfo).getTypeAt(i));
+				}
+			}
+		}
+		return keyType;
+	}
+
+	/**
+	 * Validates that a given type of element (as encoded by the provided {@link TypeInformation}) can be
+	 * used as a key in the {@code DataStream.keyBy()} operation.
+	 *
+	 * @return {@code false} if:
+	 * <ul>
+	 *     <li>it is a POJO type but does not override the {@link #hashCode()} method and relies on
+	 *     the {@link Object#hashCode()} implementation.</li>
+	 *     <li>it is an array of any type (see {@link PrimitiveArrayTypeInfo}, {@link BasicArrayTypeInfo},
+	 *     {@link ObjectArrayTypeInfo}).</li>
+	 * </ul>,
+	 * {@code true} otherwise.
+	 */
+	private boolean validateKeyTypeIsHashable(TypeInformation<?> type) {
+		try {
+			return (type instanceof PojoTypeInfo) ?
--- End diff --

we should also simply break this if statement down into multiple blocks.

i.e.

```
if (type instanceof PojoTypeInfo) {
return //find hashCode
}

if (type instanceof XArrayTypeInfo ... ) {
 return false;
}

return true;
```


> Reject arrays as keys in DataStream API to avoid inconsistent hashing
> -
>
> Key: FLINK-5874
> URL: https://issues.apache.org/jira/browse/FLINK-5874
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Robert Metzger
>Assignee: Kostas Kloudas
>Priority: Blocker
>
> This issue has been reported on the mailing list twice:
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html
> The problem is the following: We are using just Key[].hashCode() to compute 
> the hash when shuffling data. Java's default hashCode() implementation 
> doesn't take the array's contents into account, but only the memory address.
> This leads to different hash codes on the sender and receiver side.
> In Flink 1.1 this means that the data is shuffled randomly and not keyed, and 
> in Flink 1.2 the key-group code detects a violation of the hashing.
> The proper fix of the problem would be to rely on Flink's {{TypeComparator}} 
> class, which has a type-specific hashing function. But introducing this 
> change would break compatibility with existing code.
> I'll file a JIRA for the 2.0 changes for that fix.
> For 1.2.1 and 1.3.0 we should at least reject arrays as keys.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903631#comment-15903631
 ] 

ASF GitHub Bot commented on FLINK-5874:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105241140
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 ---
@@ -906,6 +919,256 @@ public void testChannelSelectors() {
}
 
/
+   // KeyBy testing
+   /
+
+   @Rule
+   public ExpectedException expectedException = ExpectedException.none();
+
+   @Test
+   public void testPrimitiveArrayKeyRejection() {
+
+   KeySelector, int[]> keySelector =
+   new KeySelector, 
int[]>() {
+
+   @Override
+   public int[] getKey(Tuple2 value) 
throws Exception {
+   int[] ks = new int[value.f0.length];
+   for (int i = 0; i < ks.length; i++) {
+   ks[i] = value.f0[i];
+   }
+   return ks;
+   }
+   };
+
+   testKeyRejection(keySelector, 
PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testBasicArrayKeyRejection() {
+
+   KeySelector, Integer[]> keySelector =
+   new KeySelector, 
Integer[]>() {
+
+   @Override
+   public Integer[] getKey(Tuple2 
value) throws Exception {
+   return value.f0;
+   }
+   };
+
+   testKeyRejection(keySelector, 
BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testObjectArrayKeyRejection() {
+
+   KeySelector, TestClass[]> keySelector 
=
--- End diff --

Do we need a separate class here or could we just throw in an Object[]?
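
For reference, a hypothetical variant of that test using a plain Object[] key instead of the dedicated TestClass (generic parameters reconstructed from the surrounding test; whether the TypeExtractor reports exactly this ObjectArrayTypeInfo for an Object[] selector would still need to be verified):

```
@Test
public void testObjectArrayKeyRejection() {
	KeySelector<Tuple2<Integer[], String>, Object[]> keySelector =
			new KeySelector<Tuple2<Integer[], String>, Object[]>() {

				@Override
				public Object[] getKey(Tuple2<Integer[], String> value) throws Exception {
					return value.f0; // Integer[] is assignable to Object[]
				}
			};

	ObjectArrayTypeInfo<Object[], Object> keyTypeInfo = ObjectArrayTypeInfo.getInfoFor(
			Object[].class, new GenericTypeInfo<>(Object.class));

	testKeyRejection(keySelector, keyTypeInfo);
}
```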


> Reject arrays as keys in DataStream API to avoid inconsistent hashing
> -
>
> Key: FLINK-5874
> URL: https://issues.apache.org/jira/browse/FLINK-5874
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Robert Metzger
>Assignee: Kostas Kloudas
>Priority: Blocker
>
> This issue has been reported on the mailing list twice:
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html
> The problem is the following: We are using just Key[].hashCode() to compute 
> the hash when shuffling data. Java's default hashCode() implementation 
> doesn't take the array's contents into account, only the memory address.
> This leads to different hash codes on the sender and receiver side.
> In Flink 1.1 this means that the data is shuffled randomly and not keyed, and 
> in Flink 1.2 the keygroups code detects a violation of the hashing.
> The proper fix of the problem would be to rely on Flink's {{TypeComparator}} 
> class, which has a type-specific hashing function. But introducing this 
> change would break compatibility with existing code.
> I'll file a JIRA for the 2.0 changes for that fix.
> For 1.2.1 and 1.3.0 we should at least reject arrays as keys.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903635#comment-15903635
 ] 

ASF GitHub Bot commented on FLINK-5874:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105241450
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 ---
@@ -906,6 +919,256 @@ public void testChannelSelectors() {
}
 
/
+   // KeyBy testing
+   /
+
+   @Rule
+   public ExpectedException expectedException = ExpectedException.none();
+
+   @Test
+   public void testPrimitiveArrayKeyRejection() {
+
+   KeySelector, int[]> keySelector =
+   new KeySelector, 
int[]>() {
+
+   @Override
+   public int[] getKey(Tuple2 value) 
throws Exception {
+   int[] ks = new int[value.f0.length];
+   for (int i = 0; i < ks.length; i++) {
+   ks[i] = value.f0[i];
+   }
+   return ks;
+   }
+   };
+
+   testKeyRejection(keySelector, 
PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testBasicArrayKeyRejection() {
+
+   KeySelector, Integer[]> keySelector =
+   new KeySelector, 
Integer[]>() {
+
+   @Override
+   public Integer[] getKey(Tuple2 
value) throws Exception {
+   return value.f0;
+   }
+   };
+
+   testKeyRejection(keySelector, 
BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testObjectArrayKeyRejection() {
+
+   KeySelector, TestClass[]> keySelector 
=
+   new KeySelector, 
TestClass[]>() {
+
+   @Override
+   public TestClass[] 
getKey(Tuple2 value) throws Exception {
+   TestClass[] ks = new 
TestClass[value.f0.length];
+   for (int i = 0; i < ks.length; 
i++) {
+   ks[i] = new 
TestClass(value.f0[i]);
+   }
+   return ks;
+   }
+   };
+
+   ObjectArrayTypeInfo keyTypeInfo = 
ObjectArrayTypeInfo.getInfoFor(
+   TestClass[].class, new 
GenericTypeInfo<>(TestClass.class));
+
+   testKeyRejection(keySelector, keyTypeInfo);
+   }
+
+   private  void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) {
+
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream> input = env.fromElements(
+   new Tuple2<>(new Integer[] {1, 2}, "barfoo")
+   );
+
+   Assert.assertEquals(expectedKeyType, 
TypeExtractor.getKeySelectorTypes(keySelector, input.getType()));
+
+   // adjust the rule
+   expectedException.expect(InvalidProgramException.class);
+   expectedException.expectMessage("This type (" + expectedKeyType 
+ ") cannot be used as key.");
+
+   input.keyBy(keySelector);
+   }
+
+   // composite key tests : POJOs
+
+   @Test
+   public void testPOJONestedArrayKeyRejection() {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream input = env.fromElements(
+   new POJOwithHashCode(new int[] {1, 2}));
+
+   TypeInformation expectedTypeInfo = new 
TupleTypeInfo>(
+   
PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+
+   // adjust the rule
+   expectedException.expect(InvalidProgramException.class);
+   expectedException.expectMessage("This type (" + 
expectedTypeInfo + ") cannot be used as key.");
+
+   input.keyBy("id");
+   }
+
+   @Test
+   public void testNestedArrayWorkArround() {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream input = env.fromElemen

[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903628#comment-15903628
 ] 

ASF GitHub Bot commented on FLINK-5874:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105239060
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 ---
@@ -906,6 +919,256 @@ public void testChannelSelectors() {
}
 
/
+   // KeyBy testing
+   /
+
+   @Rule
+   public ExpectedException expectedException = ExpectedException.none();
--- End diff --

First time I'm seeing this pattern and I can't help but feel that a simple 
try-catch block would be more readable.
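
For comparison, a minimal sketch of how the tail of the quoted testKeyRejection helper could look with a plain try-catch instead of the rule (names taken from the quoted diff; JUnit 4 assumed, as in the rest of the test class):

```
try {
	input.keyBy(keySelector);
	Assert.fail("keyBy should have rejected key type " + expectedKeyType);
} catch (InvalidProgramException e) {
	Assert.assertTrue(e.getMessage().contains(
			"This type (" + expectedKeyType + ") cannot be used as key."));
}
```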


> Reject arrays as keys in DataStream API to avoid inconsistent hashing
> -
>
> Key: FLINK-5874
> URL: https://issues.apache.org/jira/browse/FLINK-5874
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Robert Metzger
>Assignee: Kostas Kloudas
>Priority: Blocker
>
> This issue has been reported on the mailing list twice:
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html
> The problem is the following: We are using just Key[].hashCode() to compute 
> the hash when shuffling data. Java's default hashCode() implementation 
> doesn't take the array's contents into account, only the memory address.
> This leads to different hash codes on the sender and receiver side.
> In Flink 1.1 this means that the data is shuffled randomly and not keyed, and 
> in Flink 1.2 the keygroups code detects a violation of the hashing.
> The proper fix of the problem would be to rely on Flink's {{TypeComparator}} 
> class, which has a type-specific hashing function. But introducing this 
> change would break compatibility with existing code.
> I'll file a JIRA for the 2.0 changes for that fix.
> For 1.2.1 and 1.3.0 we should at least reject arrays as keys.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903633#comment-15903633
 ] 

ASF GitHub Bot commented on FLINK-5874:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105239223
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
 ---
@@ -736,7 +736,7 @@ public void restoreState(List state) throws 
Exception {
 
static final ValueStateDescriptor descriptor = new 
ValueStateDescriptor<>("seen", Boolean.class, false);
private static final long serialVersionUID = 1L;
-   private ValueState operatorState;
+   private transient ValueState operatorState;
--- End diff --

Unrelated change.


> Reject arrays as keys in DataStream API to avoid inconsistent hashing
> -
>
> Key: FLINK-5874
> URL: https://issues.apache.org/jira/browse/FLINK-5874
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Robert Metzger
>Assignee: Kostas Kloudas
>Priority: Blocker
>
> This issue has been reported on the mailing list twice:
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html
> The problem is the following: We are using just Key[].hashCode() to compute 
> the hash when shuffling data. Java's default hashCode() implementation 
> doesn't take the array's contents into account, only the memory address.
> This leads to different hash codes on the sender and receiver side.
> In Flink 1.1 this means that the data is shuffled randomly and not keyed, and 
> in Flink 1.2 the keygroups code detects a violation of the hashing.
> The proper fix of the problem would be to rely on Flink's {{TypeComparator}} 
> class, which has a type-specific hashing function. But introducing this 
> change would break compatibility with existing code.
> I'll file a JIRA for the 2.0 changes for that fix.
> For 1.2.1 and 1.3.0 we should at least reject arrays as keys.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903629#comment-15903629
 ] 

ASF GitHub Bot commented on FLINK-5874:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105241562
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 ---
@@ -906,6 +919,256 @@ public void testChannelSelectors() {
}
 
/
+   // KeyBy testing
+   /
+
+   @Rule
+   public ExpectedException expectedException = ExpectedException.none();
+
+   @Test
+   public void testPrimitiveArrayKeyRejection() {
+
+   KeySelector, int[]> keySelector =
+   new KeySelector, 
int[]>() {
+
+   @Override
+   public int[] getKey(Tuple2 value) 
throws Exception {
+   int[] ks = new int[value.f0.length];
+   for (int i = 0; i < ks.length; i++) {
+   ks[i] = value.f0[i];
+   }
+   return ks;
+   }
+   };
+
+   testKeyRejection(keySelector, 
PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testBasicArrayKeyRejection() {
+
+   KeySelector, Integer[]> keySelector =
+   new KeySelector, 
Integer[]>() {
+
+   @Override
+   public Integer[] getKey(Tuple2 
value) throws Exception {
+   return value.f0;
+   }
+   };
+
+   testKeyRejection(keySelector, 
BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testObjectArrayKeyRejection() {
+
+   KeySelector, TestClass[]> keySelector 
=
+   new KeySelector, 
TestClass[]>() {
+
+   @Override
+   public TestClass[] 
getKey(Tuple2 value) throws Exception {
+   TestClass[] ks = new 
TestClass[value.f0.length];
+   for (int i = 0; i < ks.length; 
i++) {
+   ks[i] = new 
TestClass(value.f0[i]);
+   }
+   return ks;
+   }
+   };
+
+   ObjectArrayTypeInfo keyTypeInfo = 
ObjectArrayTypeInfo.getInfoFor(
+   TestClass[].class, new 
GenericTypeInfo<>(TestClass.class));
+
+   testKeyRejection(keySelector, keyTypeInfo);
+   }
+
+   private  void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) {
+
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream> input = env.fromElements(
+   new Tuple2<>(new Integer[] {1, 2}, "barfoo")
+   );
+
+   Assert.assertEquals(expectedKeyType, 
TypeExtractor.getKeySelectorTypes(keySelector, input.getType()));
+
+   // adjust the rule
+   expectedException.expect(InvalidProgramException.class);
+   expectedException.expectMessage("This type (" + expectedKeyType 
+ ") cannot be used as key.");
+
+   input.keyBy(keySelector);
+   }
+
+   // composite key tests : POJOs
+
+   @Test
+   public void testPOJONestedArrayKeyRejection() {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream input = env.fromElements(
+   new POJOwithHashCode(new int[] {1, 2}));
+
+   TypeInformation expectedTypeInfo = new 
TupleTypeInfo>(
+   
PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+
+   // adjust the rule
+   expectedException.expect(InvalidProgramException.class);
+   expectedException.expectMessage("This type (" + 
expectedTypeInfo + ") cannot be used as key.");
+
+   input.keyBy("id");
+   }
+
+   @Test
+   public void testNestedArrayWorkArround() {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream input = env.fromElemen

[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903624#comment-15903624
 ] 

ASF GitHub Bot commented on FLINK-5874:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105239687
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 ---
@@ -114,9 +121,53 @@ public KeyedStream(DataStream dataStream, 
KeySelector keySelector, Ty
dataStream.getTransformation(),
new KeyGroupStreamPartitioner<>(keySelector, 
StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)));
this.keySelector = keySelector;
-   this.keyType = keyType;
+   this.keyType = validateKeyType(keyType);
}
-   
+
+   private TypeInformation validateKeyType(TypeInformation 
keyType) {
+   Stack> stack = new Stack<>();
+   stack.push(keyType);
+
+   while (!stack.isEmpty()) {
+   TypeInformation typeInfo = stack.pop();
+
+   if (!validateKeyTypeIsHashable(typeInfo)) {
+   throw new InvalidProgramException("This type (" 
+ keyType + ") cannot be used as key.");
--- End diff --

Let's include ```typeInfo``` in this exception as well to narrow it down 
for the user. We may even want to delay the exception until we have scanned the 
entire type and report all invalid keys at once.
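
A rough sketch of what delaying the exception could look like in the quoted validateKeyType (variable and type names follow the diff; java.util.ArrayList and java.util.List are assumed to be imported alongside Stack):

```
private TypeInformation<KEY> validateKeyType(TypeInformation<KEY> keyType) {
	Stack<TypeInformation<?>> stack = new Stack<>();
	stack.push(keyType);

	// Collect every unsupported nested type instead of failing on the first one.
	List<TypeInformation<?>> unsupportedTypes = new ArrayList<>();

	while (!stack.isEmpty()) {
		TypeInformation<?> typeInfo = stack.pop();

		if (!validateKeyTypeIsHashable(typeInfo)) {
			unsupportedTypes.add(typeInfo);
		}

		if (typeInfo instanceof TupleTypeInfoBase) {
			for (int i = 0; i < typeInfo.getArity(); i++) {
				stack.push(((TupleTypeInfoBase<?>) typeInfo).getTypeAt(i));
			}
		}
	}

	if (!unsupportedTypes.isEmpty()) {
		throw new InvalidProgramException("This type (" + keyType + ") cannot be used as key. "
				+ "Unsupported nested types: " + unsupportedTypes);
	}
	return keyType;
}
```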


> Reject arrays as keys in DataStream API to avoid inconsistent hashing
> -
>
> Key: FLINK-5874
> URL: https://issues.apache.org/jira/browse/FLINK-5874
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Robert Metzger
>Assignee: Kostas Kloudas
>Priority: Blocker
>
> This issue has been reported on the mailing list twice:
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html
> The problem is the following: We are using just Key[].hashCode() to compute 
> the hash when shuffling data. Java's default hashCode() implementation 
> doesn't take the array's contents into account, only the memory address.
> This leads to different hash codes on the sender and receiver side.
> In Flink 1.1 this means that the data is shuffled randomly and not keyed, and 
> in Flink 1.2 the keygroups code detects a violation of the hashing.
> The proper fix of the problem would be to rely on Flink's {{TypeComparator}} 
> class, which has a type-specific hashing function. But introducing this 
> change would break compatibility with existing code.
> I'll file a JIRA for the 2.0 changes for that fix.
> For 1.2.1 and 1.3.0 we should at least reject arrays as keys.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903637#comment-15903637
 ] 

ASF GitHub Bot commented on FLINK-5874:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105242176
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 ---
@@ -906,6 +919,256 @@ public void testChannelSelectors() {
}
 
/
+   // KeyBy testing
+   /
+
+   @Rule
+   public ExpectedException expectedException = ExpectedException.none();
+
+   @Test
+   public void testPrimitiveArrayKeyRejection() {
+
+   KeySelector, int[]> keySelector =
+   new KeySelector, 
int[]>() {
+
+   @Override
+   public int[] getKey(Tuple2 value) 
throws Exception {
+   int[] ks = new int[value.f0.length];
+   for (int i = 0; i < ks.length; i++) {
+   ks[i] = value.f0[i];
+   }
+   return ks;
+   }
+   };
+
+   testKeyRejection(keySelector, 
PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testBasicArrayKeyRejection() {
+
+   KeySelector, Integer[]> keySelector =
+   new KeySelector, 
Integer[]>() {
+
+   @Override
+   public Integer[] getKey(Tuple2 
value) throws Exception {
+   return value.f0;
+   }
+   };
+
+   testKeyRejection(keySelector, 
BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testObjectArrayKeyRejection() {
+
+   KeySelector, TestClass[]> keySelector 
=
+   new KeySelector, 
TestClass[]>() {
+
+   @Override
+   public TestClass[] 
getKey(Tuple2 value) throws Exception {
+   TestClass[] ks = new 
TestClass[value.f0.length];
+   for (int i = 0; i < ks.length; 
i++) {
+   ks[i] = new 
TestClass(value.f0[i]);
+   }
+   return ks;
+   }
+   };
+
+   ObjectArrayTypeInfo keyTypeInfo = 
ObjectArrayTypeInfo.getInfoFor(
+   TestClass[].class, new 
GenericTypeInfo<>(TestClass.class));
+
+   testKeyRejection(keySelector, keyTypeInfo);
+   }
+
+   private  void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) {
+
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream> input = env.fromElements(
+   new Tuple2<>(new Integer[] {1, 2}, "barfoo")
+   );
+
+   Assert.assertEquals(expectedKeyType, 
TypeExtractor.getKeySelectorTypes(keySelector, input.getType()));
+
+   // adjust the rule
+   expectedException.expect(InvalidProgramException.class);
+   expectedException.expectMessage("This type (" + expectedKeyType 
+ ") cannot be used as key.");
+
+   input.keyBy(keySelector);
+   }
+
+   // composite key tests : POJOs
+
+   @Test
+   public void testPOJONestedArrayKeyRejection() {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream input = env.fromElements(
+   new POJOwithHashCode(new int[] {1, 2}));
+
+   TypeInformation expectedTypeInfo = new 
TupleTypeInfo>(
+   
PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+
+   // adjust the rule
+   expectedException.expect(InvalidProgramException.class);
+   expectedException.expectMessage("This type (" + 
expectedTypeInfo + ") cannot be used as key.");
+
+   input.keyBy("id");
+   }
+
+   @Test
+   public void testNestedArrayWorkArround() {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream input = env.fromElemen

[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903638#comment-15903638
 ] 

ASF GitHub Bot commented on FLINK-5874:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105239799
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 ---
@@ -114,9 +121,53 @@ public KeyedStream(DataStream dataStream, 
KeySelector keySelector, Ty
dataStream.getTransformation(),
new KeyGroupStreamPartitioner<>(keySelector, 
StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)));
this.keySelector = keySelector;
-   this.keyType = keyType;
+   this.keyType = validateKeyType(keyType);
}
-   
+
+   private TypeInformation validateKeyType(TypeInformation 
keyType) {
+   Stack> stack = new Stack<>();
+   stack.push(keyType);
+
+   while (!stack.isEmpty()) {
+   TypeInformation typeInfo = stack.pop();
+
+   if (!validateKeyTypeIsHashable(typeInfo)) {
+   throw new InvalidProgramException("This type (" 
+ keyType + ") cannot be used as key.");
+   }
+   
+   if (typeInfo instanceof TupleTypeInfoBase) {
+   for (int i = 0; i < typeInfo.getArity(); i++) {
+   stack.push(((TupleTypeInfoBase) 
typeInfo).getTypeAt(i));
+   }
+   }
+   }
+   return keyType;
+   }
+
+   /**
+* Validates that a given type of element (as encoded by the provided 
{@link TypeInformation}) can be
+* used as a key in the {@code DataStream.keyBy()} operation.
+*
+* @return {@code false} if:
+* 
+* it is a POJO type but does not override the {@link 
#hashCode()} method and relies on
+* the {@link Object#hashCode()} implementation.
+* it is an array of any type (see {@link 
PrimitiveArrayTypeInfo}, {@link BasicArrayTypeInfo},
+* {@link ObjectArrayTypeInfo}).
+* ,
+* {@code true} otherwise.
+*/
+   private boolean validateKeyTypeIsHashable(TypeInformation type) {
+   try {
+   return (type instanceof PojoTypeInfo) ?
--- End diff --

? and : are typically at the start of a new line.
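
For illustration, the quoted return statement with the operators moved to the start of their lines; the second operand is cut off in the quoted diff, so it is replaced here by a hypothetical helper named isNotArrayType:

```
return (type instanceof PojoTypeInfo)
		? !type.getTypeClass().getMethod("hashCode").getDeclaringClass().equals(Object.class)
		: isNotArrayType(type); // hypothetical helper standing in for the elided second operand
```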


> Reject arrays as keys in DataStream API to avoid inconsistent hashing
> -
>
> Key: FLINK-5874
> URL: https://issues.apache.org/jira/browse/FLINK-5874
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Robert Metzger
>Assignee: Kostas Kloudas
>Priority: Blocker
>
> This issue has been reported on the mailing list twice:
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html
> The problem is the following: We are using just Key[].hashCode() to compute 
> the hash when shuffling data. Java's default hashCode() implementation 
> doesn't take the array's contents into account, only the memory address.
> This leads to different hash codes on the sender and receiver side.
> In Flink 1.1 this means that the data is shuffled randomly and not keyed, and 
> in Flink 1.2 the keygroups code detects a violation of the hashing.
> The proper fix of the problem would be to rely on Flink's {{TypeComparator}} 
> class, which has a type-specific hashing function. But introducing this 
> change would break compatibility with existing code.
> I'll file a JIRA for the 2.0 changes for that fix.
> For 1.2.1 and 1.3.0 we should at least reject arrays as keys.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903634#comment-15903634
 ] 

ASF GitHub Bot commented on FLINK-5874:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105241262
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 ---
@@ -906,6 +919,256 @@ public void testChannelSelectors() {
}
 
/
+   // KeyBy testing
+   /
+
+   @Rule
+   public ExpectedException expectedException = ExpectedException.none();
+
+   @Test
+   public void testPrimitiveArrayKeyRejection() {
+
+   KeySelector, int[]> keySelector =
+   new KeySelector, 
int[]>() {
+
+   @Override
+   public int[] getKey(Tuple2 value) 
throws Exception {
+   int[] ks = new int[value.f0.length];
+   for (int i = 0; i < ks.length; i++) {
+   ks[i] = value.f0[i];
+   }
+   return ks;
+   }
+   };
+
+   testKeyRejection(keySelector, 
PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testBasicArrayKeyRejection() {
+
+   KeySelector, Integer[]> keySelector =
+   new KeySelector, 
Integer[]>() {
+
+   @Override
+   public Integer[] getKey(Tuple2 
value) throws Exception {
+   return value.f0;
+   }
+   };
+
+   testKeyRejection(keySelector, 
BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testObjectArrayKeyRejection() {
+
+   KeySelector, TestClass[]> keySelector 
=
+   new KeySelector, 
TestClass[]>() {
+
+   @Override
+   public TestClass[] 
getKey(Tuple2 value) throws Exception {
+   TestClass[] ks = new 
TestClass[value.f0.length];
+   for (int i = 0; i < ks.length; 
i++) {
+   ks[i] = new 
TestClass(value.f0[i]);
+   }
+   return ks;
+   }
+   };
+
+   ObjectArrayTypeInfo keyTypeInfo = 
ObjectArrayTypeInfo.getInfoFor(
+   TestClass[].class, new 
GenericTypeInfo<>(TestClass.class));
+
+   testKeyRejection(keySelector, keyTypeInfo);
+   }
+
+   private  void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) {
+
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream> input = env.fromElements(
+   new Tuple2<>(new Integer[] {1, 2}, "barfoo")
+   );
+
+   Assert.assertEquals(expectedKeyType, 
TypeExtractor.getKeySelectorTypes(keySelector, input.getType()));
+
+   // adjust the rule
+   expectedException.expect(InvalidProgramException.class);
+   expectedException.expectMessage("This type (" + expectedKeyType 
+ ") cannot be used as key.");
+
+   input.keyBy(keySelector);
+   }
+
+   // composite key tests : POJOs
+
+   @Test
+   public void testPOJONestedArrayKeyRejection() {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream input = env.fromElements(
+   new POJOwithHashCode(new int[] {1, 2}));
+
+   TypeInformation expectedTypeInfo = new 
TupleTypeInfo>(
+   
PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+
+   // adjust the rule
+   expectedException.expect(InvalidProgramException.class);
+   expectedException.expectMessage("This type (" + 
expectedTypeInfo + ") cannot be used as key.");
+
+   input.keyBy("id");
+   }
+
+   @Test
+   public void testNestedArrayWorkArround() {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream input = env.fromElemen

[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903630#comment-15903630
 ] 

ASF GitHub Bot commented on FLINK-5874:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105241358
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 ---
@@ -906,6 +919,256 @@ public void testChannelSelectors() {
}
 
/
+   // KeyBy testing
+   /
+
+   @Rule
+   public ExpectedException expectedException = ExpectedException.none();
+
+   @Test
+   public void testPrimitiveArrayKeyRejection() {
+
+   KeySelector, int[]> keySelector =
+   new KeySelector, 
int[]>() {
+
+   @Override
+   public int[] getKey(Tuple2 value) 
throws Exception {
+   int[] ks = new int[value.f0.length];
+   for (int i = 0; i < ks.length; i++) {
+   ks[i] = value.f0[i];
+   }
+   return ks;
+   }
+   };
+
+   testKeyRejection(keySelector, 
PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testBasicArrayKeyRejection() {
+
+   KeySelector, Integer[]> keySelector =
+   new KeySelector, 
Integer[]>() {
+
+   @Override
+   public Integer[] getKey(Tuple2 
value) throws Exception {
+   return value.f0;
+   }
+   };
+
+   testKeyRejection(keySelector, 
BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testObjectArrayKeyRejection() {
+
+   KeySelector, TestClass[]> keySelector 
=
+   new KeySelector, 
TestClass[]>() {
+
+   @Override
+   public TestClass[] 
getKey(Tuple2 value) throws Exception {
+   TestClass[] ks = new 
TestClass[value.f0.length];
+   for (int i = 0; i < ks.length; 
i++) {
+   ks[i] = new 
TestClass(value.f0[i]);
+   }
+   return ks;
+   }
+   };
+
+   ObjectArrayTypeInfo keyTypeInfo = 
ObjectArrayTypeInfo.getInfoFor(
+   TestClass[].class, new 
GenericTypeInfo<>(TestClass.class));
+
+   testKeyRejection(keySelector, keyTypeInfo);
+   }
+
+   private  void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) {
+
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream> input = env.fromElements(
+   new Tuple2<>(new Integer[] {1, 2}, "barfoo")
+   );
+
+   Assert.assertEquals(expectedKeyType, 
TypeExtractor.getKeySelectorTypes(keySelector, input.getType()));
+
+   // adjust the rule
+   expectedException.expect(InvalidProgramException.class);
+   expectedException.expectMessage("This type (" + expectedKeyType 
+ ") cannot be used as key.");
+
+   input.keyBy(keySelector);
+   }
+
+   // composite key tests : POJOs
+
+   @Test
+   public void testPOJONestedArrayKeyRejection() {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream input = env.fromElements(
+   new POJOwithHashCode(new int[] {1, 2}));
+
+   TypeInformation expectedTypeInfo = new 
TupleTypeInfo>(
+   
PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+
+   // adjust the rule
+   expectedException.expect(InvalidProgramException.class);
+   expectedException.expectMessage("This type (" + 
expectedTypeInfo + ") cannot be used as key.");
+
+   input.keyBy("id");
+   }
+
+   @Test
+   public void testNestedArrayWorkArround() {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream input = env.fromElemen

[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903626#comment-15903626
 ] 

ASF GitHub Bot commented on FLINK-5874:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105238457
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 ---
@@ -906,6 +919,256 @@ public void testChannelSelectors() {
}
 
/
+   // KeyBy testing
+   /
+
+   @Rule
+   public ExpectedException expectedException = ExpectedException.none();
+
+   @Test
+   public void testPrimitiveArrayKeyRejection() {
+
+   KeySelector, int[]> keySelector =
+   new KeySelector, 
int[]>() {
+
+   @Override
+   public int[] getKey(Tuple2 value) 
throws Exception {
+   int[] ks = new int[value.f0.length];
+   for (int i = 0; i < ks.length; i++) {
+   ks[i] = value.f0[i];
+   }
+   return ks;
+   }
+   };
+
+   testKeyRejection(keySelector, 
PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testBasicArrayKeyRejection() {
+
+   KeySelector, Integer[]> keySelector =
+   new KeySelector, 
Integer[]>() {
+
+   @Override
+   public Integer[] getKey(Tuple2 
value) throws Exception {
+   return value.f0;
+   }
+   };
+
+   testKeyRejection(keySelector, 
BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testObjectArrayKeyRejection() {
+
+   KeySelector, TestClass[]> keySelector 
=
+   new KeySelector, 
TestClass[]>() {
+
+   @Override
+   public TestClass[] 
getKey(Tuple2 value) throws Exception {
+   TestClass[] ks = new 
TestClass[value.f0.length];
+   for (int i = 0; i < ks.length; 
i++) {
+   ks[i] = new 
TestClass(value.f0[i]);
+   }
+   return ks;
+   }
+   };
+
+   ObjectArrayTypeInfo keyTypeInfo = 
ObjectArrayTypeInfo.getInfoFor(
+   TestClass[].class, new 
GenericTypeInfo<>(TestClass.class));
+
+   testKeyRejection(keySelector, keyTypeInfo);
+   }
+
+   private  void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) {
+
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream> input = env.fromElements(
+   new Tuple2<>(new Integer[] {1, 2}, "barfoo")
+   );
+
+   Assert.assertEquals(expectedKeyType, 
TypeExtractor.getKeySelectorTypes(keySelector, input.getType()));
+
+   // adjust the rule
+   expectedException.expect(InvalidProgramException.class);
+   expectedException.expectMessage("This type (" + expectedKeyType 
+ ") cannot be used as key.");
+
+   input.keyBy(keySelector);
+   }
+
+   // composite key tests : POJOs
+
+   @Test
+   public void testPOJONestedArrayKeyRejection() {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream input = env.fromElements(
+   new POJOwithHashCode(new int[] {1, 2}));
+
+   TypeInformation expectedTypeInfo = new 
TupleTypeInfo>(
+   
PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+
+   // adjust the rule
+   expectedException.expect(InvalidProgramException.class);
+   expectedException.expectMessage("This type (" + 
expectedTypeInfo + ") cannot be used as key.");
+
+   input.keyBy("id");
+   }
+
+   @Test
+   public void testNestedArrayWorkArround() {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream input = env.fromElemen

[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903636#comment-15903636
 ] 

ASF GitHub Bot commented on FLINK-5874:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105240936
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 ---
@@ -906,6 +919,256 @@ public void testChannelSelectors() {
}
 
/
+   // KeyBy testing
+   /
+
+   @Rule
+   public ExpectedException expectedException = ExpectedException.none();
+
+   @Test
+   public void testPrimitiveArrayKeyRejection() {
+
+   KeySelector, int[]> keySelector =
+   new KeySelector, 
int[]>() {
+
+   @Override
+   public int[] getKey(Tuple2 value) 
throws Exception {
+   int[] ks = new int[value.f0.length];
+   for (int i = 0; i < ks.length; i++) {
+   ks[i] = value.f0[i];
+   }
+   return ks;
+   }
+   };
+
+   testKeyRejection(keySelector, 
PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testBasicArrayKeyRejection() {
+
+   KeySelector, Integer[]> keySelector =
+   new KeySelector, 
Integer[]>() {
+
+   @Override
+   public Integer[] getKey(Tuple2 
value) throws Exception {
+   return value.f0;
+   }
+   };
+
+   testKeyRejection(keySelector, 
BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testObjectArrayKeyRejection() {
+
+   KeySelector, TestClass[]> keySelector 
=
+   new KeySelector, 
TestClass[]>() {
+
+   @Override
+   public TestClass[] 
getKey(Tuple2 value) throws Exception {
+   TestClass[] ks = new 
TestClass[value.f0.length];
+   for (int i = 0; i < ks.length; 
i++) {
+   ks[i] = new 
TestClass(value.f0[i]);
+   }
+   return ks;
+   }
+   };
+
+   ObjectArrayTypeInfo keyTypeInfo = 
ObjectArrayTypeInfo.getInfoFor(
+   TestClass[].class, new 
GenericTypeInfo<>(TestClass.class));
+
+   testKeyRejection(keySelector, keyTypeInfo);
+   }
+
+   private  void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) {
+
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream> input = env.fromElements(
+   new Tuple2<>(new Integer[] {1, 2}, "barfoo")
+   );
+
+   Assert.assertEquals(expectedKeyType, 
TypeExtractor.getKeySelectorTypes(keySelector, input.getType()));
+
+   // adjust the rule
+   expectedException.expect(InvalidProgramException.class);
+   expectedException.expectMessage("This type (" + expectedKeyType 
+ ") cannot be used as key.");
+
+   input.keyBy(keySelector);
+   }
+
+   // composite key tests : POJOs
+
+   @Test
+   public void testPOJONestedArrayKeyRejection() {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream input = env.fromElements(
+   new POJOwithHashCode(new int[] {1, 2}));
+
+   TypeInformation expectedTypeInfo = new 
TupleTypeInfo>(
+   
PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+
+   // adjust the rule
+   expectedException.expect(InvalidProgramException.class);
+   expectedException.expectMessage("This type (" + 
expectedTypeInfo + ") cannot be used as key.");
+
+   input.keyBy("id");
+   }
+
+   @Test
+   public void testNestedArrayWorkArround() {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream input = env.fromElemen

[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903639#comment-15903639
 ] 

ASF GitHub Bot commented on FLINK-5874:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105238197
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 ---
@@ -906,6 +919,256 @@ public void testChannelSelectors() {
}
 
/
+   // KeyBy testing
+   /
+
+   @Rule
+   public ExpectedException expectedException = ExpectedException.none();
+
+   @Test
+   public void testPrimitiveArrayKeyRejection() {
+
+   KeySelector, int[]> keySelector =
+   new KeySelector, 
int[]>() {
+
+   @Override
+   public int[] getKey(Tuple2 value) 
throws Exception {
+   int[] ks = new int[value.f0.length];
+   for (int i = 0; i < ks.length; i++) {
+   ks[i] = value.f0[i];
+   }
+   return ks;
+   }
+   };
+
+   testKeyRejection(keySelector, 
PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testBasicArrayKeyRejection() {
+
+   KeySelector, Integer[]> keySelector =
+   new KeySelector, 
Integer[]>() {
+
+   @Override
+   public Integer[] getKey(Tuple2 
value) throws Exception {
+   return value.f0;
+   }
+   };
+
+   testKeyRejection(keySelector, 
BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testObjectArrayKeyRejection() {
+
+   KeySelector, TestClass[]> keySelector 
=
+   new KeySelector, 
TestClass[]>() {
+
+   @Override
+   public TestClass[] 
getKey(Tuple2 value) throws Exception {
+   TestClass[] ks = new 
TestClass[value.f0.length];
+   for (int i = 0; i < ks.length; 
i++) {
+   ks[i] = new 
TestClass(value.f0[i]);
+   }
+   return ks;
+   }
+   };
+
+   ObjectArrayTypeInfo keyTypeInfo = 
ObjectArrayTypeInfo.getInfoFor(
+   TestClass[].class, new 
GenericTypeInfo<>(TestClass.class));
+
+   testKeyRejection(keySelector, keyTypeInfo);
+   }
+
+   private  void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) {
+
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream> input = env.fromElements(
+   new Tuple2<>(new Integer[] {1, 2}, "barfoo")
+   );
+
+   Assert.assertEquals(expectedKeyType, 
TypeExtractor.getKeySelectorTypes(keySelector, input.getType()));
+
+   // adjust the rule
+   expectedException.expect(InvalidProgramException.class);
+   expectedException.expectMessage("This type (" + expectedKeyType 
+ ") cannot be used as key.");
+
+   input.keyBy(keySelector);
+   }
+
+   // composite key tests : POJOs
+
+   @Test
+   public void testPOJONestedArrayKeyRejection() {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream input = env.fromElements(
+   new POJOwithHashCode(new int[] {1, 2}));
+
+   TypeInformation expectedTypeInfo = new 
TupleTypeInfo>(
+   
PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+
+   // adjust the rule
+   expectedException.expect(InvalidProgramException.class);
+   expectedException.expectMessage("This type (" + 
expectedTypeInfo + ") cannot be used as key.");
+
+   input.keyBy("id");
+   }
+
+   @Test
+   public void testNestedArrayWorkArround() {
--- End diff --

typo: Arround -> Around


> Reject arrays as keys in DataStream API to avoid inconsistent hashing
> --

[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903625#comment-15903625
 ] 

ASF GitHub Bot commented on FLINK-5874:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105240608
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 ---
@@ -114,9 +121,53 @@ public KeyedStream(DataStream dataStream, 
KeySelector keySelector, Ty
dataStream.getTransformation(),
new KeyGroupStreamPartitioner<>(keySelector, 
StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)));
this.keySelector = keySelector;
-   this.keyType = keyType;
+   this.keyType = validateKeyType(keyType);
}
-   
+
+   private TypeInformation validateKeyType(TypeInformation 
keyType) {
+   Stack> stack = new Stack<>();
+   stack.push(keyType);
+
+   while (!stack.isEmpty()) {
+   TypeInformation typeInfo = stack.pop();
+
+   if (!validateKeyTypeIsHashable(typeInfo)) {
+   throw new InvalidProgramException("This type (" 
+ keyType + ") cannot be used as key.");
+   }
+   
+   if (typeInfo instanceof TupleTypeInfoBase) {
+   for (int i = 0; i < typeInfo.getArity(); i++) {
+   stack.push(((TupleTypeInfoBase) 
typeInfo).getTypeAt(i));
+   }
+   }
+   }
+   return keyType;
+   }
+
+   /**
+* Validates that a given type of element (as encoded by the provided 
{@link TypeInformation}) can be
+* used as a key in the {@code DataStream.keyBy()} operation.
+*
+* @return {@code false} if:
+* 
+* it is a POJO type but does not override the {@link 
#hashCode()} method and relies on
+* the {@link Object#hashCode()} implementation.
+* it is an array of any type (see {@link 
PrimitiveArrayTypeInfo}, {@link BasicArrayTypeInfo},
+* {@link ObjectArrayTypeInfo}).
+* ,
+* {@code true} otherwise.
+*/
+   private boolean validateKeyTypeIsHashable(TypeInformation type) {
+   try {
+   return (type instanceof PojoTypeInfo) ?
+   
!type.getTypeClass().getMethod("hashCode").getDeclaringClass().equals(Object.class)
 :
--- End diff --

This method would be more readable by not inverting here and returning 
true as the default.


> Reject arrays as keys in DataStream API to avoid inconsistent hashing
> -
>
> Key: FLINK-5874
> URL: https://issues.apache.org/jira/browse/FLINK-5874
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Robert Metzger
>Assignee: Kostas Kloudas
>Priority: Blocker
>
> This issue has been reported on the mailing list twice:
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html
> The problem is the following: We are using just Key[].hashCode() to compute 
> the hash when shuffling data. Java's default hashCode() implementation 
> doesn't take the array's contents into account, only the memory address.
> This leads to different hash codes on the sender and receiver side.
> In Flink 1.1 this means that the data is shuffled randomly and not keyed, and 
> in Flink 1.2 the keygroups code detects a violation of the hashing.
> The proper fix of the problem would be to rely on Flink's {{TypeComparator}} 
> class, which has a type-specific hashing function. But introducing this 
> change would break compatibility with existing code.
> I'll file a JIRA for the 2.0 changes for that fix.
> For 1.2.1 and 1.3.0 we should at least reject arrays as keys.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903627#comment-15903627
 ] 

ASF GitHub Bot commented on FLINK-5874:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105240820
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 ---
@@ -114,9 +121,53 @@ public KeyedStream(DataStream dataStream, 
KeySelector keySelector, Ty
dataStream.getTransformation(),
new KeyGroupStreamPartitioner<>(keySelector, 
StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)));
this.keySelector = keySelector;
-   this.keyType = keyType;
+   this.keyType = validateKeyType(keyType);
}
-   
+
+   private TypeInformation validateKeyType(TypeInformation 
keyType) {
+   Stack> stack = new Stack<>();
+   stack.push(keyType);
+
+   while (!stack.isEmpty()) {
+   TypeInformation typeInfo = stack.pop();
+
+   if (!validateKeyTypeIsHashable(typeInfo)) {
+   throw new InvalidProgramException("This type (" 
+ keyType + ") cannot be used as key.");
+   }
+   
+   if (typeInfo instanceof TupleTypeInfoBase) {
+   for (int i = 0; i < typeInfo.getArity(); i++) {
+   stack.push(((TupleTypeInfoBase) 
typeInfo).getTypeAt(i));
+   }
+   }
+   }
+   return keyType;
+   }
+
+   /**
+* Validates that a given type of element (as encoded by the provided 
{@link TypeInformation}) can be
+* used as a key in the {@code DataStream.keyBy()} operation.
+*
+* @return {@code false} if:
--- End diff --

I would shorten this to read ```returns true if the type overrides the 
hashCode implementation```. The details can be contained in the general javadoc 
of the method.
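
A possible shortened form of that Javadoc, following the suggestion (wording is only a sketch, not the merged text):

```
/**
 * Validates that a given type of element (as encoded by the provided {@link TypeInformation})
 * can be used as a key in the {@code DataStream.keyBy()} operation: POJO keys have to override
 * {@link Object#hashCode()}, and arrays of any kind are rejected.
 *
 * @return {@code true} if the type can be used as a key, {@code false} otherwise.
 */
```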


> Reject arrays as keys in DataStream API to avoid inconsistent hashing
> -
>
> Key: FLINK-5874
> URL: https://issues.apache.org/jira/browse/FLINK-5874
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Robert Metzger
>Assignee: Kostas Kloudas
>Priority: Blocker
>
> This issue has been reported on the mailing list twice:
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html
> The problem is the following: We are using just Key[].hashCode() to compute 
> the hash when shuffling data. Java's default hashCode() implementation 
> doesn't take the array's contents into account, only the memory address.
> This leads to different hash codes on the sender and receiver side.
> In Flink 1.1 this means that the data is shuffled randomly and not keyed, and 
> in Flink 1.2 the keygroups code detects a violation of the hashing.
> The proper fix of the problem would be to rely on Flink's {{TypeComparator}} 
> class, which has a type-specific hashing function. But introducing this 
> change would break compatibility with existing code.
> I'll file a JIRA for the 2.0 changes for that fix.
> For 1.2.1 and 1.3.0 we should at least reject arrays as keys.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...

2017-03-09 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105241358
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 ---
@@ -906,6 +919,256 @@ public void testChannelSelectors() {
}
 
/
+   // KeyBy testing
+   /
+
+   @Rule
+   public ExpectedException expectedException = ExpectedException.none();
+
+   @Test
+   public void testPrimitiveArrayKeyRejection() {
+
+   KeySelector, int[]> keySelector =
+   new KeySelector, 
int[]>() {
+
+   @Override
+   public int[] getKey(Tuple2 value) 
throws Exception {
+   int[] ks = new int[value.f0.length];
+   for (int i = 0; i < ks.length; i++) {
+   ks[i] = value.f0[i];
+   }
+   return ks;
+   }
+   };
+
+   testKeyRejection(keySelector, 
PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testBasicArrayKeyRejection() {
+
+   KeySelector, Integer[]> keySelector =
+   new KeySelector, 
Integer[]>() {
+
+   @Override
+   public Integer[] getKey(Tuple2 
value) throws Exception {
+   return value.f0;
+   }
+   };
+
+   testKeyRejection(keySelector, 
BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testObjectArrayKeyRejection() {
+
+   KeySelector, TestClass[]> keySelector 
=
+   new KeySelector, 
TestClass[]>() {
+
+   @Override
+   public TestClass[] 
getKey(Tuple2 value) throws Exception {
+   TestClass[] ks = new 
TestClass[value.f0.length];
+   for (int i = 0; i < ks.length; 
i++) {
+   ks[i] = new 
TestClass(value.f0[i]);
+   }
+   return ks;
+   }
+   };
+
+   ObjectArrayTypeInfo keyTypeInfo = 
ObjectArrayTypeInfo.getInfoFor(
+   TestClass[].class, new 
GenericTypeInfo<>(TestClass.class));
+
+   testKeyRejection(keySelector, keyTypeInfo);
+   }
+
+   private  void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) {
+
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream> input = env.fromElements(
+   new Tuple2<>(new Integer[] {1, 2}, "barfoo")
+   );
+
+   Assert.assertEquals(expectedKeyType, 
TypeExtractor.getKeySelectorTypes(keySelector, input.getType()));
+
+   // adjust the rule
+   expectedException.expect(InvalidProgramException.class);
+   expectedException.expectMessage("This type (" + expectedKeyType 
+ ") cannot be used as key.");
+
+   input.keyBy(keySelector);
+   }
+
+   // composite key tests : POJOs
+
+   @Test
+   public void testPOJONestedArrayKeyRejection() {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream input = env.fromElements(
+   new POJOwithHashCode(new int[] {1, 2}));
+
+   TypeInformation expectedTypeInfo = new 
TupleTypeInfo>(
+   
PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+
+   // adjust the rule
+   expectedException.expect(InvalidProgramException.class);
+   expectedException.expectMessage("This type (" + 
expectedTypeInfo + ") cannot be used as key.");
+
+   input.keyBy("id");
+   }
+
+   @Test
+   public void testNestedArrayWorkArround() {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream input = env.fromElements(
+   new POJOwithHashCode(new int[] {1, 2}));
+
+   input.keyBy(new KeySelector() {
+   @Override
+   public POJOwithHashCode getKey(POJOwithHashCode value) 
throws Exce

[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...

2017-03-09 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105239687
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 ---
@@ -114,9 +121,53 @@ public KeyedStream(DataStream dataStream, 
KeySelector keySelector, Ty
dataStream.getTransformation(),
new KeyGroupStreamPartitioner<>(keySelector, 
StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)));
this.keySelector = keySelector;
-   this.keyType = keyType;
+   this.keyType = validateKeyType(keyType);
}
-   
+
+   private TypeInformation validateKeyType(TypeInformation 
keyType) {
+   Stack> stack = new Stack<>();
+   stack.push(keyType);
+
+   while (!stack.isEmpty()) {
+   TypeInformation typeInfo = stack.pop();
+
+   if (!validateKeyTypeIsHashable(typeInfo)) {
+   throw new InvalidProgramException("This type (" 
+ keyType + ") cannot be used as key.");
--- End diff --

Let's include ```typeInfo``` in this exception as well to narrow it down 
for the user. We may even want to delay the exception until we have scanned the 
entire type and report all invalid keys at once.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...

2017-03-09 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105241262
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java ---
@@ -906,6 +919,256 @@ public void testChannelSelectors() {
 	}
 
 	/////////////////////////////////////////////////////////////
+	// KeyBy testing
+	/////////////////////////////////////////////////////////////
+
+	@Rule
+	public ExpectedException expectedException = ExpectedException.none();
+
+	@Test
+	public void testPrimitiveArrayKeyRejection() {
+
+		KeySelector<Tuple2<Integer[], String>, int[]> keySelector =
+				new KeySelector<Tuple2<Integer[], String>, int[]>() {
+
+			@Override
+			public int[] getKey(Tuple2<Integer[], String> value) throws Exception {
+				int[] ks = new int[value.f0.length];
+				for (int i = 0; i < ks.length; i++) {
+					ks[i] = value.f0[i];
+				}
+				return ks;
+			}
+		};
+
+		testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+	}
+
+	@Test
+	public void testBasicArrayKeyRejection() {
+
+		KeySelector<Tuple2<Integer[], String>, Integer[]> keySelector =
+				new KeySelector<Tuple2<Integer[], String>, Integer[]>() {
+
+			@Override
+			public Integer[] getKey(Tuple2<Integer[], String> value) throws Exception {
+				return value.f0;
+			}
+		};
+
+		testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO);
+	}
+
+	@Test
+	public void testObjectArrayKeyRejection() {
+
+		KeySelector<Tuple2<Integer[], String>, TestClass[]> keySelector =
+				new KeySelector<Tuple2<Integer[], String>, TestClass[]>() {
+
+			@Override
+			public TestClass[] getKey(Tuple2<Integer[], String> value) throws Exception {
+				TestClass[] ks = new TestClass[value.f0.length];
+				for (int i = 0; i < ks.length; i++) {
+					ks[i] = new TestClass(value.f0[i]);
+				}
+				return ks;
+			}
+		};
+
+		ObjectArrayTypeInfo<TestClass[], TestClass> keyTypeInfo = ObjectArrayTypeInfo.getInfoFor(
+				TestClass[].class, new GenericTypeInfo<>(TestClass.class));
+
+		testKeyRejection(keySelector, keyTypeInfo);
+	}
+
+	private <K> void testKeyRejection(KeySelector<Tuple2<Integer[], String>, K> keySelector, TypeInformation<K> expectedKeyType) {
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Tuple2<Integer[], String>> input = env.fromElements(
+				new Tuple2<>(new Integer[] {1, 2}, "barfoo")
+		);
+
+		Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType()));
+
+		// adjust the rule
+		expectedException.expect(InvalidProgramException.class);
+		expectedException.expectMessage("This type (" + expectedKeyType + ") cannot be used as key.");
+
+		input.keyBy(keySelector);
+	}
+
+	// composite key tests : POJOs
+
+	@Test
+	public void testPOJONestedArrayKeyRejection() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<POJOwithHashCode> input = env.fromElements(
+				new POJOwithHashCode(new int[] {1, 2}));
+
+		TypeInformation<?> expectedTypeInfo = new TupleTypeInfo<Tuple1<int[]>>(
+				PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+
+		// adjust the rule
+		expectedException.expect(InvalidProgramException.class);
+		expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used as key.");
+
+		input.keyBy("id");
+	}
+
+	@Test
+	public void testNestedArrayWorkArround() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<POJOwithHashCode> input = env.fromElements(
+				new POJOwithHashCode(new int[] {1, 2}));
+
+		input.keyBy(new KeySelector<POJOwithHashCode, POJOwithHashCode>() {
+			@Override
+			public POJOwithHashCode getKey(POJOwithHashCode value) throws Exce

[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...

2017-03-09 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105238457
  

[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...

2017-03-09 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105241450
  

[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...

2017-03-09 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105239060
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java ---
@@ -906,6 +919,256 @@ public void testChannelSelectors() {
 	}
 
 	/////////////////////////////////////////////////////////////
+	// KeyBy testing
+	/////////////////////////////////////////////////////////////
+
+	@Rule
+	public ExpectedException expectedException = ExpectedException.none();
--- End diff --

First time I'm seeing this pattern, and I can't help but feel that a simple
try-catch block would be more readable.
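
For comparison, the same expectation written with a plain try-catch (a sketch only;
`input`, `keySelector` and the message format are taken from the quoted test, the
assertions are plain JUnit):

```java
try {
	input.keyBy(keySelector);
	Assert.fail("keyBy() should have thrown an InvalidProgramException for this key type");
} catch (InvalidProgramException e) {
	// expected: the message names the rejected key type
	Assert.assertTrue(e.getMessage().contains("cannot be used as key"));
}
```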




[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...

2017-03-09 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105239799
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---
@@ -114,9 +121,53 @@ public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, Ty
 			dataStream.getTransformation(),
 			new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)));
 		this.keySelector = keySelector;
-		this.keyType = keyType;
+		this.keyType = validateKeyType(keyType);
 	}
-	
+
+	private TypeInformation<KEY> validateKeyType(TypeInformation<KEY> keyType) {
+		Stack<TypeInformation<?>> stack = new Stack<>();
+		stack.push(keyType);
+
+		while (!stack.isEmpty()) {
+			TypeInformation<?> typeInfo = stack.pop();
+
+			if (!validateKeyTypeIsHashable(typeInfo)) {
+				throw new InvalidProgramException("This type (" + keyType + ") cannot be used as key.");
+			}
+			
+			if (typeInfo instanceof TupleTypeInfoBase) {
+				for (int i = 0; i < typeInfo.getArity(); i++) {
+					stack.push(((TupleTypeInfoBase<?>) typeInfo).getTypeAt(i));
+				}
+			}
+		}
+		return keyType;
+	}
+
+	/**
+	 * Validates that a given type of element (as encoded by the provided {@link TypeInformation}) can be
+	 * used as a key in the {@code DataStream.keyBy()} operation.
+	 *
+	 * @return {@code false} if:
+	 * <ol>
+	 *     <li>it is a POJO type but does not override the {@link #hashCode()} method and relies on
+	 *     the {@link Object#hashCode()} implementation.</li>
+	 *     <li>it is an array of any type (see {@link PrimitiveArrayTypeInfo}, {@link BasicArrayTypeInfo},
+	 *     {@link ObjectArrayTypeInfo}).</li>
+	 * </ol>,
+	 * {@code true} otherwise.
+	 */
+	private boolean validateKeyTypeIsHashable(TypeInformation<?> type) {
+		try {
+			return (type instanceof PojoTypeInfo) ?
--- End diff --

? and : are typically at the start of a new line.
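
I.e., something like this (formatting only; the second branch is not part of the
quoted hunk, so the array check shown here is inferred from the method's javadoc
and is only illustrative):

```java
return (type instanceof PojoTypeInfo)
		? !type.getTypeClass().getMethod("hashCode").getDeclaringClass().equals(Object.class)
		: !(type instanceof PrimitiveArrayTypeInfo
				|| type instanceof BasicArrayTypeInfo
				|| type instanceof ObjectArrayTypeInfo);
```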




[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...

2017-03-09 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105240820
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---
@@ -114,9 +121,53 @@ public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, Ty
 			dataStream.getTransformation(),
 			new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)));
 		this.keySelector = keySelector;
-		this.keyType = keyType;
+		this.keyType = validateKeyType(keyType);
 	}
-	
+
+	private TypeInformation<KEY> validateKeyType(TypeInformation<KEY> keyType) {
+		Stack<TypeInformation<?>> stack = new Stack<>();
+		stack.push(keyType);
+
+		while (!stack.isEmpty()) {
+			TypeInformation<?> typeInfo = stack.pop();
+
+			if (!validateKeyTypeIsHashable(typeInfo)) {
+				throw new InvalidProgramException("This type (" + keyType + ") cannot be used as key.");
+			}
+			
+			if (typeInfo instanceof TupleTypeInfoBase) {
+				for (int i = 0; i < typeInfo.getArity(); i++) {
+					stack.push(((TupleTypeInfoBase<?>) typeInfo).getTypeAt(i));
+				}
+			}
+		}
+		return keyType;
+	}
+
+	/**
+	 * Validates that a given type of element (as encoded by the provided {@link TypeInformation}) can be
+	 * used as a key in the {@code DataStream.keyBy()} operation.
+	 *
+	 * @return {@code false} if:
--- End diff --

I would shorten this to read ```returns true if the type overrides the
hashCode implementation```. The details can be contained in the general javadoc
of the method.
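
For example, something along these lines (a sketch of the suggested wording only,
not the final text of the PR):

```java
/**
 * Checks whether a given type can be used as a key in {@code DataStream.keyBy()}.
 * POJO keys must override {@link Object#hashCode()}; array types (see
 * {@link PrimitiveArrayTypeInfo}, {@link BasicArrayTypeInfo}, {@link ObjectArrayTypeInfo})
 * are rejected because they fall back to reference hashing.
 *
 * @return {@code true} if the type overrides the hashCode implementation, {@code false} otherwise.
 */
```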




[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...

2017-03-09 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105242176
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java ---
@@ -906,6 +919,256 @@ public void testChannelSelectors() {
	[... same KeyBy test additions as in the hunk quoted above ...]
+
+	@Test
+	public void testNestedArrayWorkArround() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<POJOwithHashCode> input = env.fromElements(
--- End diff --

The focus of this test appears to be to verify that you can use an array as
a key by wrapping it in a POJO that implements hashCode (based on the naming).

We should probably focus more on the Pojo-with-hashCode i
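
For reference, the kind of wrapper the test relies on looks roughly like this (the
class name and the int[] "id" field follow the quoted test; the exact field layout
and the hashCode body are assumptions):

```java
import java.util.Arrays;

public class POJOwithHashCode {

	private int[] id;

	public POJOwithHashCode() {}

	public POJOwithHashCode(int[] id) {
		this.id = id;
	}

	public int[] getId() {
		return id;
	}

	public void setId(int[] id) {
		this.id = id;
	}

	@Override
	public int hashCode() {
		// hashing the array contents (not the array reference) is what makes
		// this POJO usable as a key
		return Arrays.hashCode(id);
	}
}
```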

[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...

2017-03-09 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105240608
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---
@@ -114,9 +121,53 @@ public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, Ty
 			dataStream.getTransformation(),
 			new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)));
 		this.keySelector = keySelector;
-		this.keyType = keyType;
+		this.keyType = validateKeyType(keyType);
 	}
-	
+
+	private TypeInformation<KEY> validateKeyType(TypeInformation<KEY> keyType) {
+		Stack<TypeInformation<?>> stack = new Stack<>();
+		stack.push(keyType);
+
+		while (!stack.isEmpty()) {
+			TypeInformation<?> typeInfo = stack.pop();
+
+			if (!validateKeyTypeIsHashable(typeInfo)) {
+				throw new InvalidProgramException("This type (" + keyType + ") cannot be used as key.");
+			}
+			
+			if (typeInfo instanceof TupleTypeInfoBase) {
+				for (int i = 0; i < typeInfo.getArity(); i++) {
+					stack.push(((TupleTypeInfoBase<?>) typeInfo).getTypeAt(i));
+				}
+			}
+		}
+		return keyType;
+	}
+
+	/**
+	 * Validates that a given type of element (as encoded by the provided {@link TypeInformation}) can be
+	 * used as a key in the {@code DataStream.keyBy()} operation.
+	 *
+	 * @return {@code false} if:
+	 * <ol>
+	 *     <li>it is a POJO type but does not override the {@link #hashCode()} method and relies on
+	 *     the {@link Object#hashCode()} implementation.</li>
+	 *     <li>it is an array of any type (see {@link PrimitiveArrayTypeInfo}, {@link BasicArrayTypeInfo},
+	 *     {@link ObjectArrayTypeInfo}).</li>
+	 * </ol>,
+	 * {@code true} otherwise.
+	 */
+	private boolean validateKeyTypeIsHashable(TypeInformation<?> type) {
+		try {
+			return (type instanceof PojoTypeInfo) ?
+					!type.getTypeClass().getMethod("hashCode").getDeclaringClass().equals(Object.class) :
--- End diff --

This method would be more readable by not inverting here, and returning
true as the default.
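
A sketch of that shape (the same checks as in the quoted code, just with the default
case returning `true`; the behaviour of the catch block is an assumption since it is
not part of the quoted hunk):

```java
private boolean validateKeyTypeIsHashable(TypeInformation<?> type) {
	try {
		if (type instanceof PojoTypeInfo) {
			// hashable only if the POJO supplies its own hashCode()
			return !type.getTypeClass().getMethod("hashCode").getDeclaringClass().equals(Object.class);
		}
		if (type instanceof PrimitiveArrayTypeInfo
				|| type instanceof BasicArrayTypeInfo
				|| type instanceof ObjectArrayTypeInfo) {
			// arrays fall back to Object.hashCode() and are rejected
			return false;
		}
		// everything else is accepted by default
		return true;
	} catch (NoSuchMethodException ignored) {
		// hashCode() always exists, so this path should never be taken
		return true;
	}
}
```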




[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...

2017-03-09 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105241562
  

[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...

2017-03-09 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105240936
  
