[jira] [Commented] (FLINK-4565) Support for SQL IN operator
[ https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904650#comment-15904650 ] ASF GitHub Bot commented on FLINK-4565: --- Github user DmytroShkvyra closed the pull request at: https://github.com/apache/flink/pull/3502 > Support for SQL IN operator > --- > > Key: FLINK-4565 > URL: https://issues.apache.org/jira/browse/FLINK-4565 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Dmytro Shkvyra > > It seems that Flink SQL supports the uncorrelated sub-query IN operator. But > it should also be available in the Table API and tested. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4565) Support for SQL IN operator
[ https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904651#comment-15904651 ] ASF GitHub Bot commented on FLINK-4565: --- GitHub user DmytroShkvyra reopened a pull request: https://github.com/apache/flink/pull/3502 [FLINK-4565] Support for SQL IN operator [FLINK-4565] Support for SQL IN operator This PR is part of the work on the SQL IN operator in the Table API; it implements IN for literals. Two cases are covered: fewer than and more than 20 literals. I also have some questions: - Is converting all numeric types to BigDecimal acceptable? I did this to simplify the use of a hash set. - Validation isn't ideal: it forces the operator to be used with literals of the same type. Should I rework it, or just add more cases? expressionDsl.scala: entry point for the IN operator in the Scala API ScalarOperators.scala: 1) All numeric types are upcast to BigDecimal for use in the hash set; other types are unchanged in castNumeric 2) valuesInitialization is used for two cases: when we have more than 20 operands (we use a hash set, initialized in the constructor, described below) and fewer than 20 operands (we initialize the operands in the method body and use them in a conjunction) 3) comparison also covers the cases described above: in the first case we use a callback to declare and initialize a hash set with all operands; otherwise we just put all operands into a conjunction. 4) The final code is built up from these code snippets. CodeGenerator.scala: passes arguments and a callback to declare and initialize the hash set FunctionCatalog.scala: registers "in" as a method InITCase: some use cases You can merge this pull request into a Git repository by running: $ git pull https://github.com/DmytroShkvyra/flink FLINK-4565-NV Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3502.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3502 commit f7960d66a0f885a5b032345427c5380f268cc60e Author: DmytroShkvyra Date: 2017-03-09T19:37:46Z [FLINK-4565] Support for SQL IN operator > Support for SQL IN operator > --- > > Key: FLINK-4565 > URL: https://issues.apache.org/jira/browse/FLINK-4565 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Dmytro Shkvyra > > It seems that Flink SQL supports the uncorrelated sub-query IN operator. But > it should also be available in the Table API and tested. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
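[Editor's note] For readers skimming the thread, a minimal sketch of how the proposed operator would be exercised from the Scala Table API and SQL. It assumes the `in` expression entry point added in expressionDsl.scala by this PR; the table, fields, and data are illustrative only.
{code}
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// Illustrative input table.
val orders = env
  .fromElements((1, "Europe"), (2, "Asia"), (3, "America"))
  .toTable(tEnv, 'id, 'region)

// Table API: per the PR description, up to 20 literals are checked directly in the
// generated method body, while more than 20 literals are collected into a hash set
// that is initialized in the generated constructor.
val filtered = orders.filter('id.in(1, 2, 3))

// Equivalent SQL predicate on a registered table.
tEnv.registerTable("Orders", orders)
val viaSql = tEnv.sql("SELECT id, region FROM Orders WHERE id IN (1, 2, 3)")
{code}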
[jira] [Closed] (FLINK-6015) "Row too short" when reading CSV line with empty last field (i.e. ending in comma)
[ https://issues.apache.org/jira/browse/FLINK-6015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Hutchison closed FLINK-6015. - Resolution: Fixed Fixed in FLINK-5907 > "Row too short" when reading CSV line with empty last field (i.e. ending in > comma) > -- > > Key: FLINK-6015 > URL: https://issues.apache.org/jira/browse/FLINK-6015 > Project: Flink > Issue Type: Bug > Components: Batch Connectors and Input/Output Formats >Affects Versions: 1.2.0 > Environment: Linux >Reporter: Luke Hutchison > > When using env.readCsvFile(filename), if a line in the CSV file has an empty > last field, the line ends with a comma. This triggers an exception in > GenericCsvInput.parseRecord(): > // check valid start position > if (startPos >= limit) { > if (lenient) { > return false; > } else { > throw new ParseException("Row too > short: " + new String(bytes, offset, numBytes)); > } > } > Setting the parser to lenient would cause the last field to be left as null, > rather than setting its value to "". > The parser should accept empty values for the last field on a row. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
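[Editor's note] To make the report easier to reproduce, a hedged sketch using the Scala DataSet API; the file path is illustrative, and it assumes the readCsvFile variant that exposes a lenient flag.
{code}
import org.apache.flink.api.scala._

// A file containing a line such as "a,b," has an empty last field and currently
// triggers the "Row too short" ParseException described above.
val env = ExecutionEnvironment.getExecutionEnvironment

// Strict (default) parsing: fails on the trailing-comma line.
val strict = env.readCsvFile[(String, String, String)]("/tmp/data.csv")

// Lenient parsing avoids the exception, but per the report it leaves the last field
// null instead of yielding "", which is why the parser itself should accept the empty field.
val tolerant = env.readCsvFile[(String, String, String)]("/tmp/data.csv", lenient = true)
{code}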
[jira] [Commented] (FLINK-6015) "Row too short" when reading CSV line with empty last field (i.e. ending in comma)
[ https://issues.apache.org/jira/browse/FLINK-6015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904615#comment-15904615 ] Luke Hutchison commented on FLINK-6015: --- Yes, this looks like the same bug. Thanks. > "Row too short" when reading CSV line with empty last field (i.e. ending in > comma) > -- > > Key: FLINK-6015 > URL: https://issues.apache.org/jira/browse/FLINK-6015 > Project: Flink > Issue Type: Bug > Components: Batch Connectors and Input/Output Formats >Affects Versions: 1.2.0 > Environment: Linux >Reporter: Luke Hutchison > > When using env.readCsvFile(filename), if a line in the CSV file has an empty > last field, the line ends with a comma. This triggers an exception in > GenericCsvInput.parseRecord(): > // check valid start position > if (startPos >= limit) { > if (lenient) { > return false; > } else { > throw new ParseException("Row too > short: " + new String(bytes, offset, numBytes)); > } > } > Setting the parser to lenient would cause the last field to be left as null, > rather than setting its value to "". > The parser should accept empty values for the last field on a row. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904580#comment-15904580 ] ASF GitHub Bot commented on FLINK-5658: --- Github user hongyuhong commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r105337481 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventtimeOverProcessFunction.scala --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.{ProcessFunction} +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.StringSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +import scala.collection.mutable.ArrayBuffer + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param interMediateType the intermediate row tye which the state saved + * @param keySelector the keyselector + * @param keyType the key type + * + */ +class UnboundedEventtimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val interMediateType: TypeInformation[Row], +private val keySelector: KeySelector[Row, Tuple], +private val keyType: TypeInformation[Tuple]) + extends ProcessFunction[Row, Row] + with CheckpointedFunction{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var state: MapState[TimeWindow, Row] = _ + private 
val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex + + /** Sorted list per key for choose the recent result and the records need retraction **/ + private val timeSectionsMap: java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] = --- End diff -- Yes, there may be a memory problem if the watermark interval is too long. But if we use the state backend, the cost of serializing the whole list on every update is too large. As you say, a sorted state backend that provides quick lookup and update would be great for me. > Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL > > > Key: FLINK-5658 > URL: https://issues.apache.org/jira/browse/FLINK-5658 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Yuhong Hong > > The
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user hongyuhong commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r105337481 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventtimeOverProcessFunction.scala --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.{ProcessFunction} +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.StringSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +import scala.collection.mutable.ArrayBuffer + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param interMediateType the intermediate row tye which the state saved + * @param keySelector the keyselector + * @param keyType the key type + * + */ +class UnboundedEventtimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val interMediateType: TypeInformation[Row], +private val keySelector: KeySelector[Row, Tuple], +private val keyType: TypeInformation[Tuple]) + extends ProcessFunction[Row, Row] + with CheckpointedFunction{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var state: MapState[TimeWindow, Row] = _ + private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex + + /** Sorted list per key for choose the recent result and the records need retraction **/ + private val timeSectionsMap: 
java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] = --- End diff -- Yes, there may be a memory problem if the watermark interval is too long. But if we use the state backend, the cost of serializing the whole list on every update is too large. As you say, a sorted state backend that provides quick lookup and update would be great for me. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3505: [backport-1.2] [FLINK-6006] [kafka] Always use com...
GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/3505 [backport-1.2] [FLINK-6006] [kafka] Always use complete restored state in FlinkKafkaConsumer (This PR is the fix of FLINK-6006 for Flink 1.2) Previously, the Kafka Consumer performs partition list querying on restore, and then uses it to filter out restored state of partitions that doesn't exist in the list. If in any case the returned partitions list is incomplete (i.e. missing partitions that existed before perhaps due to temporary ZK / broker downtimes), then the state of the missing partitions is dropped and cannot be recovered anymore. This PR fixes this by always restoring the complete state, without any sort of filtering. We simply let the consumer fail if assigned partitions to the consuming threads / Kafka clients are unreachable when the consumer starts running. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-6006 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3505.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3505 commit d38e3fd4b123cacf4fd6ee3c1baab77e9b8593a0 Author: Tzu-Li (Gordon) Tai Date: 2017-03-10T06:10:56Z [FLINK-6006] [kafka] Always use complete restored state in FlinkKafkaConsumer Previously, the Kafka Consumer performs partition list querying on restore, and then uses it to filter out restored state of partitions that doesn't exist in the list. If in any case the returned partitions list is incomplete (i.e. missing partitions that existed before perhaps due to temporary ZK / broker downtimes), then the state of the missing partitions is dropped and cannot be recovered anymore. This commit fixes this by always restoring the complete state, without any sort of filtering. We simply let the consumer fail if assigned partitions to the consuming threads / Kafka clients are unreachable when the consumer starts running. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
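[Editor's note] To make the described change concrete, a hedged sketch of the restore decision; the types and method are illustrative and do not mirror the actual FlinkKafkaConsumerBase code.
{code}
// Illustrative stand-in for Flink's KafkaTopicPartition.
case class TopicPartition(topic: String, partition: Int)

def offsetsToRestore(
    restoredOffsets: Map[TopicPartition, Long],
    discoveredPartitions: Set[TopicPartition]): Map[TopicPartition, Long] = {
  // Before the fix: restored state was filtered against a freshly queried partition list,
  // so a temporarily incomplete list silently dropped offsets that could never be recovered.
  //   restoredOffsets.filterKeys(discoveredPartitions.contains)

  // After the fix: the complete restored state is used as-is; if an assigned partition is
  // unreachable, the consumer fails once it starts running instead of silently losing state.
  restoredOffsets
}
{code}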
[jira] [Commented] (FLINK-6015) "Row too short" when reading CSV line with empty last field (i.e. ending in comma)
[ https://issues.apache.org/jira/browse/FLINK-6015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904496#comment-15904496 ] Kurt Young commented on FLINK-6015: --- Hi, I think this issue has been addressed by https://issues.apache.org/jira/browse/FLINK-5907 > "Row too short" when reading CSV line with empty last field (i.e. ending in > comma) > -- > > Key: FLINK-6015 > URL: https://issues.apache.org/jira/browse/FLINK-6015 > Project: Flink > Issue Type: Bug > Components: Batch Connectors and Input/Output Formats >Affects Versions: 1.2.0 > Environment: Linux >Reporter: Luke Hutchison > > When using env.readCsvFile(filename), if a line in the CSV file has an empty > last field, the line ends with a comma. This triggers an exception in > GenericCsvInput.parseRecord(): > // check valid start position > if (startPos >= limit) { > if (lenient) { > return false; > } else { > throw new ParseException("Row too > short: " + new String(bytes, offset, numBytes)); > } > } > Setting the parser to lenient would cause the last field to be left as null, > rather than setting its value to "". > The parser should accept empty values for the last field on a row. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6018) Minor improvements about `AbstractKeyedStateBackend#getPartitionedState` method
[ https://issues.apache.org/jira/browse/FLINK-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-6018: --- Description: The code snippet currently in the `AbstractKeyedStateBackend # getPartitionedState` method, as follows: {code} line 352: // TODO: This is wrong, it should throw an exception that the initialization has not properly happened line 353: if (!stateDescriptor.isSerializerInitialized()) { line 354:stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig()); line 354 } {code} Method `isSerializerInitialized`: {code} public boolean isSerializerInitialized() { return serializer != null; } {code} Method `initializeSerializerUnlessSet`: {code} public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) { if (serializer == null) { if (typeInfo != null) { serializer = typeInfo.createSerializer(executionConfig); } else { throw new IllegalStateException( "Cannot initialize serializer after TypeInformation was dropped during serialization"); } } } {code} that is, in the `initializeSerializerUnlessSet` method, The `serializer` has been checked by `serializer == null`.So I hope this code has a little improvement to the following: approach 1: According to the `TODO` information we throw an exception {code} if (!stateDescriptor.isSerializerInitialized()) { throw new IllegalStateException("The serializer of the descriptor has not been initialized!"); } {code} approach 2: Try to initialize and remove `if (!stateDescriptor.isSerializerInitialized()) {` logic. {code} stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig()); {code} Meanwhile, If we use the approach 2, I suggest that `AbstractKeyedStateBackend` add a `private final ExecutionConfig executionConfig` property. then we can change the code like this: {code} stateDescriptor.initializeSerializerUnlessSet(executionConfig); {code} Are the above suggestions reasonable for you? Welcome anybody's feedback and corrections. was: The code snippet currently in the `AbstractKeyedStateBackend # getPartitionedState` method, as follows: {code} line 352: // TODO: This is wrong, it should throw an exception that the initialization has not properly happened line 353: if (!stateDescriptor.isSerializerInitialized()) { line 354:stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig()); line 354 } {code} I hope this code has a little improvement to the following: approach 1: According to the `TODO` information we throw an exception {code} if (!stateDescriptor.isSerializerInitialized()) { throw new IllegalStateException("The serializer of the descriptor has not been initialized!"); } {code} approach 2: Try to initialize and remove `if (!stateDescriptor.isSerializerInitialized()) {` logic. {code} stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig()); {code} Meanwhile, If we use the approach 2, I suggest that `AbstractKeyedStateBackend` add a `private final ExecutionConfig executionConfig` property. then we can change the code like this: {code} stateDescriptor.initializeSerializerUnlessSet(executionConfig); {code} Are the above suggestions reasonable for you? Welcome anybody's feedback and corrections. 
> Minor improvements about `AbstractKeyedStateBackend#getPartitionedState` > method > --- > > Key: FLINK-6018 > URL: https://issues.apache.org/jira/browse/FLINK-6018 > Project: Flink > Issue Type: Improvement > Components: DataStream API, State Backends, Checkpointing >Reporter: sunjincheng >Assignee: sunjincheng > > The code snippet currently in the `AbstractKeyedStateBackend # > getPartitionedState` method, as follows: > {code} > line 352: // TODO: This is wrong, it should throw an exception that the > initialization has not properly happened > line 353: if (!stateDescriptor.isSerializerInitialized()) { > line 354:stateDescriptor.initializeSerializerUnlessSet(new > ExecutionConfig()); > line 354 } > {code} > Method `isSerializerInitialized`: > {code} > public boolean isSerializerInitialized() { > return serializer != null; > } > {code} > Method `initializeSerializerUnlessSet`: > {code} > public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) { > if (serializer == null) { > if (typeInfo != null) { > serializer = > typeInfo.createSerializer(executionConfig); > } else { >
[jira] [Updated] (FLINK-6018) Minor improvements about `AbstractKeyedStateBackend#getPartitionedState` method
[ https://issues.apache.org/jira/browse/FLINK-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-6018: --- Description: The code snippet currently in the `AbstractKeyedStateBackend # getPartitionedState` method, as follows: {code} line 352: // TODO: This is wrong, it should throw an exception that the initialization has not properly happened line 353: if (!stateDescriptor.isSerializerInitialized()) { line 354:stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig()); line 354 } {code} I hope this code has a little improvement to the following: approach 1: According to the `TODO` information we throw an exception {code} if (!stateDescriptor.isSerializerInitialized()) { throw new IllegalStateException("The serializer of the descriptor has not been initialized!"); } {code} approach 2: Try to initialize and remove `if (!stateDescriptor.isSerializerInitialized()) {` logic. {code} stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig()); {code} Meanwhile, If we use the approach 2, I suggest that `AbstractKeyedStateBackend` add a `private final ExecutionConfig executionConfig` property. then we can change the code like this: {code} stateDescriptor.initializeSerializerUnlessSet(executionConfig); {code} Are the above suggestions reasonable for you? Welcome anybody's feedback and corrections. was: The code snippet currently in the `AbstractKeyedStateBackend # getPartitionedState` method, as follows: {code} // TODO: This is wrong, it should throw an exception that the initialization has not properly happened if (!stateDescriptor.isSerializerInitialized()) { stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig()); } {code} I hope this code has a little improvement to the following: approach 1: According to the `TODO` information we throw an exception {code} if (!stateDescriptor.isSerializerInitialized()) { throw new IllegalStateException("The serializer of the descriptor has not been initialized!"); } {code} approach 2: Try to initialize and remove `if (!stateDescriptor.isSerializerInitialized()) {` logic. {code} stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig()); {code} Meanwhile, If we use the approach 2, I suggest that `AbstractKeyedStateBackend` add a `private final ExecutionConfig executionConfig` property. then we can change the code like this: {code} stateDescriptor.initializeSerializerUnlessSet(executionConfig); {code} Are the above suggestions reasonable for you? Welcome anybody's feedback and corrections. 
> Minor improvements about `AbstractKeyedStateBackend#getPartitionedState` > method > --- > > Key: FLINK-6018 > URL: https://issues.apache.org/jira/browse/FLINK-6018 > Project: Flink > Issue Type: Improvement > Components: DataStream API, State Backends, Checkpointing >Reporter: sunjincheng >Assignee: sunjincheng > > The code snippet currently in the `AbstractKeyedStateBackend # > getPartitionedState` method, as follows: > {code} > line 352: // TODO: This is wrong, it should throw an exception that the > initialization has not properly happened > line 353: if (!stateDescriptor.isSerializerInitialized()) { > line 354:stateDescriptor.initializeSerializerUnlessSet(new > ExecutionConfig()); > line 354 } > {code} > I hope this code has a little improvement to the following: > approach 1: > According to the `TODO` information we throw an exception > {code} > if (!stateDescriptor.isSerializerInitialized()) { > throw new IllegalStateException("The serializer of the > descriptor has not been initialized!"); > } > {code} > approach 2: > Try to initialize and remove `if (!stateDescriptor.isSerializerInitialized()) > {` logic. > {code} > stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig()); > {code} > Meanwhile, If we use the approach 2, I suggest that > `AbstractKeyedStateBackend` add a `private final ExecutionConfig > executionConfig` property. then we can change the code like this: > {code} > stateDescriptor.initializeSerializerUnlessSet(executionConfig); > {code} > Are the above suggestions reasonable for you? > Welcome anybody's feedback and corrections. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6018) Minor improvements about `AbstractKeyedStateBackend#getPartitionedState` method
sunjincheng created FLINK-6018: -- Summary: Minor improvements about `AbstractKeyedStateBackend#getPartitionedState` method Key: FLINK-6018 URL: https://issues.apache.org/jira/browse/FLINK-6018 Project: Flink Issue Type: Improvement Components: DataStream API, State Backends, Checkpointing Reporter: sunjincheng Assignee: sunjincheng The code snippet currently in the `AbstractKeyedStateBackend # getPartitionedState` method, as follows: {code} // TODO: This is wrong, it should throw an exception that the initialization has not properly happened if (!stateDescriptor.isSerializerInitialized()) { stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig()); } {code} I hope this code has a little improvement to the following: approach 1: According to the `TODO` information we throw an exception {code} if (!stateDescriptor.isSerializerInitialized()) { throw new IllegalStateException("The serializer of the descriptor has not been initialized!"); } {code} approach 2: Try to initialize and remove `if (!stateDescriptor.isSerializerInitialized()) {` logic. {code} stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig()); {code} Meanwhile, If we use the approach 2, I suggest that `AbstractKeyedStateBackend` add a `private final ExecutionConfig executionConfig` property. then we can change the code like this: {code} stateDescriptor.initializeSerializerUnlessSet(executionConfig); {code} Are the above suggestions reasonable for you? Welcome anybody's feedback and corrections. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
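[Editor's note] A hedged Scala sketch of approach 2; the real AbstractKeyedStateBackend is a Java class with a richer constructor, so the skeleton below only illustrates where the stored ExecutionConfig would be used and why the outer guard is redundant.
{code}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.state.{State, StateDescriptor}

// Illustrative skeleton only, not the actual backend class.
abstract class KeyedBackendSketch(private val executionConfig: ExecutionConfig) {

  protected def ensureSerializerInitialized[S <: State, T](
      descriptor: StateDescriptor[S, T]): Unit = {
    // initializeSerializerUnlessSet already checks serializer == null internally,
    // so the isSerializerInitialized pre-check adds nothing; passing the backend's
    // ExecutionConfig avoids creating a fresh, default-configured one.
    descriptor.initializeSerializerUnlessSet(executionConfig)
  }
}
{code}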
[jira] [Created] (FLINK-6017) CSV reader does not support quoted double quotes
Luke Hutchison created FLINK-6017: - Summary: CSV reader does not support quoted double quotes Key: FLINK-6017 URL: https://issues.apache.org/jira/browse/FLINK-6017 Project: Flink Issue Type: Bug Components: Batch Connectors and Input/Output Formats Environment: Linux Reporter: Luke Hutchison The RFC for the CSV format specifies that double quotes are valid in quoted strings in CSV, by doubling the quote character: https://tools.ietf.org/html/rfc4180 However, when parsing a CSV file with Flink containing quoted quotes, such as: bob,"The name is ""Bob""" you get this exception: org.apache.flink.api.common.io.ParseException: Line could not be parsed: 'bob,"The name is ""Bob"""' ParserError UNQUOTED_CHARS_AFTER_QUOTED_STRING Expect field types: class java.lang.String, class java.lang.String -- This message was sent by Atlassian JIRA (v6.3.15#6346)
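[Editor's note] A small reproduction sketch with the Scala DataSet API; the file path is illustrative, and it assumes the readCsvFile variant that accepts a quote character. Per RFC 4180, the doubled quote inside a quoted field should parse to a literal double quote.
{code}
import org.apache.flink.api.scala._

// Illustrative file content: bob,"The name is ""Bob"""
// Expected result per RFC 4180: ("bob", "The name is \"Bob\"")
val env = ExecutionEnvironment.getExecutionEnvironment
val parsed = env.readCsvFile[(String, String)](
  "/tmp/names.csv",
  quoteCharacter = '"') // currently fails with UNQUOTED_CHARS_AFTER_QUOTED_STRING
{code}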
[GitHub] flink pull request #3504: [FLINK-6010] Documentation: correct IntelliJ IDEA ...
GitHub user phoenixjiangnan opened a pull request: https://github.com/apache/flink/pull/3504 [FLINK-6010] Documentation: correct IntelliJ IDEA Plugins path in 'In… …stalling the Scala plugin' section You can merge this pull request into a Git repository by running: $ git pull https://github.com/phoenixjiangnan/flink FLINK-6010 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3504.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3504 commit 0aa3dccdcd80c5c58161296718974f695cdea2c9 Author: Bowen Li Date: 2017-03-09T21:55:07Z [FLINK-6010] Documentation: correct IntelliJ IDEA Plugins path in 'Installing the Scala plugin' section --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6010) Documentation: correct IntelliJ IDEA Plugins path in 'Installing the Scala plugin' section
[ https://issues.apache.org/jira/browse/FLINK-6010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904413#comment-15904413 ] ASF GitHub Bot commented on FLINK-6010: --- GitHub user phoenixjiangnan opened a pull request: https://github.com/apache/flink/pull/3504 [FLINK-6010] Documentation: correct IntelliJ IDEA Plugins path in 'In… …stalling the Scala plugin' section You can merge this pull request into a Git repository by running: $ git pull https://github.com/phoenixjiangnan/flink FLINK-6010 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3504.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3504 commit 0aa3dccdcd80c5c58161296718974f695cdea2c9 Author: Bowen Li Date: 2017-03-09T21:55:07Z [FLINK-6010] Documentation: correct IntelliJ IDEA Plugins path in 'Installing the Scala plugin' section > Documentation: correct IntelliJ IDEA Plugins path in 'Installing the Scala > plugin' section > -- > > Key: FLINK-6010 > URL: https://issues.apache.org/jira/browse/FLINK-6010 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.2.0 >Reporter: Bowen Li > Fix For: 1.2.1 > > > In > https://ci.apache.org/projects/flink/flink-docs-release-1.2/internals/ide_setup.html#installing-the-scala-plugin, > how you should get to 'plugins' page in IntelliJ IDEA is wrong. This seems > to be describing a much older version of IntelliJ IDE. > The correct path now is: IntelliJ IDEA -> Preferences -> Plugins -> Install > JetBrains plugin > I'll submit a PR to fix this -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5995) Get a Exception when creating the ListStateDescriptor with a TypeInformation
[ https://issues.apache.org/jira/browse/FLINK-5995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904396#comment-15904396 ] sunjincheng commented on FLINK-5995: HI, [~srichter] [~aljoscha] Thanks for your suggestions. I made a simple implementation, and opened the PR, I would be grateful if you could review. Fell free to tell me If there is anything Incorrect. Best, SunJincheng > Get a Exception when creating the ListStateDescriptor with a TypeInformation > - > > Key: FLINK-5995 > URL: https://issues.apache.org/jira/browse/FLINK-5995 > Project: Flink > Issue Type: Bug > Components: DataStream API, State Backends, Checkpointing >Reporter: sunjincheng >Assignee: sunjincheng > > When use OperatorState and creating the ListStateDescriptor with a > TypeInformation,I got a exception. The Exception info is: > {code} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:915) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:858) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:858) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.IllegalStateException: Serializer not yet initialized. 
> at > org.apache.flink.api.common.state.StateDescriptor.getSerializer(StateDescriptor.java:169) > at > org.apache.flink.api.common.state.ListStateDescriptor.getElementSerializer(ListStateDescriptor.java:93) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.getOperatorState(DefaultOperatorStateBackend.java:110) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.getOperatorState(DefaultOperatorStateBackend.java:91) > at > org.apache.flink.table.runtime.aggregate.UnboundedNonPartitionedProcessingOverProcessFunction.initializeState(UnboundedNonPartitionedProcessingOverProcessFunction.scala:104) > at > org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) > at > org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:106) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:242) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:681) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:669) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670) > at java.lang.Thread.run(Thread.java:745) > {code} > So, I'll add `stateDescriptor.initializeSerializerUnlessSet()` call in the > `getOperatorState` method. I appreciate If anyone can give me some advice? -- This message was sent by Atlassian JIRA (v6.3.15#6346)
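[Editor's note] For readers following the thread, a hedged sketch of the failure mode and the proposed remedy; the state name and row schema are illustrative.
{code}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.types.Row

// A descriptor built from TypeInformation has no serializer yet.
val rowType = new RowTypeInfo(BasicTypeInfo.LONG_TYPE_INFO)
val descriptor = new ListStateDescriptor[Row]("overState", rowType)

// Handing this descriptor to DefaultOperatorStateBackend.getOperatorState(...) reaches
// getElementSerializer() before any serializer exists and throws the
// "Serializer not yet initialized" IllegalStateException shown above. The proposed fix
// is for getOperatorState to initialize the serializer first, along the lines of:
descriptor.initializeSerializerUnlessSet(new ExecutionConfig)
{code}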
[jira] [Commented] (FLINK-5995) Get a Exception when creating the ListStateDescriptor with a TypeInformation
[ https://issues.apache.org/jira/browse/FLINK-5995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904386#comment-15904386 ] ASF GitHub Bot commented on FLINK-5995: --- GitHub user sunjincheng121 opened a pull request: https://github.com/apache/flink/pull/3503 [FLINK-5995][checkpoints] fix Get a Exception when creating the ListS… …tateDescriptor with a TypeInformation Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [×] General - The pull request references the related JIRA issue ("[FLINK-5995] Fix Get a Exception when creating the ListStateDescriptor with a TypeInformation") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/sunjincheng121/flink FLINK-5995-PR Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3503.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3503 > Get a Exception when creating the ListStateDescriptor with a TypeInformation > - > > Key: FLINK-5995 > URL: https://issues.apache.org/jira/browse/FLINK-5995 > Project: Flink > Issue Type: Bug > Components: DataStream API, State Backends, Checkpointing >Reporter: sunjincheng >Assignee: sunjincheng > > When use OperatorState and creating the ListStateDescriptor with a > TypeInformation,I got a exception. The Exception info is: > {code} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:915) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:858) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:858) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.IllegalStateException: Serializer not yet initialized. > at > org.apache.flink.api.common.state.StateDescriptor.getSerializer(StateDescriptor.java:169) > at > org.apache.flink.api.common.state.ListStateDescriptor.getElementSerializer(ListStateDescriptor.java:93) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.getOperatorState(DefaultOperatorStateBackend.java:110) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.getOperatorState(DefaultOperatorStateBackend.java:91) > at > org.apache.flink.table.runtime.aggregate.UnboundedNonPartitionedProcessingOverProcessFunction.initializeState(UnboundedNonPartitionedProcessingOverProcessFunction.scala:104) > at > org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) > at > org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) > at > org.apache.flink.streaming.api.operators.Abstr
[GitHub] flink pull request #3503: [FLINK-5995][checkpoints] fix Get a Exception when...
GitHub user sunjincheng121 opened a pull request: https://github.com/apache/flink/pull/3503 [FLINK-5995][checkpoints] fix Get a Exception when creating the ListS… …tateDescriptor with a TypeInformation Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [×] General - The pull request references the related JIRA issue ("[FLINK-5995] Fix Get a Exception when creating the ListStateDescriptor with a TypeInformation") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/sunjincheng121/flink FLINK-5995-PR Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3503.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3503 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-6016) Newlines should be valid in quoted strings in CSV
Luke Hutchison created FLINK-6016: - Summary: Newlines should be valid in quoted strings in CSV Key: FLINK-6016 URL: https://issues.apache.org/jira/browse/FLINK-6016 Project: Flink Issue Type: Bug Components: Batch Connectors and Input/Output Formats Affects Versions: 1.2.0 Reporter: Luke Hutchison The RFC for the CSV format specifies that newlines are valid in quoted strings in CSV: https://tools.ietf.org/html/rfc4180 However, when parsing a CSV file with Flink containing a newline, such as: "3 4",5 you get this exception: Line could not be parsed: '"3' ParserError UNTERMINATED_QUOTED_STRING Expect field types: class java.lang.String, class java.lang.String -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904367#comment-15904367 ] ASF GitHub Bot commented on FLINK-5658: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r105323083 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UnboundedRowtimeOverTest.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.scala.stream.sql + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.table.api.{TableEnvironment, TableException} +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.scala.stream.utils.StreamTestData.Small4Tuple +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit._ + +import scala.collection.mutable + +class UnboundedRowtimeOverTest extends StreamingWithStateTestBase { + + /** test sliding event-time unbounded window with partition by **/ + @Test + def testWithPartition(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +StreamITCase.testResults = mutable.MutableList() + +val sqlQuery = "SELECT a, b, SUM(a) over (partition by b order by rowtime() range between " + + "unbounded preceding and current row) from T1" + +val t1 = StreamTestData.getSmall3TupleDataStream(env) + .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(Int, Long, String)] { + +def getCurrentWatermark: Watermark = new Watermark(130L) + +def extractTimestamp(element: (Int, Long, String), previousElementTimestamp: Long): Long = + 140 + }).toTable(tEnv).as('a, 'b, 'c) +tEnv.registerTable("T1", t1) + +val result = tEnv.sql(sqlQuery).toDataStream[Row] +result.addSink(new StreamITCase.StringSink) +env.execute() + +val expected1 = mutable.MutableList( + "1,1,1", "2,2,2", "3,2,5") +val expected2 = mutable.MutableList( + "1,1,1", "2,2,5", "3,2,3") +assertTrue(expected1.equals(StreamITCase.testResults.sorted) || + expected2.equals(StreamITCase.testResults.sorted)) + } + + /** test sliding event-time unbounded window without partitiion by **/ + @Test + def testWithoutPartition(): Unit = { +val env = 
StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +StreamITCase.testResults = mutable.MutableList() + +val sqlQuery = "SELECT SUM(a) " + + "over (order by rowtime() range between unbounded preceding and current row) from T1" + +val t1 = StreamTestData.getSmall3TupleDataStream(env) + .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(Int, Long, String)] { + +def getCurrentWatermark: Watermark = new Watermark(130L) + +def extractTimestamp(element: (Int, Long, String), previousElementTimestamp: Long): Long = + 140 + }).toTable(tEnv).as('a, 'b, 'c) +tEnv.registerTable("T1", t1) + +val result = tEnv.sql(sqlQuery).toDataStream[Row] +result.addSink(new StreamITCase.StringSi
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904363#comment-15904363 ] ASF GitHub Bot commented on FLINK-5658: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r105323567 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventtimeOverProcessFunction.scala --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.{ProcessFunction} +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.StringSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +import scala.collection.mutable.ArrayBuffer + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param interMediateType the intermediate row tye which the state saved + * @param keySelector the keyselector + * @param keyType the key type + * + */ +class UnboundedEventtimeOverProcessFunction( --- End diff -- I suggest that change `Eventtime` to `EventTime`, What do you think? > Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL > > > Key: FLINK-5658 > URL: https://issues.apache.org/jira/browse/FLINK-5658 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Yuhong Hong > > The goal of this issue is to add support for OVER RANGE aggregations on event > time streams to the SQL interface. 
> Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED > PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED > PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a > parameterless scalar function that just indicates processing time mode. > - bounded PRECEDING is not supported (see FLINK-5655) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Tra
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904370#comment-15904370 ] ASF GitHub Bot commented on FLINK-5658: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r105324505 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventtimeOverProcessFunction.scala --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.{ProcessFunction} +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.StringSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +import scala.collection.mutable.ArrayBuffer + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param interMediateType the intermediate row tye which the state saved + * @param keySelector the keyselector + * @param keyType the key type + * + */ +class UnboundedEventtimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val interMediateType: TypeInformation[Row], +private val keySelector: KeySelector[Row, Tuple], +private val keyType: TypeInformation[Tuple]) + extends ProcessFunction[Row, Row] + with CheckpointedFunction{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var state: MapState[TimeWindow, Row] = _ + 
private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex + + /** Sorted list per key for choose the recent result and the records need retraction **/ + private val timeSectionsMap: java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] = +new java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] + + /** For store timeSectionsMap **/ + private var timeSectionsState: ListState[String] = _ + private var inputKeySerializer: TypeSerializer[Tuple] = _ + private var timeSerializer: TypeSerializer[TimeWindow] = _ + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + interMediateType.createSerializer(getRuntimeContext.getExecutionConfig) +timeSerializer = new TimeWindow.Serializer +val stateDescriptor: MapStateDescriptor[TimeWindow, Row] = + new MapStateDescriptor[TimeWindow, Row]("rowtimeovers
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904365#comment-15904365 ] ASF GitHub Bot commented on FLINK-5658: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r105322751 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UnboundedRowtimeOverTest.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.scala.stream.sql + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.table.api.{TableEnvironment, TableException} +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.scala.stream.utils.StreamTestData.Small4Tuple +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit._ + +import scala.collection.mutable + +class UnboundedRowtimeOverTest extends StreamingWithStateTestBase { + + /** test sliding event-time unbounded window with partition by **/ + @Test + def testWithPartition(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +StreamITCase.testResults = mutable.MutableList() + +val sqlQuery = "SELECT a, b, SUM(a) over (partition by b order by rowtime() range between " + --- End diff -- I suggest that partitionBy `c` filed. Just a suggestion. > Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL > > > Key: FLINK-5658 > URL: https://issues.apache.org/jira/browse/FLINK-5658 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Yuhong Hong > > The goal of this issue is to add support for OVER RANGE aggregations on event > time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED > PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED > PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. 
> - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a > parameterless scalar function that just indicates processing time mode. > - bounded PRECEDING is not supported (see FLINK-5655) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904366#comment-15904366 ] ASF GitHub Bot commented on FLINK-5658: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r105323410 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -80,6 +81,38 @@ object AggregateUtil { } /** +* Create an [[ProcessFunction]] to evaluate final aggregate value. +* +* @param namedAggregates List of calls to aggregate functions and their output field names +* @param inputType Input row type +* @return [[UnboundedProcessingOverProcessFunction]] +*/ + private[flink] def CreateUnboundedEventtimeOverProcessFunction( --- End diff -- I suggest that change `Eventtime` to `EventTime`, What do you think? > Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL > > > Key: FLINK-5658 > URL: https://issues.apache.org/jira/browse/FLINK-5658 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Yuhong Hong > > The goal of this issue is to add support for OVER RANGE aggregations on event > time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED > PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED > PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a > parameterless scalar function that just indicates processing time mode. > - bounded PRECEDING is not supported (see FLINK-5655) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904371#comment-15904371 ] ASF GitHub Bot commented on FLINK-5658: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r105322914 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UnboundedRowtimeOverTest.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.scala.stream.sql + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.table.api.{TableEnvironment, TableException} +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.scala.stream.utils.StreamTestData.Small4Tuple +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit._ + +import scala.collection.mutable + +class UnboundedRowtimeOverTest extends StreamingWithStateTestBase { + + /** test sliding event-time unbounded window with partition by **/ + @Test + def testWithPartition(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +StreamITCase.testResults = mutable.MutableList() + +val sqlQuery = "SELECT a, b, SUM(a) over (partition by b order by rowtime() range between " + + "unbounded preceding and current row) from T1" + +val t1 = StreamTestData.getSmall3TupleDataStream(env) + .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(Int, Long, String)] { + +def getCurrentWatermark: Watermark = new Watermark(130L) + +def extractTimestamp(element: (Int, Long, String), previousElementTimestamp: Long): Long = + 140 + }).toTable(tEnv).as('a, 'b, 'c) +tEnv.registerTable("T1", t1) + +val result = tEnv.sql(sqlQuery).toDataStream[Row] +result.addSink(new StreamITCase.StringSink) +env.execute() + +val expected1 = mutable.MutableList( + "1,1,1", "2,2,2", "3,2,5") +val expected2 = mutable.MutableList( + "1,1,1", "2,2,5", "3,2,3") +assertTrue(expected1.equals(StreamITCase.testResults.sorted) || + expected2.equals(StreamITCase.testResults.sorted)) + } + + /** test sliding event-time unbounded window without partitiion by **/ + @Test + def testWithoutPartition(): Unit = { +val env = 
StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +StreamITCase.testResults = mutable.MutableList() + +val sqlQuery = "SELECT SUM(a) " + + "over (order by rowtime() range between unbounded preceding and current row) from T1" + +val t1 = StreamTestData.getSmall3TupleDataStream(env) + .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(Int, Long, String)] { + +def getCurrentWatermark: Watermark = new Watermark(130L) + +def extractTimestamp(element: (Int, Long, String), previousElementTimestamp: Long): Long = + 140 + }).toTable(tEnv).as('a, 'b, 'c) +tEnv.registerTable("T1", t1) + +val result = tEnv.sql(sqlQuery).toDataStream[Row] +result.addSink(new StreamITCase.StringSink) +env.execute() + +assertEquals(Some("6"), StreamITCase.testResults.sorted.get(StreamITCase.testResults.size - 1)) --- End diff -- Can you test the results of each output?
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904369#comment-15904369 ] ASF GitHub Bot commented on FLINK-5658: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r105322654 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UnboundedRowtimeOverTest.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.scala.stream.sql + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.table.api.{TableEnvironment, TableException} +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.scala.stream.utils.StreamTestData.Small4Tuple +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit._ + +import scala.collection.mutable + +class UnboundedRowtimeOverTest extends StreamingWithStateTestBase { + + /** test sliding event-time unbounded window with partition by **/ + @Test + def testWithPartition(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +StreamITCase.testResults = mutable.MutableList() + +val sqlQuery = "SELECT a, b, SUM(a) over (partition by b order by rowtime() range between " + + "unbounded preceding and current row) from T1" + +val t1 = StreamTestData.getSmall3TupleDataStream(env) --- End diff -- This test data is a bit simple, I recommend enriching some test data, such as: ``` data.+=((1, 1L, "Hi")) data.+=((2, 2L, "Hello")) data.+=((3, 5L, "Hello")) data.+=((1, 3L, "Hello")) data.+=((3, 7L, "Hello world")) data.+=((4, 9L, "Hello world")) data.+=((5, 8L, "Hello world")) ``` what do you think? > Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL > > > Key: FLINK-5658 > URL: https://issues.apache.org/jira/browse/FLINK-5658 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Yuhong Hong > > The goal of this issue is to add support for OVER RANGE aggregations on event > time streams to the SQL interface. 
> Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED > PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED > PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a > parameterless scalar function that just indicates processing time mode. > - bounded PRECEDING is not supported (see FLINK-5655) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataSt
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904368#comment-15904368 ] ASF GitHub Bot commented on FLINK-5658: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r105325003 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventtimeOverProcessFunction.scala --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.{ProcessFunction} +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.StringSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +import scala.collection.mutable.ArrayBuffer + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param interMediateType the intermediate row tye which the state saved + * @param keySelector the keyselector + * @param keyType the key type + * + */ +class UnboundedEventtimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val interMediateType: TypeInformation[Row], +private val keySelector: KeySelector[Row, Tuple], +private val keyType: TypeInformation[Tuple]) + extends ProcessFunction[Row, Row] + with CheckpointedFunction{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var state: MapState[TimeWindow, Row] = _ + 
private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex + + /** Sorted list per key for choose the recent result and the records need retraction **/ + private val timeSectionsMap: java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] = --- End diff -- It's not a good idea to use a memory data structure here because i worry about OOM problems in big data situations, I suggest use stateBackend. Unfortunately we are not currently sorting stateBackend, maybe we can think about other ways. I'm not sure, but I need to think about it and then give you feedback. what do you think? > Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL > > > Key: FLINK-5658 > URL: https://issues.apache.org/jira/browse/FLINK-5658 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Huesk
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904364#comment-15904364 ] ASF GitHub Bot commented on FLINK-5658: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r105322866 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UnboundedRowtimeOverTest.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.scala.stream.sql + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.table.api.{TableEnvironment, TableException} +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.scala.stream.utils.StreamTestData.Small4Tuple +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit._ + +import scala.collection.mutable + +class UnboundedRowtimeOverTest extends StreamingWithStateTestBase { + + /** test sliding event-time unbounded window with partition by **/ + @Test + def testWithPartition(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +StreamITCase.testResults = mutable.MutableList() + +val sqlQuery = "SELECT a, b, SUM(a) over (partition by b order by rowtime() range between " + + "unbounded preceding and current row) from T1" + +val t1 = StreamTestData.getSmall3TupleDataStream(env) + .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(Int, Long, String)] { + +def getCurrentWatermark: Watermark = new Watermark(130L) --- End diff -- Why use a fixed value to produce watermark, can generate watermark based on data? > Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL > > > Key: FLINK-5658 > URL: https://issues.apache.org/jira/browse/FLINK-5658 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Yuhong Hong > > The goal of this issue is to add support for OVER RANGE aggregations on event > time streams to the SQL interface. 
> Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED > PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED > PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a > parameterless scalar function that just indicates processing time mode. > - bounded PRECEDING is not supported (see FLINK-5655) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProj
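Regarding the question above about deriving the watermark from the data rather than from a fixed value, one possible sketch is a punctuated assigner that reads the timestamp from the tuple's Long field. The class name and the choice of emitting a watermark one unit behind each extracted timestamp are assumptions for illustration, not part of the pull request:

```scala
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Emits a watermark after every element, derived from the element's own
// Long field instead of a hard-coded constant. Flink ignores watermarks
// that are not larger than the current one, so out-of-order input is safe.
class DataDrivenWatermarkAssigner
  extends AssignerWithPunctuatedWatermarks[(Int, Long, String)] {

  override def extractTimestamp(
      element: (Int, Long, String),
      previousElementTimestamp: Long): Long = element._2

  override def checkAndGetNextWatermark(
      lastElement: (Int, Long, String),
      extractedTimestamp: Long): Watermark = new Watermark(extractedTimestamp - 1)
}
```

In the test it could then replace the anonymous AssignerWithPeriodicWatermarks, e.g. `.assignTimestampsAndWatermarks(new DataDrivenWatermarkAssigner)`.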
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r105323567 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventtimeOverProcessFunction.scala --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.{ProcessFunction} +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.StringSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +import scala.collection.mutable.ArrayBuffer + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param interMediateType the intermediate row tye which the state saved + * @param keySelector the keyselector + * @param keyType the key type + * + */ +class UnboundedEventtimeOverProcessFunction( --- End diff -- I suggest that change `Eventtime` to `EventTime`, What do you think? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r105323083 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UnboundedRowtimeOverTest.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.scala.stream.sql + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.table.api.{TableEnvironment, TableException} +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.scala.stream.utils.StreamTestData.Small4Tuple +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit._ + +import scala.collection.mutable + +class UnboundedRowtimeOverTest extends StreamingWithStateTestBase { + + /** test sliding event-time unbounded window with partition by **/ + @Test + def testWithPartition(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +StreamITCase.testResults = mutable.MutableList() + +val sqlQuery = "SELECT a, b, SUM(a) over (partition by b order by rowtime() range between " + + "unbounded preceding and current row) from T1" + +val t1 = StreamTestData.getSmall3TupleDataStream(env) + .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(Int, Long, String)] { + +def getCurrentWatermark: Watermark = new Watermark(130L) + +def extractTimestamp(element: (Int, Long, String), previousElementTimestamp: Long): Long = + 140 + }).toTable(tEnv).as('a, 'b, 'c) +tEnv.registerTable("T1", t1) + +val result = tEnv.sql(sqlQuery).toDataStream[Row] +result.addSink(new StreamITCase.StringSink) +env.execute() + +val expected1 = mutable.MutableList( + "1,1,1", "2,2,2", "3,2,5") +val expected2 = mutable.MutableList( + "1,1,1", "2,2,5", "3,2,3") +assertTrue(expected1.equals(StreamITCase.testResults.sorted) || + expected2.equals(StreamITCase.testResults.sorted)) + } + + /** test sliding event-time unbounded window without partitiion by **/ + @Test + def testWithoutPartition(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +StreamITCase.testResults = mutable.MutableList() + +val 
sqlQuery = "SELECT SUM(a) " + + "over (order by rowtime() range between unbounded preceding and current row) from T1" + +val t1 = StreamTestData.getSmall3TupleDataStream(env) + .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(Int, Long, String)] { + +def getCurrentWatermark: Watermark = new Watermark(130L) + +def extractTimestamp(element: (Int, Long, String), previousElementTimestamp: Long): Long = + 140 + }).toTable(tEnv).as('a, 'b, 'c) +tEnv.registerTable("T1", t1) + +val result = tEnv.sql(sqlQuery).toDataStream[Row] +result.addSink(new StreamITCase.StringSink) +env.execute() + +assertEquals(Some("6"), StreamITCase.testResults.sorted.get(StreamITCase.testResults.size - 1)) + } + + /** test sliding event-time unbounded window with later record **/ + @Test + def test
[jira] [Created] (FLINK-6015) "Row too short" when reading CSV line with empty last field (i.e. ending in comma)
Luke Hutchison created FLINK-6015: - Summary: "Row too short" when reading CSV line with empty last field (i.e. ending in comma) Key: FLINK-6015 URL: https://issues.apache.org/jira/browse/FLINK-6015 Project: Flink Issue Type: Bug Components: Batch Connectors and Input/Output Formats Affects Versions: 1.2.0 Environment: Linux Reporter: Luke Hutchison When using env.readCsvFile(filename), if a line in the CSV file has an empty last field, the line ends with a comma. This triggers an exception in GenericCsvInput.parseRecord(): // check valid start position if (startPos >= limit) { if (lenient) { return false; } else { throw new ParseException("Row too short: " + new String(bytes, offset, numBytes)); } } Setting the parser to lenient would cause the last field to be left as null, rather than setting its value to "". The parser should accept empty values for the last field on a row. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
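For context, a minimal sketch (in the Scala DataSet API) of the reported situation; the file path and record type are made up, and the exact behaviour of the `lenient` flag should be checked against GenericCsvInputFormat rather than taken from this sketch:

```
import org.apache.flink.api.scala._

object TrailingCommaCsvSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Hypothetical input file containing a line such as "a,1," whose last field is empty.
    val path = "/tmp/trailing-comma.csv"

    // Strict (default) parsing: the empty trailing field currently triggers
    // ParseException("Row too short: ..."), as described above.
    val strict = env.readCsvFile[(String, Int, String)](path)
    strict.print()

    // Lenient parsing avoids the exception but does not yield an empty string for
    // the missing field, which is why the issue asks the parser to accept it properly.
    val lenient = env.readCsvFile[(String, Int, String)](path, lenient = true)
    lenient.print()
  }
}
```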
[jira] [Commented] (FLINK-5861) TaskManager's components support updating JobManagerConnection
[ https://issues.apache.org/jira/browse/FLINK-5861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904310#comment-15904310 ] ASF GitHub Bot commented on FLINK-5861: --- Github user ifndef-SleePy commented on the issue: https://github.com/apache/flink/pull/3395 Hi Till, I almost missed these comments! I didn't see them until a few minutes ago. I fully understand your concern; let me explain a bit more. 1. I agree with most of your suggestions, such as the null checks, the formatting issues, and using TestLogger. 2. Currently a synchronization problem should not happen: I think replacing the field value is safe, since it is an atomic operation. Correct me if I'm wrong. 3. This PR will not do anything on its own without the other reconciliation PRs, because nothing notifies these listeners yet. We did implement the reconnection between TM and JM, and this change is meant to work with that code. The reason I filed this PR separately from the other reconciliation PRs is that I think it is independent of the other parts, and filing one huge PR is terrible for both the reviewer and the writer. However, this single PR on its own was confusing; sorry about that. 4. I'm actually not sure the listener pattern is the best way to do this, but I think it is the simplest way that requires the fewest modifications to the current implementation. If the TM reconnects to a new JM, how else can we update the JobMasterGateway held by the components? I can't see a better way short of reimplementing these components. Anyway, thank you for the thorough review and comments! I agree that we should close this PR for the moment. Once there is agreement on the main reconciliation PRs, we can discuss what this PR tries to implement. > TaskManager's components support updating JobManagerConnection > -- > > Key: FLINK-5861 > URL: https://issues.apache.org/jira/browse/FLINK-5861 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, JobManager >Reporter: Biao Liu >Assignee: Biao Liu > Fix For: 1.3.0 > > > Some components in TaskManager, such as TaskManagerActions, > CheckpointResponder, ResultPartitionConsumableNotifier, > PartitionProducerStateChecker, need to support updating JobManagerConnection. > So when JobManager fails and recovers, the tasks who keep old > JobManagerConnection can be notified to update JobManagerConnection. The > tasks can continue doing their jobs without failure. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5861) TaskManager's components support updating JobManagerConnection
[ https://issues.apache.org/jira/browse/FLINK-5861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904311#comment-15904311 ] ASF GitHub Bot commented on FLINK-5861: --- Github user ifndef-SleePy closed the pull request at: https://github.com/apache/flink/pull/3395 > TaskManager's components support updating JobManagerConnection > -- > > Key: FLINK-5861 > URL: https://issues.apache.org/jira/browse/FLINK-5861 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, JobManager >Reporter: Biao Liu >Assignee: Biao Liu > Fix For: 1.3.0 > > > Some components in TaskManager, such as TaskManagerActions, > CheckpointResponder, ResultPartitionConsumableNotifier, > PartitionProducerStateChecker, need to support updating JobManagerConnection. > So when JobManager fails and recovers, the tasks who keep old > JobManagerConnection can be notified to update JobManagerConnection. The > tasks can continue doing their jobs without failure. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3395: [FLINK-5861] Components of TaskManager support updating J...
Github user ifndef-SleePy commented on the issue: https://github.com/apache/flink/pull/3395 Hi Till, I almost missed these comments! I didn't see them until a few minutes ago. I fully understand your concern; let me explain a bit more. 1. I agree with most of your suggestions, such as the null checks, the formatting issues, and using TestLogger. 2. Currently a synchronization problem should not happen: I think replacing the field value is safe, since it is an atomic operation. Correct me if I'm wrong. 3. This PR will not do anything on its own without the other reconciliation PRs, because nothing notifies these listeners yet. We did implement the reconnection between TM and JM, and this change is meant to work with that code. The reason I filed this PR separately from the other reconciliation PRs is that I think it is independent of the other parts, and filing one huge PR is terrible for both the reviewer and the writer. However, this single PR on its own was confusing; sorry about that. 4. I'm actually not sure the listener pattern is the best way to do this, but I think it is the simplest way that requires the fewest modifications to the current implementation. If the TM reconnects to a new JM, how else can we update the JobMasterGateway held by the components? I can't see a better way short of reimplementing these components. Anyway, thank you for the thorough review and comments! I agree that we should close this PR for the moment. Once there is agreement on the main reconciliation PRs, we can discuss what this PR tries to implement. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
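To make the listener idea discussed above concrete, here is a minimal, self-contained sketch. All names (`JobManagerConnection`, `JobManagerConnectionListener`, `CheckpointResponderLike`) are hypothetical illustrations of the pattern, not Flink's actual runtime classes.

```
// Hypothetical stand-in for the real connection object.
final case class JobManagerConnection(jobMasterAddress: String, leaderSessionId: Long)

// Listener that TaskManager-side components could implement to learn about a new JobManager.
trait JobManagerConnectionListener {
  def notifyJobManagerConnectionChanged(newConnection: JobManagerConnection): Unit
}

// A component that keeps a reference to the current connection. Replacing the field is a
// single reference write (the "atomic operation" argument above); marking it volatile (or
// using an AtomicReference) is still needed so that other threads see the update.
final class CheckpointResponderLike extends JobManagerConnectionListener {
  @volatile private var connection: JobManagerConnection = _

  override def notifyJobManagerConnectionChanged(newConnection: JobManagerConnection): Unit = {
    connection = newConnection
  }

  def acknowledgeCheckpoint(checkpointId: Long): Unit = {
    val current = connection // read once and work with a consistent snapshot
    if (current != null) {
      println(s"ack checkpoint $checkpointId to ${current.jobMasterAddress}")
    }
  }
}
```

Whether notifying such listeners or re-creating the components on reconnection is preferable is exactly the open question in the thread above.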
[GitHub] flink pull request #3395: [FLINK-5861] Components of TaskManager support upd...
Github user ifndef-SleePy closed the pull request at: https://github.com/apache/flink/pull/3395 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Assigned] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young reassigned FLINK-3849: - Assignee: Kurt Young > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Kurt Young > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
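As a rough illustration of the proposed interface, the sketch below uses a toy expression tree instead of Flink's real `Expression` classes; only the shape of `setPredicate` (consume what can be pushed down, return the remainder) follows the proposal above.

```
// Toy expression tree; NOT Flink's org.apache.flink.table.expressions API.
sealed trait Expression
final case class And(left: Expression, right: Expression) extends Expression
final case class GreaterThan(field: String, value: Int) extends Expression
final case class Like(field: String, pattern: String) extends Expression

trait FilterableTableSource {
  /** Accepts a predicate and returns the part that could NOT be pushed down (null if none). */
  def setPredicate(predicate: Expression): Expression
}

// A source that can only evaluate simple numeric comparisons itself.
final class PartitionedOrdersSource extends FilterableTableSource {
  private var pushedDown: List[GreaterThan] = Nil

  override def setPredicate(predicate: Expression): Expression = predicate match {
    case gt: GreaterThan =>
      pushedDown ::= gt
      null // fully handled by the source
    case And(left, right) =>
      (setPredicate(left), setPredicate(right)) match {
        case (null, null)   => null
        case (null, rest)   => rest
        case (rest, null)   => rest
        case (restL, restR) => And(restL, restR)
      }
    case other => other // e.g. LIKE stays in the filter above the scan
  }
}
```

The translation rule mentioned in the issue would then keep whatever `setPredicate` returns in a filter above the `TableScan`.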
[GitHub] flink issue #3485: FLINK-5976 [tests] Deduplicate Tokenizer in tests
Github user liuyuzhong7 commented on the issue: https://github.com/apache/flink/pull/3485 @StephanEwen The FlatMapFunction "Tokenizer" has been moved to org.apache.flink.test.testfunctions. Could you help me review this? Thanks. I will collect other reusable functions into org.apache.flink.test.testfunctions later. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5976) Refactoring duplicate Tokenizer in flink-test
[ https://issues.apache.org/jira/browse/FLINK-5976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904280#comment-15904280 ] ASF GitHub Bot commented on FLINK-5976: --- Github user liuyuzhong7 commented on the issue: https://github.com/apache/flink/pull/3485 @StephanEwen The FlatMapFunction "Tokenizer" has been moved to org.apache.flink.test.testfunctions. Could you help me review this? Thanks. I will collect other reusable functions into org.apache.flink.test.testfunctions later. > Refactoring duplicate Tokenizer in flink-test > - > > Key: FLINK-5976 > URL: https://issues.apache.org/jira/browse/FLINK-5976 > Project: Flink > Issue Type: Improvement > Components: Examples >Affects Versions: 1.2.0 >Reporter: liuyuzhong7 >Priority: Minor > Labels: test > Fix For: 1.2.0 > > > There is some duplicated code like this in flink-test; I think refactoring it > will be better. > ``` > public final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { > @Override > public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { > // normalize and split the line > String[] tokens = value.toLowerCase().split("\\W+"); > // emit the pairs > for (String token : tokens) { > if (token.length() > 0) { > out.collect(new Tuple2<String, Integer>(token, 1)); > } > } > } > } > ``` -- This message was sent by Atlassian JIRA (v6.3.15#6346)
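For reference, the duplicated Tokenizer above is the classic WordCount splitter; an equivalent Scala sketch of the same logic (not the actual shared class being introduced by the PR) looks like this:

```
import org.apache.flink.api.scala._

object WordCountSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Same behaviour as the Tokenizer: lowercase, split on non-word characters,
    // drop empty tokens, emit (token, 1) pairs and sum them per token.
    val counts = env
      .fromElements("To be, or not to be", "that is the question")
      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
      .map(token => (token, 1))
      .groupBy(0)
      .sum(1)

    counts.print()
  }
}
```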
[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904271#comment-15904271 ] Kurt Young commented on FLINK-5859: --- Hi [~fhueske], I can continue working on {{FilterableTableSource}}. Should I open another PR based on the former changes? > support partition pruning on Table API & SQL > > > Key: FLINK-5859 > URL: https://issues.apache.org/jira/browse/FLINK-5859 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: godfrey he >Assignee: godfrey he > > Many data sources are partitionable storage, e.g. HDFS, Druid. And many > queries just need to read a small subset of the total data. We can use > partition information to prune or skip over files irrelevant to the user’s > queries. Both query optimization time and execution time can be reduced > noticeably, especially for a large partitioned table. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6014) Allow the registration of state objects in checkpoints
Xiaogang Shi created FLINK-6014: --- Summary: Allow the registration of state objects in checkpoints Key: FLINK-6014 URL: https://issues.apache.org/jira/browse/FLINK-6014 Project: Flink Issue Type: Sub-task Components: State Backends, Checkpointing Reporter: Xiaogang Shi Assignee: Xiaogang Shi This issue is the very first step towards incremental checkpointing. We introduce a new state handle named {{CompositeStateHandle}} to be the base of the snapshots taken by task components. Known implementations may include {{KeyedStateHandle}} (for {{KeyedStateBackend}}s), {{SubtaskState}} (for subtasks, splits of {{JobVertex}}) and {{TaskState}} (for {{JobVertex}}s). Each {{CompositeStateHandle}} is composed of a collection of {{StateObject}}s. It should register all its state objects in the {{StateRegistry}} when its checkpoint is added into the {{CompletedCheckpointStore}} (i.e., when a pending checkpoint completes or a completed checkpoint is reloaded during recovery). When a completed checkpoint is moved out of the {{CompletedCheckpointStore}}, we should not simply discard all state objects in the checkpoint. With the introduction of incremental checkpointing, a {{StateObject}} may be referenced by different checkpoints. We should first unregister all its state objects from the {{StateRegistry}}; only those state objects that are no longer referenced by any checkpoint can be deleted. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
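A minimal reference-counting sketch of the registration scheme described above; the names mirror the issue, but the code is purely illustrative and not the eventual Flink implementation.

```
import scala.collection.mutable

// Illustrative stand-ins for the handles described in the issue.
trait StateObject { def key: String; def discard(): Unit }

trait CompositeStateHandle {
  def stateObjects: Iterable[StateObject]
  def register(registry: StateRegistry): Unit = stateObjects.foreach(registry.register)
  def unregister(registry: StateRegistry): Unit = stateObjects.foreach(registry.unregister)
}

// Counts how many completed checkpoints still reference each state object;
// physical deletion only happens when the count drops to zero.
final class StateRegistry {
  private val refCounts = mutable.Map.empty[String, (StateObject, Int)]

  def register(obj: StateObject): Unit = {
    val (o, c) = refCounts.getOrElse(obj.key, (obj, 0))
    refCounts(obj.key) = (o, c + 1)
  }

  def unregister(obj: StateObject): Unit = refCounts.get(obj.key) match {
    case Some((o, 1)) =>
      refCounts.remove(obj.key)
      o.discard() // no checkpoint references it any more
    case Some((o, c)) =>
      refCounts(obj.key) = (o, c - 1)
    case None => // unknown object, nothing to do
  }
}
```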
[jira] [Commented] (FLINK-5976) Refactoring duplicate Tokenizer in flink-test
[ https://issues.apache.org/jira/browse/FLINK-5976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904258#comment-15904258 ] ASF GitHub Bot commented on FLINK-5976: --- Github user liuyuzhong7 commented on the issue: https://github.com/apache/flink/pull/3485 OK, got it. So how about just doing it now in flink-tests, as in this pull request? > Refactoring duplicate Tokenizer in flink-test > - > > Key: FLINK-5976 > URL: https://issues.apache.org/jira/browse/FLINK-5976 > Project: Flink > Issue Type: Improvement > Components: Examples >Affects Versions: 1.2.0 >Reporter: liuyuzhong7 >Priority: Minor > Labels: test > Fix For: 1.2.0 > > > There is some duplicated code like this in flink-test; I think refactoring it > will be better. > ``` > public final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { > @Override > public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { > // normalize and split the line > String[] tokens = value.toLowerCase().split("\\W+"); > // emit the pairs > for (String token : tokens) { > if (token.length() > 0) { > out.collect(new Tuple2<String, Integer>(token, 1)); > } > } > } > } > ``` -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3485: FLINK-5976 [tests] Deduplicate Tokenizer in tests
Github user liuyuzhong7 commented on the issue: https://github.com/apache/flink/pull/3485 OK, got it. So how about just doing it now in flink-tests, as in this pull request? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #3406: [flink-5568] [Table API & SQL]Introduce interface for cat...
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/3406 Hi @fhueske, I like your proposal about moving the annotation from `TableSource` to `TableSourceConverter`. Let's do it this way. BTW, I noticed that you offered three possible methods for the `TableSourceConverter`; I can only imagine that `def requiredProperties: Array[String]` is necessary for now. It can help validate the converter and decide which converter we should use when multiple converters have the same `TableType`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
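A hedged sketch of how `requiredProperties` could drive converter selection; the class shapes below are assumptions for illustration, not the final interfaces in flink-table.

```
// Illustrative stand-ins for the catalog types discussed above.
final case class ExternalCatalogTable(tableType: String, properties: Map[String, String])
trait TableSource

trait TableSourceConverter[T <: TableSource] {
  /** Property keys that must be present for this converter to be applicable. */
  def requiredProperties: Array[String]
  def fromExternalCatalogTable(table: ExternalCatalogTable): T
}

object ConverterSelection {
  /** Among converters registered for the same table type, picks the one whose
    * required properties are all present in the catalog table. */
  def select[T <: TableSource](
      candidates: Seq[TableSourceConverter[T]],
      table: ExternalCatalogTable): Option[TableSourceConverter[T]] =
    candidates.find(_.requiredProperties.forall(table.properties.contains))
}
```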
[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3406#discussion_r105316858 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/ExternalCatalogCompatible.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.annotation; + +import org.apache.flink.annotation.Public; +import org.apache.flink.table.catalog.TableSourceConverter; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * A tableSource with this annotation represents it is compatible with external catalog, that is, + * an instance of this tableSource can be converted to or converted from external catalog table + * instance. + * The annotation contains the following information: + * + * external catalog table type name for this kind of tableSource + * external catalog table <-> tableSource converter class + * + */ +@Documented +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Public +public @interface ExternalCatalogCompatible { --- End diff -- This suggestion is pretty good, thanks --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-6013) Add Datadog HTTP metrics reporter
Bowen Li created FLINK-6013: --- Summary: Add Datadog HTTP metrics reporter Key: FLINK-6013 URL: https://issues.apache.org/jira/browse/FLINK-6013 Project: Flink Issue Type: Improvement Components: Metrics Affects Versions: 1.2.0 Reporter: Bowen Li Priority: Critical Fix For: 1.2.1 We at OfferUp use Datadog a lot for metrics and dashboards, and I believe a lot of other companies do as well. Flink right now only has a StatsD metrics reporter, so users have to set up the Datadog Agent in order to receive metrics from StatsD and forward them to Datadog. We don't like this approach; we would prefer a Datadog metrics reporter that contacts the Datadog HTTP endpoint directly. I'll take this ticket myself. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3461: [FLINK-5954] Always assign names to the window in the Str...
Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/3461 Hi @haohui, thanks for your explanation; I'll look at the details in FLINK-5954. Thanks, SunJincheng --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5954) Always assign names to the window in the Stream SQL API
[ https://issues.apache.org/jira/browse/FLINK-5954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904189#comment-15904189 ] ASF GitHub Bot commented on FLINK-5954: --- Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/3461 Hi, @haohui Thanks for your explanation, and I'll see the detail in the FLINK-5954. Thanks, SunJincheng > Always assign names to the window in the Stream SQL API > --- > > Key: FLINK-5954 > URL: https://issues.apache.org/jira/browse/FLINK-5954 > Project: Flink > Issue Type: Improvement >Reporter: Haohui Mai >Assignee: Haohui Mai > > CALCITE-1603 and CALCITE-1615 brings in supports for {{TUMBLE}}, {{HOP}}, > {{SESSION}} grouped windows, as well as the corresponding auxiliary functions > that allow uses to query the start and the end of the windows (e.g., > {{TUMBLE_START()}} and {{TUMBLE_END()}} see > http://calcite.apache.org/docs/stream.html for more details). > The goal of this jira is to add support for these auxiliary functions in > Flink. Flink already has runtime supports for them, as these functions are > essential mapped to the {{WindowStart}} and {{WindowEnd}} classes. > To implement this feature in transformation, the transformation needs to > recognize these functions and map them to the {{WindowStart}} and > {{WindowEnd}} classes. > The problem is that both classes can only refer to the windows using alias. > Therefore this jira proposes to assign a unique name for each window to > enable the transformation. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
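For readers following along, this is the shape of query the change enables once the auxiliary functions are translated to `WindowStart` / `WindowEnd`; the table and column names are made up, and the exact form of the time attribute (`rowtime` vs. `rowtime()`) depends on the Flink version.

```
object TumbleAuxFunctionsExample {
  // Illustrative streaming SQL: the TUMBLE_START / TUMBLE_END calls are the auxiliary
  // functions that must be mapped onto WindowStart / WindowEnd, which in turn requires
  // every window to carry a (generated) alias.
  val windowedSql: String =
    """
      |SELECT
      |  user_name,
      |  TUMBLE_START(rowtime, INTERVAL '1' HOUR) AS window_start,
      |  TUMBLE_END(rowtime, INTERVAL '1' HOUR)   AS window_end,
      |  COUNT(*) AS cnt
      |FROM Orders
      |GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), user_name
      |""".stripMargin
}
```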
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904089#comment-15904089 ] ASF GitHub Bot commented on FLINK-3257: --- Github user addisonj commented on the issue: https://github.com/apache/flink/pull/1668 Very interested in this work. It sounds like there are few loose ends and then some cleanup before it might be ready for merge, @senorcarbone or @StephanEwen anything that can be supported by someone else? Would love to help wherever possible > Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs > --- > > Key: FLINK-3257 > URL: https://issues.apache.org/jira/browse/FLINK-3257 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Paris Carbone >Assignee: Paris Carbone > > The current snapshotting algorithm cannot support cycles in the execution > graph. An alternative scheme can potentially include records in-transit > through the back-edges of a cyclic execution graph (ABS [1]) to achieve the > same guarantees. > One straightforward implementation of ABS for cyclic graphs can work as > follows along the lines: > 1) Upon triggering a barrier in an IterationHead from the TaskManager start > block output and start upstream backup of all records forwarded from the > respective IterationSink. > 2) The IterationSink should eventually forward the current snapshotting epoch > barrier to the IterationSource. > 3) Upon receiving a barrier from the IterationSink, the IterationSource > should finalize the snapshot, unblock its output and emit all records > in-transit in FIFO order and continue the usual execution. > -- > Upon restart the IterationSource should emit all records from the injected > snapshot first and then continue its usual execution. > Several optimisations and slight variations can be potentially achieved but > this can be the initial implementation take. > [1] http://arxiv.org/abs/1506.08603 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...
Github user addisonj commented on the issue: https://github.com/apache/flink/pull/1668 Very interested in this work. It sounds like there are a few loose ends and then some cleanup before it might be ready for merge. @senorcarbone or @StephanEwen, is there anything that could be picked up by someone else? I would love to help wherever possible. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3461: [FLINK-5954] Always assign names to the window in ...
GitHub user haohui reopened a pull request: https://github.com/apache/flink/pull/3461 [FLINK-5954] Always assign names to the window in the Stream SQL API. Please see jira for more details. You can merge this pull request into a Git repository by running: $ git pull https://github.com/haohui/flink FLINK-5954 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3461.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3461 commit 064b827127b15a1397c216aae6611d575a75556b Author: Haohui Mai Date: 2017-03-09T21:57:49Z [FLINK-5954] Always assign names to the window in the Stream SQL API. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5954) Always assign names to the window in the Stream SQL API
[ https://issues.apache.org/jira/browse/FLINK-5954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904050#comment-15904050 ] ASF GitHub Bot commented on FLINK-5954: --- GitHub user haohui reopened a pull request: https://github.com/apache/flink/pull/3461 [FLINK-5954] Always assign names to the window in the Stream SQL API. Please see jira for more details. You can merge this pull request into a Git repository by running: $ git pull https://github.com/haohui/flink FLINK-5954 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3461.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3461 commit 064b827127b15a1397c216aae6611d575a75556b Author: Haohui Mai Date: 2017-03-09T21:57:49Z [FLINK-5954] Always assign names to the window in the Stream SQL API. > Always assign names to the window in the Stream SQL API > --- > > Key: FLINK-5954 > URL: https://issues.apache.org/jira/browse/FLINK-5954 > Project: Flink > Issue Type: Improvement >Reporter: Haohui Mai >Assignee: Haohui Mai > > CALCITE-1603 and CALCITE-1615 brings in supports for {{TUMBLE}}, {{HOP}}, > {{SESSION}} grouped windows, as well as the corresponding auxiliary functions > that allow uses to query the start and the end of the windows (e.g., > {{TUMBLE_START()}} and {{TUMBLE_END()}} see > http://calcite.apache.org/docs/stream.html for more details). > The goal of this jira is to add support for these auxiliary functions in > Flink. Flink already has runtime supports for them, as these functions are > essential mapped to the {{WindowStart}} and {{WindowEnd}} classes. > To implement this feature in transformation, the transformation needs to > recognize these functions and map them to the {{WindowStart}} and > {{WindowEnd}} classes. > The problem is that both classes can only refer to the windows using alias. > Therefore this jira proposes to assign a unique name for each window to > enable the transformation. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5954) Always assign names to the window in the Stream SQL API
[ https://issues.apache.org/jira/browse/FLINK-5954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904048#comment-15904048 ] ASF GitHub Bot commented on FLINK-5954: --- Github user haohui commented on the issue: https://github.com/apache/flink/pull/3461 Sorry for the late response. This is a prerequisite step for FLINK-6012 -- translating the group auxiliary functions (e.g., `TUMBLE_START`) to the corresponding Flink expressions (e.g., `WindowStart`). A more detailed description is on the FLINK-5954. > Always assign names to the window in the Stream SQL API > --- > > Key: FLINK-5954 > URL: https://issues.apache.org/jira/browse/FLINK-5954 > Project: Flink > Issue Type: Improvement >Reporter: Haohui Mai >Assignee: Haohui Mai > > CALCITE-1603 and CALCITE-1615 brings in supports for {{TUMBLE}}, {{HOP}}, > {{SESSION}} grouped windows, as well as the corresponding auxiliary functions > that allow uses to query the start and the end of the windows (e.g., > {{TUMBLE_START()}} and {{TUMBLE_END()}} see > http://calcite.apache.org/docs/stream.html for more details). > The goal of this jira is to add support for these auxiliary functions in > Flink. Flink already has runtime supports for them, as these functions are > essential mapped to the {{WindowStart}} and {{WindowEnd}} classes. > To implement this feature in transformation, the transformation needs to > recognize these functions and map them to the {{WindowStart}} and > {{WindowEnd}} classes. > The problem is that both classes can only refer to the windows using alias. > Therefore this jira proposes to assign a unique name for each window to > enable the transformation. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3461: [FLINK-5954] Always assign names to the window in the Str...
Github user haohui commented on the issue: https://github.com/apache/flink/pull/3461 Sorry for the late response. This is a prerequisite step for FLINK-6012 -- translating the group auxiliary functions (e.g., `TUMBLE_START`) to the corresponding Flink expressions (e.g., `WindowStart`). A more detailed description is on the FLINK-5954. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5954) Always assign names to the window in the Stream SQL API
[ https://issues.apache.org/jira/browse/FLINK-5954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904042#comment-15904042 ] ASF GitHub Bot commented on FLINK-5954: --- Github user haohui closed the pull request at: https://github.com/apache/flink/pull/3461 > Always assign names to the window in the Stream SQL API > --- > > Key: FLINK-5954 > URL: https://issues.apache.org/jira/browse/FLINK-5954 > Project: Flink > Issue Type: Improvement >Reporter: Haohui Mai >Assignee: Haohui Mai > > CALCITE-1603 and CALCITE-1615 brings in supports for {{TUMBLE}}, {{HOP}}, > {{SESSION}} grouped windows, as well as the corresponding auxiliary functions > that allow uses to query the start and the end of the windows (e.g., > {{TUMBLE_START()}} and {{TUMBLE_END()}} see > http://calcite.apache.org/docs/stream.html for more details). > The goal of this jira is to add support for these auxiliary functions in > Flink. Flink already has runtime supports for them, as these functions are > essential mapped to the {{WindowStart}} and {{WindowEnd}} classes. > To implement this feature in transformation, the transformation needs to > recognize these functions and map them to the {{WindowStart}} and > {{WindowEnd}} classes. > The problem is that both classes can only refer to the windows using alias. > Therefore this jira proposes to assign a unique name for each window to > enable the transformation. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3461: [FLINK-5954] Always assign names to the window in ...
Github user haohui closed the pull request at: https://github.com/apache/flink/pull/3461 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-6012) Support WindowStart / WindowEnd functions in stream SQL
Haohui Mai created FLINK-6012: - Summary: Support WindowStart / WindowEnd functions in stream SQL Key: FLINK-6012 URL: https://issues.apache.org/jira/browse/FLINK-6012 Project: Flink Issue Type: Sub-task Reporter: Haohui Mai Assignee: Haohui Mai This jira proposes to add support for {{TUMBLE_START()}} / {{TUMBLE_END()}} / {{HOP_START()}} / {{HOP_END()}} / {{SESSION_START()}} / {{SESSION_END()}} in the planner in Flink. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6011) Support TUMBLE, HOP, SESSION window in streaming SQL
Haohui Mai created FLINK-6011: - Summary: Support TUMBLE, HOP, SESSION window in streaming SQL Key: FLINK-6011 URL: https://issues.apache.org/jira/browse/FLINK-6011 Project: Flink Issue Type: Sub-task Reporter: Haohui Mai Assignee: Haohui Mai CALCITE-1603 and CALCITE-1615 introduce support for the {{TUMBLE}} / {{HOP}} / {{SESSION}} windows in the parser. This jira tracks the effort of adding the corresponding support in the planners / optimizers in Flink. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
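Illustrative examples of the three group windows this issue covers, following the Calcite streaming SQL syntax referenced above (table and column names are made up):

```
object GroupWindowSqlExamples {
  // Tumbling window: fixed, non-overlapping 1-day windows.
  val tumble: String =
    "SELECT item, COUNT(*) FROM Orders " +
      "GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), item"

  // Hopping (sliding) window: 1-hour windows that advance every 15 minutes.
  val hop: String =
    "SELECT item, COUNT(*) FROM Orders " +
      "GROUP BY HOP(rowtime, INTERVAL '15' MINUTE, INTERVAL '1' HOUR), item"

  // Session window: closed after a 30-minute gap of inactivity.
  val session: String =
    "SELECT item, COUNT(*) FROM Orders " +
      "GROUP BY SESSION(rowtime, INTERVAL '30' MINUTE), item"
}
```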
[jira] [Assigned] (FLINK-5829) Bump Calcite version to 1.12 once available
[ https://issues.apache.org/jira/browse/FLINK-5829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haohui Mai reassigned FLINK-5829: - Assignee: Haohui Mai > Bump Calcite version to 1.12 once available > --- > > Key: FLINK-5829 > URL: https://issues.apache.org/jira/browse/FLINK-5829 > Project: Flink > Issue Type: Task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Haohui Mai > > Once Calcite 1.12 is release we should update to remove some copied classes. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6010) Documentation: correct IntelliJ IDEA Plugins path in 'Installing the Scala plugin' section
Bowen Li created FLINK-6010: --- Summary: Documentation: correct IntelliJ IDEA Plugins path in 'Installing the Scala plugin' section Key: FLINK-6010 URL: https://issues.apache.org/jira/browse/FLINK-6010 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.2.0 Reporter: Bowen Li Fix For: 1.2.1 In https://ci.apache.org/projects/flink/flink-docs-release-1.2/internals/ide_setup.html#installing-the-scala-plugin, how you should get to 'plugins' page in IntelliJ IDEA is wrong. This seems to be describing a much older version of IntelliJ IDE. The correct path now is: IntelliJ IDEA -> Preferences -> Plugins -> Install JetBrains plugin I'll submit a PR to fix this -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6009) Deprecate DataSetUtils#checksumHashCode
Greg Hogan created FLINK-6009: - Summary: Deprecate DataSetUtils#checksumHashCode Key: FLINK-6009 URL: https://issues.apache.org/jira/browse/FLINK-6009 Project: Flink Issue Type: Improvement Components: Java API Affects Versions: 1.3.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Trivial Fix For: 1.3.0 This is likely only used by Gelly and we have a more featureful implementation allowing for multiple outputs and setting the job name. Deprecation will allow this to be removed in Flink 2.0. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3502: [FLINK-4565] Support for SQL IN operator
GitHub user DmytroShkvyra opened a pull request: https://github.com/apache/flink/pull/3502 [FLINK-4565] Support for SQL IN operator [FLINK-4565] Support for SQL IN operator This PR is a part of work on SQL IN operator in Table API, which implements IN for literals. Two cases are covered: less and great then 20 literals. Also I have some questions: - converting all numeric types to BigDecimal isn't ok? I decided to make so to simplify use of hashset. - validation isn't really good. It forces to use operator with same type literals. Should I rework it or maybe just add more cases? expressionDsl.scala: entry point for IN operator in scala API ScalarOperators.scala: 1) All numeric types are upcasting to BigDecimal for using in hashset, other types are unchanged in castNumeric 2) valuesInitialization used for 2 cases: when we have more then 20 operands (then we use hashset, initialized in constructor, descibed below) and less then 20 operands (then we initialize operands in method's body and use them in conjunction) 3) comparison also covers described above cases. In first case we use callback to declare and initialize hashset with all operands. Otherwise we just put all operands in conjunction. 4) Final code is built up with these code snippets. CodeGenerator.scala: passes arguments and callback to declare and init hashet FunctionCatalog.scala: registers "in" as method InITCase: some use cases You can merge this pull request into a Git repository by running: $ git pull https://github.com/DmytroShkvyra/flink FLINK-4565-NV Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3502.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3502 commit f7960d66a0f885a5b032345427c5380f268cc60e Author: DmytroShkvyra Date: 2017-03-09T19:37:46Z [FLINK-4565] Support for SQL IN operator --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
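Based on the PR description above (the `in` entry point added to expressionDsl.scala), the intended Table API usage looks roughly like the sketch below; the exact method signature should be taken from the PR itself, and the data and column names are made up.

```
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object InOperatorSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val t = env
      .fromElements((1, "Fred"), (2, "Wilma"), (3, "Pebbles"))
      .toTable(tEnv).as('a, 'name)

    // Few literals: per the description, the generated code is a chain of comparisons;
    // with more than 20 literals a HashSet membership test is generated instead.
    val filtered = t.where('a.in(1, 2, 3))

    filtered.toDataSet[Row].print()
  }
}
```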
[jira] [Commented] (FLINK-4565) Support for SQL IN operator
[ https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903759#comment-15903759 ] ASF GitHub Bot commented on FLINK-4565: --- GitHub user DmytroShkvyra opened a pull request: https://github.com/apache/flink/pull/3502 [FLINK-4565] Support for SQL IN operator [FLINK-4565] Support for SQL IN operator This PR is a part of work on SQL IN operator in Table API, which implements IN for literals. Two cases are covered: less and great then 20 literals. Also I have some questions: - converting all numeric types to BigDecimal isn't ok? I decided to make so to simplify use of hashset. - validation isn't really good. It forces to use operator with same type literals. Should I rework it or maybe just add more cases? expressionDsl.scala: entry point for IN operator in scala API ScalarOperators.scala: 1) All numeric types are upcasting to BigDecimal for using in hashset, other types are unchanged in castNumeric 2) valuesInitialization used for 2 cases: when we have more then 20 operands (then we use hashset, initialized in constructor, descibed below) and less then 20 operands (then we initialize operands in method's body and use them in conjunction) 3) comparison also covers described above cases. In first case we use callback to declare and initialize hashset with all operands. Otherwise we just put all operands in conjunction. 4) Final code is built up with these code snippets. CodeGenerator.scala: passes arguments and callback to declare and init hashet FunctionCatalog.scala: registers "in" as method InITCase: some use cases You can merge this pull request into a Git repository by running: $ git pull https://github.com/DmytroShkvyra/flink FLINK-4565-NV Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3502.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3502 commit f7960d66a0f885a5b032345427c5380f268cc60e Author: DmytroShkvyra Date: 2017-03-09T19:37:46Z [FLINK-4565] Support for SQL IN operator > Support for SQL IN operator > --- > > Key: FLINK-4565 > URL: https://issues.apache.org/jira/browse/FLINK-4565 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Dmytro Shkvyra > > It seems that Flink SQL supports the uncorrelated sub-query IN operator. But > it should also be available in the Table API and tested. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator
Github user DmytroShkvyra commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r105244663 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala --- @@ -1101,6 +1101,45 @@ class ScalarFunctionsTest extends ExpressionTestBase { "true") } + @Test + def testInExpressions(): Unit = { +testTableApi( --- End diff -- @twalthr Unfortunately, adding support for POJOs and tuples as standard data types in the Table API is too complicated a task and out of scope for this one. Adding new composite types to the API is much broader than adding the IN operator; it would impact the whole process of validation and execution of SQL statements. It would be better to create an epic for introducing new types. I have researched it, and I think it would be better to commit the IN operator functionality without support for tuples and POJOs; otherwise it would become an infinite task. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4565) Support for SQL IN operator
[ https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903654#comment-15903654 ] ASF GitHub Bot commented on FLINK-4565: --- Github user DmytroShkvyra commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r105244663 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala --- @@ -1101,6 +1101,45 @@ class ScalarFunctionsTest extends ExpressionTestBase { "true") } + @Test + def testInExpressions(): Unit = { +testTableApi( --- End diff -- @twalthr Unfortunately, add support for POJO and tuples as a standard types of data in table API is too complicated task and out of scope this task. Adding of new composite types to API more wide than add IN operator, it will impact all process of validation and execution of SQL statements. It would be better create epic for introduce new types. I have researched it and I think I would be better commit IN operator functionality without support tuples and POJOs. Otherwise, It would become infinite task. > Support for SQL IN operator > --- > > Key: FLINK-4565 > URL: https://issues.apache.org/jira/browse/FLINK-4565 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Dmytro Shkvyra > > It seems that Flink SQL supports the uncorrelated sub-query IN operator. But > it should also be available in the Table API and tested. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3501: [FLINK-5874] Restrict key types in the DataStream API.
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3501 This should work and is well tested, good job. Had a bunch of minor comments, but nothing critical. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903645#comment-15903645 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/3501 This should work and is well tested, good job. Had a bunch of minor comments, but nothing critical. > Reject arrays as keys in DataStream API to avoid inconsistent hashing > - > > Key: FLINK-5874 > URL: https://issues.apache.org/jira/browse/FLINK-5874 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0, 1.1.4 >Reporter: Robert Metzger >Assignee: Kostas Kloudas >Priority: Blocker > > This issue has been reported on the mailing list twice: > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html > The problem is the following: We are using just Key[].hashCode() to compute > the hash when shuffling data. Java's default hashCode() implementation > doesn't take the arrays contents into account, but the memory address. > This leads to different hash code on the sender and receiver side. > In Flink 1.1 this means that the data is shuffled randomly and not keyed, and > in Flink 1.2 the keygroups code detect a violation of the hashing. > The proper fix of the problem would be to rely on Flink's {{TypeComparator}} > class, which has a type-specific hashing function. But introducing this > change would break compatibility with existing code. > I'll file a JIRA for the 2.0 changes for that fix. > For 1.2.1 and 1.3.0 we should at least reject arrays as keys. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
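A small, self-contained illustration of the hashing problem described above, plus one possible content-based workaround (the workaround is illustrative, not the fix adopted in the PR, which simply rejects such keys):

```
object ArrayKeyPitfall {
  def main(args: Array[String]): Unit = {
    // Java arrays use identity-based hashCode, so equal contents hash differently:
    val a = Array(1, 2, 3)
    val b = Array(1, 2, 3)
    println(a.hashCode() == b.hashCode())                                 // almost always false
    println(java.util.Arrays.hashCode(a) == java.util.Arrays.hashCode(b)) // true (content-based)

    // One way to key safely is to derive a deterministic, content-based key
    // and pass that to keyBy(...) instead of the array itself:
    val contentKey: Array[Int] => String = _.mkString(",")
    println(contentKey(a) == contentKey(b))                               // true
  }
}
```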
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903632#comment-15903632 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105240510 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java --- @@ -114,9 +121,53 @@ public KeyedStream(DataStream dataStream, KeySelector keySelector, Ty dataStream.getTransformation(), new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM))); this.keySelector = keySelector; - this.keyType = keyType; + this.keyType = validateKeyType(keyType); } - + + private TypeInformation validateKeyType(TypeInformation keyType) { + Stack> stack = new Stack<>(); + stack.push(keyType); + + while (!stack.isEmpty()) { + TypeInformation typeInfo = stack.pop(); + + if (!validateKeyTypeIsHashable(typeInfo)) { + throw new InvalidProgramException("This type (" + keyType + ") cannot be used as key."); + } + + if (typeInfo instanceof TupleTypeInfoBase) { + for (int i = 0; i < typeInfo.getArity(); i++) { + stack.push(((TupleTypeInfoBase) typeInfo).getTypeAt(i)); + } + } + } + return keyType; + } + + /** +* Validates that a given type of element (as encoded by the provided {@link TypeInformation}) can be +* used as a key in the {@code DataStream.keyBy()} operation. +* +* @return {@code false} if: +* +* it is a POJO type but does not override the {@link #hashCode()} method and relies on +* the {@link Object#hashCode()} implementation. +* it is an array of any type (see {@link PrimitiveArrayTypeInfo}, {@link BasicArrayTypeInfo}, +* {@link ObjectArrayTypeInfo}). +* , +* {@code true} otherwise. +*/ + private boolean validateKeyTypeIsHashable(TypeInformation type) { + try { + return (type instanceof PojoTypeInfo) ? --- End diff -- we should also simply break this if statement down into multiple blocks. i.e. ``` if (type isntanceof PojoTypeInfo) { return //find hashCode } if (tpye instance off XArrayTypeInfo ... ) { return false; } return true; ``` > Reject arrays as keys in DataStream API to avoid inconsistent hashing > - > > Key: FLINK-5874 > URL: https://issues.apache.org/jira/browse/FLINK-5874 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0, 1.1.4 >Reporter: Robert Metzger >Assignee: Kostas Kloudas >Priority: Blocker > > This issue has been reported on the mailing list twice: > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html > The problem is the following: We are using just Key[].hashCode() to compute > the hash when shuffling data. Java's default hashCode() implementation > doesn't take the arrays contents into account, but the memory address. > This leads to different hash code on the sender and receiver side. > In Flink 1.1 this means that the data is shuffled randomly and not keyed, and > in Flink 1.2 the keygroups code detect a violation of the hashing. > The proper fix of the problem would be to rely on Flink's {{TypeComparator}} > class, which has a type-specific hashing function. But introducing this > change would break compatibility with existing code. > I'll file a JIRA for the 2.0 changes for that fix. 
> For 1.2.1 and 1.3.0 we should at least reject arrays as keys. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903631#comment-15903631 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105241140 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } / + // KeyBy testing + / + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testPrimitiveArrayKeyRejection() { + + KeySelector, int[]> keySelector = + new KeySelector, int[]>() { + + @Override + public int[] getKey(Tuple2 value) throws Exception { + int[] ks = new int[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = value.f0[i]; + } + return ks; + } + }; + + testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Test + public void testBasicArrayKeyRejection() { + + KeySelector, Integer[]> keySelector = + new KeySelector, Integer[]>() { + + @Override + public Integer[] getKey(Tuple2 value) throws Exception { + return value.f0; + } + }; + + testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO); + } + + @Test + public void testObjectArrayKeyRejection() { + + KeySelector, TestClass[]> keySelector = --- End diff -- Do we need a separate class here or could we just throw in an Object[]? > Reject arrays as keys in DataStream API to avoid inconsistent hashing > - > > Key: FLINK-5874 > URL: https://issues.apache.org/jira/browse/FLINK-5874 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0, 1.1.4 >Reporter: Robert Metzger >Assignee: Kostas Kloudas >Priority: Blocker > > This issue has been reported on the mailing list twice: > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html > The problem is the following: We are using just Key[].hashCode() to compute > the hash when shuffling data. Java's default hashCode() implementation > doesn't take the arrays contents into account, but the memory address. > This leads to different hash code on the sender and receiver side. > In Flink 1.1 this means that the data is shuffled randomly and not keyed, and > in Flink 1.2 the keygroups code detect a violation of the hashing. > The proper fix of the problem would be to rely on Flink's {{TypeComparator}} > class, which has a type-specific hashing function. But introducing this > change would break compatibility with existing code. > I'll file a JIRA for the 2.0 changes for that fix. > For 1.2.1 and 1.3.0 we should at least reject arrays as keys. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903635#comment-15903635 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105241450 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } / + // KeyBy testing + / + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testPrimitiveArrayKeyRejection() { + + KeySelector, int[]> keySelector = + new KeySelector, int[]>() { + + @Override + public int[] getKey(Tuple2 value) throws Exception { + int[] ks = new int[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = value.f0[i]; + } + return ks; + } + }; + + testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Test + public void testBasicArrayKeyRejection() { + + KeySelector, Integer[]> keySelector = + new KeySelector, Integer[]>() { + + @Override + public Integer[] getKey(Tuple2 value) throws Exception { + return value.f0; + } + }; + + testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO); + } + + @Test + public void testObjectArrayKeyRejection() { + + KeySelector, TestClass[]> keySelector = + new KeySelector, TestClass[]>() { + + @Override + public TestClass[] getKey(Tuple2 value) throws Exception { + TestClass[] ks = new TestClass[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = new TestClass(value.f0[i]); + } + return ks; + } + }; + + ObjectArrayTypeInfo keyTypeInfo = ObjectArrayTypeInfo.getInfoFor( + TestClass[].class, new GenericTypeInfo<>(TestClass.class)); + + testKeyRejection(keySelector, keyTypeInfo); + } + + private void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream> input = env.fromElements( + new Tuple2<>(new Integer[] {1, 2}, "barfoo") + ); + + Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType())); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedKeyType + ") cannot be used as key."); + + input.keyBy(keySelector); + } + + // composite key tests : POJOs + + @Test + public void testPOJONestedArrayKeyRejection() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + TypeInformation expectedTypeInfo = new TupleTypeInfo>( + PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used as key."); + + input.keyBy("id"); + } + + @Test + public void testNestedArrayWorkArround() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElemen
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903628#comment-15903628 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105239060 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } / + // KeyBy testing + / + + @Rule + public ExpectedException expectedException = ExpectedException.none(); --- End diff -- First time I'm seeing this pattern and i can't help but feel that a simple try-catch block would be more readable. > Reject arrays as keys in DataStream API to avoid inconsistent hashing > - > > Key: FLINK-5874 > URL: https://issues.apache.org/jira/browse/FLINK-5874 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0, 1.1.4 >Reporter: Robert Metzger >Assignee: Kostas Kloudas >Priority: Blocker > > This issue has been reported on the mailing list twice: > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html > The problem is the following: We are using just Key[].hashCode() to compute > the hash when shuffling data. Java's default hashCode() implementation > doesn't take the arrays contents into account, but the memory address. > This leads to different hash code on the sender and receiver side. > In Flink 1.1 this means that the data is shuffled randomly and not keyed, and > in Flink 1.2 the keygroups code detect a violation of the hashing. > The proper fix of the problem would be to rely on Flink's {{TypeComparator}} > class, which has a type-specific hashing function. But introducing this > change would break compatibility with existing code. > I'll file a JIRA for the 2.0 changes for that fix. > For 1.2.1 and 1.3.0 we should at least reject arrays as keys. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903633#comment-15903633 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105239223 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java --- @@ -736,7 +736,7 @@ public void restoreState(List state) throws Exception { static final ValueStateDescriptor descriptor = new ValueStateDescriptor<>("seen", Boolean.class, false); private static final long serialVersionUID = 1L; - private ValueState operatorState; + private transient ValueState operatorState; --- End diff -- unrelated change. > Reject arrays as keys in DataStream API to avoid inconsistent hashing > - > > Key: FLINK-5874 > URL: https://issues.apache.org/jira/browse/FLINK-5874 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0, 1.1.4 >Reporter: Robert Metzger >Assignee: Kostas Kloudas >Priority: Blocker > > This issue has been reported on the mailing list twice: > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html > The problem is the following: We are using just Key[].hashCode() to compute > the hash when shuffling data. Java's default hashCode() implementation > doesn't take the arrays contents into account, but the memory address. > This leads to different hash code on the sender and receiver side. > In Flink 1.1 this means that the data is shuffled randomly and not keyed, and > in Flink 1.2 the keygroups code detect a violation of the hashing. > The proper fix of the problem would be to rely on Flink's {{TypeComparator}} > class, which has a type-specific hashing function. But introducing this > change would break compatibility with existing code. > I'll file a JIRA for the 2.0 changes for that fix. > For 1.2.1 and 1.3.0 we should at least reject arrays as keys. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
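The root cause described in the issue text can be reproduced outside Flink. A self-contained illustration (plain JDK code, not part of the PR), showing that Java's default array hashCode() is identity-based while a content-based hash is stable across copies:

```java
import java.util.Arrays;

public class ArrayHashDemo {
    public static void main(String[] args) {
        int[] onSender = {1, 2, 3};
        int[] onReceiver = {1, 2, 3}; // e.g. the deserialized copy on the receiver side

        // Object.hashCode() on an array ignores the contents, so two arrays with
        // identical contents almost always produce different hashes.
        System.out.println(onSender.hashCode() == onReceiver.hashCode()); // false in practice

        // A content-based hash is consistent on both sides, which is what a
        // type-specific hash (as provided by Flink's TypeComparator) would give.
        System.out.println(Arrays.hashCode(onSender) == Arrays.hashCode(onReceiver)); // true
    }
}
```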
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903629#comment-15903629 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105241562 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } / + // KeyBy testing + / + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testPrimitiveArrayKeyRejection() { + + KeySelector, int[]> keySelector = + new KeySelector, int[]>() { + + @Override + public int[] getKey(Tuple2 value) throws Exception { + int[] ks = new int[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = value.f0[i]; + } + return ks; + } + }; + + testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Test + public void testBasicArrayKeyRejection() { + + KeySelector, Integer[]> keySelector = + new KeySelector, Integer[]>() { + + @Override + public Integer[] getKey(Tuple2 value) throws Exception { + return value.f0; + } + }; + + testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO); + } + + @Test + public void testObjectArrayKeyRejection() { + + KeySelector, TestClass[]> keySelector = + new KeySelector, TestClass[]>() { + + @Override + public TestClass[] getKey(Tuple2 value) throws Exception { + TestClass[] ks = new TestClass[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = new TestClass(value.f0[i]); + } + return ks; + } + }; + + ObjectArrayTypeInfo keyTypeInfo = ObjectArrayTypeInfo.getInfoFor( + TestClass[].class, new GenericTypeInfo<>(TestClass.class)); + + testKeyRejection(keySelector, keyTypeInfo); + } + + private void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream> input = env.fromElements( + new Tuple2<>(new Integer[] {1, 2}, "barfoo") + ); + + Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType())); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedKeyType + ") cannot be used as key."); + + input.keyBy(keySelector); + } + + // composite key tests : POJOs + + @Test + public void testPOJONestedArrayKeyRejection() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + TypeInformation expectedTypeInfo = new TupleTypeInfo>( + PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used as key."); + + input.keyBy("id"); + } + + @Test + public void testNestedArrayWorkArround() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElemen
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903624#comment-15903624 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105239687 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java --- @@ -114,9 +121,53 @@ public KeyedStream(DataStream dataStream, KeySelector keySelector, Ty dataStream.getTransformation(), new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM))); this.keySelector = keySelector; - this.keyType = keyType; + this.keyType = validateKeyType(keyType); } - + + private TypeInformation validateKeyType(TypeInformation keyType) { + Stack> stack = new Stack<>(); + stack.push(keyType); + + while (!stack.isEmpty()) { + TypeInformation typeInfo = stack.pop(); + + if (!validateKeyTypeIsHashable(typeInfo)) { + throw new InvalidProgramException("This type (" + keyType + ") cannot be used as key."); --- End diff -- Let's include ```typeInfo``` in this exception as well to narrow it down for the user. We may even want to delay the exception until we scanned the entire type and report all invalid keys at once. > Reject arrays as keys in DataStream API to avoid inconsistent hashing > - > > Key: FLINK-5874 > URL: https://issues.apache.org/jira/browse/FLINK-5874 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0, 1.1.4 >Reporter: Robert Metzger >Assignee: Kostas Kloudas >Priority: Blocker > > This issue has been reported on the mailing list twice: > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html > The problem is the following: We are using just Key[].hashCode() to compute > the hash when shuffling data. Java's default hashCode() implementation > doesn't take the arrays contents into account, but the memory address. > This leads to different hash code on the sender and receiver side. > In Flink 1.1 this means that the data is shuffled randomly and not keyed, and > in Flink 1.2 the keygroups code detect a violation of the hashing. > The proper fix of the problem would be to rely on Flink's {{TypeComparator}} > class, which has a type-specific hashing function. But introducing this > change would break compatibility with existing code. > I'll file a JIRA for the 2.0 changes for that fix. > For 1.2.1 and 1.3.0 we should at least reject arrays as keys. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
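A sketch of what this comment asks for, building on the Stack-based traversal quoted above: record every non-hashable component and raise a single exception that names both the full key type and the offending parts. The message wording and the list-based collection are assumptions, not the PR's code, and java.util.List/ArrayList are assumed to be imported alongside Stack:

```java
private TypeInformation<KEY> validateKeyType(TypeInformation<KEY> keyType) {
    Stack<TypeInformation<?>> stack = new Stack<>();
    stack.push(keyType);

    List<TypeInformation<?>> unsupported = new ArrayList<>();

    while (!stack.isEmpty()) {
        TypeInformation<?> typeInfo = stack.pop();

        if (!validateKeyTypeIsHashable(typeInfo)) {
            // keep scanning instead of throwing immediately,
            // so that all invalid parts are reported at once
            unsupported.add(typeInfo);
        }

        if (typeInfo instanceof TupleTypeInfoBase) {
            for (int i = 0; i < typeInfo.getArity(); i++) {
                stack.push(((TupleTypeInfoBase<?>) typeInfo).getTypeAt(i));
            }
        }
    }

    if (!unsupported.isEmpty()) {
        throw new InvalidProgramException(
                "This type (" + keyType + ") cannot be used as key because it contains "
                        + "the following non-hashable parts: " + unsupported);
    }
    return keyType;
}
```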
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903637#comment-15903637 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105242176 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } / + // KeyBy testing + / + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testPrimitiveArrayKeyRejection() { + + KeySelector, int[]> keySelector = + new KeySelector, int[]>() { + + @Override + public int[] getKey(Tuple2 value) throws Exception { + int[] ks = new int[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = value.f0[i]; + } + return ks; + } + }; + + testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Test + public void testBasicArrayKeyRejection() { + + KeySelector, Integer[]> keySelector = + new KeySelector, Integer[]>() { + + @Override + public Integer[] getKey(Tuple2 value) throws Exception { + return value.f0; + } + }; + + testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO); + } + + @Test + public void testObjectArrayKeyRejection() { + + KeySelector, TestClass[]> keySelector = + new KeySelector, TestClass[]>() { + + @Override + public TestClass[] getKey(Tuple2 value) throws Exception { + TestClass[] ks = new TestClass[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = new TestClass(value.f0[i]); + } + return ks; + } + }; + + ObjectArrayTypeInfo keyTypeInfo = ObjectArrayTypeInfo.getInfoFor( + TestClass[].class, new GenericTypeInfo<>(TestClass.class)); + + testKeyRejection(keySelector, keyTypeInfo); + } + + private void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream> input = env.fromElements( + new Tuple2<>(new Integer[] {1, 2}, "barfoo") + ); + + Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType())); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedKeyType + ") cannot be used as key."); + + input.keyBy(keySelector); + } + + // composite key tests : POJOs + + @Test + public void testPOJONestedArrayKeyRejection() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + TypeInformation expectedTypeInfo = new TupleTypeInfo>( + PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used as key."); + + input.keyBy("id"); + } + + @Test + public void testNestedArrayWorkArround() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElemen
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903638#comment-15903638 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105239799 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java --- @@ -114,9 +121,53 @@ public KeyedStream(DataStream dataStream, KeySelector keySelector, Ty dataStream.getTransformation(), new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM))); this.keySelector = keySelector; - this.keyType = keyType; + this.keyType = validateKeyType(keyType); } - + + private TypeInformation validateKeyType(TypeInformation keyType) { + Stack> stack = new Stack<>(); + stack.push(keyType); + + while (!stack.isEmpty()) { + TypeInformation typeInfo = stack.pop(); + + if (!validateKeyTypeIsHashable(typeInfo)) { + throw new InvalidProgramException("This type (" + keyType + ") cannot be used as key."); + } + + if (typeInfo instanceof TupleTypeInfoBase) { + for (int i = 0; i < typeInfo.getArity(); i++) { + stack.push(((TupleTypeInfoBase) typeInfo).getTypeAt(i)); + } + } + } + return keyType; + } + + /** +* Validates that a given type of element (as encoded by the provided {@link TypeInformation}) can be +* used as a key in the {@code DataStream.keyBy()} operation. +* +* @return {@code false} if: +* +* it is a POJO type but does not override the {@link #hashCode()} method and relies on +* the {@link Object#hashCode()} implementation. +* it is an array of any type (see {@link PrimitiveArrayTypeInfo}, {@link BasicArrayTypeInfo}, +* {@link ObjectArrayTypeInfo}). +* , +* {@code true} otherwise. +*/ + private boolean validateKeyTypeIsHashable(TypeInformation type) { + try { + return (type instanceof PojoTypeInfo) ? --- End diff -- ? and : are typically at the start of a new line. > Reject arrays as keys in DataStream API to avoid inconsistent hashing > - > > Key: FLINK-5874 > URL: https://issues.apache.org/jira/browse/FLINK-5874 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0, 1.1.4 >Reporter: Robert Metzger >Assignee: Kostas Kloudas >Priority: Blocker > > This issue has been reported on the mailing list twice: > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html > The problem is the following: We are using just Key[].hashCode() to compute > the hash when shuffling data. Java's default hashCode() implementation > doesn't take the arrays contents into account, but the memory address. > This leads to different hash code on the sender and receiver side. > In Flink 1.1 this means that the data is shuffled randomly and not keyed, and > in Flink 1.2 the keygroups code detect a violation of the hashing. > The proper fix of the problem would be to rely on Flink's {{TypeComparator}} > class, which has a type-specific hashing function. But introducing this > change would break compatibility with existing code. > I'll file a JIRA for the 2.0 changes for that fix. > For 1.2.1 and 1.3.0 we should at least reject arrays as keys. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903634#comment-15903634 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105241262 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } / + // KeyBy testing + / + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testPrimitiveArrayKeyRejection() { + + KeySelector, int[]> keySelector = + new KeySelector, int[]>() { + + @Override + public int[] getKey(Tuple2 value) throws Exception { + int[] ks = new int[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = value.f0[i]; + } + return ks; + } + }; + + testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Test + public void testBasicArrayKeyRejection() { + + KeySelector, Integer[]> keySelector = + new KeySelector, Integer[]>() { + + @Override + public Integer[] getKey(Tuple2 value) throws Exception { + return value.f0; + } + }; + + testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO); + } + + @Test + public void testObjectArrayKeyRejection() { + + KeySelector, TestClass[]> keySelector = + new KeySelector, TestClass[]>() { + + @Override + public TestClass[] getKey(Tuple2 value) throws Exception { + TestClass[] ks = new TestClass[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = new TestClass(value.f0[i]); + } + return ks; + } + }; + + ObjectArrayTypeInfo keyTypeInfo = ObjectArrayTypeInfo.getInfoFor( + TestClass[].class, new GenericTypeInfo<>(TestClass.class)); + + testKeyRejection(keySelector, keyTypeInfo); + } + + private void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream> input = env.fromElements( + new Tuple2<>(new Integer[] {1, 2}, "barfoo") + ); + + Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType())); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedKeyType + ") cannot be used as key."); + + input.keyBy(keySelector); + } + + // composite key tests : POJOs + + @Test + public void testPOJONestedArrayKeyRejection() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + TypeInformation expectedTypeInfo = new TupleTypeInfo>( + PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used as key."); + + input.keyBy("id"); + } + + @Test + public void testNestedArrayWorkArround() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElemen
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903630#comment-15903630 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105241358 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } / + // KeyBy testing + / + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testPrimitiveArrayKeyRejection() { + + KeySelector, int[]> keySelector = + new KeySelector, int[]>() { + + @Override + public int[] getKey(Tuple2 value) throws Exception { + int[] ks = new int[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = value.f0[i]; + } + return ks; + } + }; + + testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Test + public void testBasicArrayKeyRejection() { + + KeySelector, Integer[]> keySelector = + new KeySelector, Integer[]>() { + + @Override + public Integer[] getKey(Tuple2 value) throws Exception { + return value.f0; + } + }; + + testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO); + } + + @Test + public void testObjectArrayKeyRejection() { + + KeySelector, TestClass[]> keySelector = + new KeySelector, TestClass[]>() { + + @Override + public TestClass[] getKey(Tuple2 value) throws Exception { + TestClass[] ks = new TestClass[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = new TestClass(value.f0[i]); + } + return ks; + } + }; + + ObjectArrayTypeInfo keyTypeInfo = ObjectArrayTypeInfo.getInfoFor( + TestClass[].class, new GenericTypeInfo<>(TestClass.class)); + + testKeyRejection(keySelector, keyTypeInfo); + } + + private void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream> input = env.fromElements( + new Tuple2<>(new Integer[] {1, 2}, "barfoo") + ); + + Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType())); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedKeyType + ") cannot be used as key."); + + input.keyBy(keySelector); + } + + // composite key tests : POJOs + + @Test + public void testPOJONestedArrayKeyRejection() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + TypeInformation expectedTypeInfo = new TupleTypeInfo>( + PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used as key."); + + input.keyBy("id"); + } + + @Test + public void testNestedArrayWorkArround() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElemen
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903626#comment-15903626 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105238457 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } / + // KeyBy testing + / + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testPrimitiveArrayKeyRejection() { + + KeySelector, int[]> keySelector = + new KeySelector, int[]>() { + + @Override + public int[] getKey(Tuple2 value) throws Exception { + int[] ks = new int[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = value.f0[i]; + } + return ks; + } + }; + + testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Test + public void testBasicArrayKeyRejection() { + + KeySelector, Integer[]> keySelector = + new KeySelector, Integer[]>() { + + @Override + public Integer[] getKey(Tuple2 value) throws Exception { + return value.f0; + } + }; + + testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO); + } + + @Test + public void testObjectArrayKeyRejection() { + + KeySelector, TestClass[]> keySelector = + new KeySelector, TestClass[]>() { + + @Override + public TestClass[] getKey(Tuple2 value) throws Exception { + TestClass[] ks = new TestClass[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = new TestClass(value.f0[i]); + } + return ks; + } + }; + + ObjectArrayTypeInfo keyTypeInfo = ObjectArrayTypeInfo.getInfoFor( + TestClass[].class, new GenericTypeInfo<>(TestClass.class)); + + testKeyRejection(keySelector, keyTypeInfo); + } + + private void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream> input = env.fromElements( + new Tuple2<>(new Integer[] {1, 2}, "barfoo") + ); + + Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType())); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedKeyType + ") cannot be used as key."); + + input.keyBy(keySelector); + } + + // composite key tests : POJOs + + @Test + public void testPOJONestedArrayKeyRejection() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + TypeInformation expectedTypeInfo = new TupleTypeInfo>( + PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used as key."); + + input.keyBy("id"); + } + + @Test + public void testNestedArrayWorkArround() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElemen
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903636#comment-15903636 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105240936 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } / + // KeyBy testing + / + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testPrimitiveArrayKeyRejection() { + + KeySelector, int[]> keySelector = + new KeySelector, int[]>() { + + @Override + public int[] getKey(Tuple2 value) throws Exception { + int[] ks = new int[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = value.f0[i]; + } + return ks; + } + }; + + testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Test + public void testBasicArrayKeyRejection() { + + KeySelector, Integer[]> keySelector = + new KeySelector, Integer[]>() { + + @Override + public Integer[] getKey(Tuple2 value) throws Exception { + return value.f0; + } + }; + + testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO); + } + + @Test + public void testObjectArrayKeyRejection() { + + KeySelector, TestClass[]> keySelector = + new KeySelector, TestClass[]>() { + + @Override + public TestClass[] getKey(Tuple2 value) throws Exception { + TestClass[] ks = new TestClass[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = new TestClass(value.f0[i]); + } + return ks; + } + }; + + ObjectArrayTypeInfo keyTypeInfo = ObjectArrayTypeInfo.getInfoFor( + TestClass[].class, new GenericTypeInfo<>(TestClass.class)); + + testKeyRejection(keySelector, keyTypeInfo); + } + + private void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream> input = env.fromElements( + new Tuple2<>(new Integer[] {1, 2}, "barfoo") + ); + + Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType())); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedKeyType + ") cannot be used as key."); + + input.keyBy(keySelector); + } + + // composite key tests : POJOs + + @Test + public void testPOJONestedArrayKeyRejection() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + TypeInformation expectedTypeInfo = new TupleTypeInfo>( + PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used as key."); + + input.keyBy("id"); + } + + @Test + public void testNestedArrayWorkArround() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElemen
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903639#comment-15903639 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105238197 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } / + // KeyBy testing + / + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testPrimitiveArrayKeyRejection() { + + KeySelector, int[]> keySelector = + new KeySelector, int[]>() { + + @Override + public int[] getKey(Tuple2 value) throws Exception { + int[] ks = new int[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = value.f0[i]; + } + return ks; + } + }; + + testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Test + public void testBasicArrayKeyRejection() { + + KeySelector, Integer[]> keySelector = + new KeySelector, Integer[]>() { + + @Override + public Integer[] getKey(Tuple2 value) throws Exception { + return value.f0; + } + }; + + testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO); + } + + @Test + public void testObjectArrayKeyRejection() { + + KeySelector, TestClass[]> keySelector = + new KeySelector, TestClass[]>() { + + @Override + public TestClass[] getKey(Tuple2 value) throws Exception { + TestClass[] ks = new TestClass[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = new TestClass(value.f0[i]); + } + return ks; + } + }; + + ObjectArrayTypeInfo keyTypeInfo = ObjectArrayTypeInfo.getInfoFor( + TestClass[].class, new GenericTypeInfo<>(TestClass.class)); + + testKeyRejection(keySelector, keyTypeInfo); + } + + private void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream> input = env.fromElements( + new Tuple2<>(new Integer[] {1, 2}, "barfoo") + ); + + Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType())); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedKeyType + ") cannot be used as key."); + + input.keyBy(keySelector); + } + + // composite key tests : POJOs + + @Test + public void testPOJONestedArrayKeyRejection() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + TypeInformation expectedTypeInfo = new TupleTypeInfo>( + PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used as key."); + + input.keyBy("id"); + } + + @Test + public void testNestedArrayWorkArround() { --- End diff -- type: Arround -> Around > Reject arrays as keys in DataStream API to avoid inconsistent hashing > --
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903625#comment-15903625 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105240608 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java --- @@ -114,9 +121,53 @@ public KeyedStream(DataStream dataStream, KeySelector keySelector, Ty dataStream.getTransformation(), new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM))); this.keySelector = keySelector; - this.keyType = keyType; + this.keyType = validateKeyType(keyType); } - + + private TypeInformation validateKeyType(TypeInformation keyType) { + Stack> stack = new Stack<>(); + stack.push(keyType); + + while (!stack.isEmpty()) { + TypeInformation typeInfo = stack.pop(); + + if (!validateKeyTypeIsHashable(typeInfo)) { + throw new InvalidProgramException("This type (" + keyType + ") cannot be used as key."); + } + + if (typeInfo instanceof TupleTypeInfoBase) { + for (int i = 0; i < typeInfo.getArity(); i++) { + stack.push(((TupleTypeInfoBase) typeInfo).getTypeAt(i)); + } + } + } + return keyType; + } + + /** +* Validates that a given type of element (as encoded by the provided {@link TypeInformation}) can be +* used as a key in the {@code DataStream.keyBy()} operation. +* +* @return {@code false} if: +* +* it is a POJO type but does not override the {@link #hashCode()} method and relies on +* the {@link Object#hashCode()} implementation. +* it is an array of any type (see {@link PrimitiveArrayTypeInfo}, {@link BasicArrayTypeInfo}, +* {@link ObjectArrayTypeInfo}). +* , +* {@code true} otherwise. +*/ + private boolean validateKeyTypeIsHashable(TypeInformation type) { + try { + return (type instanceof PojoTypeInfo) ? + !type.getTypeClass().getMethod("hashCode").getDeclaringClass().equals(Object.class) : --- End diff -- this method would be more readable by not inverting here, and returning true as the default. > Reject arrays as keys in DataStream API to avoid inconsistent hashing > - > > Key: FLINK-5874 > URL: https://issues.apache.org/jira/browse/FLINK-5874 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0, 1.1.4 >Reporter: Robert Metzger >Assignee: Kostas Kloudas >Priority: Blocker > > This issue has been reported on the mailing list twice: > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html > The problem is the following: We are using just Key[].hashCode() to compute > the hash when shuffling data. Java's default hashCode() implementation > doesn't take the arrays contents into account, but the memory address. > This leads to different hash code on the sender and receiver side. > In Flink 1.1 this means that the data is shuffled randomly and not keyed, and > in Flink 1.2 the keygroups code detect a violation of the hashing. > The proper fix of the problem would be to rely on Flink's {{TypeComparator}} > class, which has a type-specific hashing function. But introducing this > change would break compatibility with existing code. > I'll file a JIRA for the 2.0 changes for that fix. > For 1.2.1 and 1.3.0 we should at least reject arrays as keys. 
-- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903627#comment-15903627 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105240820 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java --- @@ -114,9 +121,53 @@ public KeyedStream(DataStream dataStream, KeySelector keySelector, Ty dataStream.getTransformation(), new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM))); this.keySelector = keySelector; - this.keyType = keyType; + this.keyType = validateKeyType(keyType); } - + + private TypeInformation validateKeyType(TypeInformation keyType) { + Stack> stack = new Stack<>(); + stack.push(keyType); + + while (!stack.isEmpty()) { + TypeInformation typeInfo = stack.pop(); + + if (!validateKeyTypeIsHashable(typeInfo)) { + throw new InvalidProgramException("This type (" + keyType + ") cannot be used as key."); + } + + if (typeInfo instanceof TupleTypeInfoBase) { + for (int i = 0; i < typeInfo.getArity(); i++) { + stack.push(((TupleTypeInfoBase) typeInfo).getTypeAt(i)); + } + } + } + return keyType; + } + + /** +* Validates that a given type of element (as encoded by the provided {@link TypeInformation}) can be +* used as a key in the {@code DataStream.keyBy()} operation. +* +* @return {@code false} if: --- End diff -- I would shorten this to read ```returns true if the type overrides the hashcode implementation```. The details can be container in the general javadoc of the method. > Reject arrays as keys in DataStream API to avoid inconsistent hashing > - > > Key: FLINK-5874 > URL: https://issues.apache.org/jira/browse/FLINK-5874 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0, 1.1.4 >Reporter: Robert Metzger >Assignee: Kostas Kloudas >Priority: Blocker > > This issue has been reported on the mailing list twice: > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html > The problem is the following: We are using just Key[].hashCode() to compute > the hash when shuffling data. Java's default hashCode() implementation > doesn't take the arrays contents into account, but the memory address. > This leads to different hash code on the sender and receiver side. > In Flink 1.1 this means that the data is shuffled randomly and not keyed, and > in Flink 1.2 the keygroups code detect a violation of the hashing. > The proper fix of the problem would be to rely on Flink's {{TypeComparator}} > class, which has a type-specific hashing function. But introducing this > change would break compatibility with existing code. > I'll file a JIRA for the 2.0 changes for that fix. > For 1.2.1 and 1.3.0 we should at least reject arrays as keys. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
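Taken together, the comments on this method (operator placement, not inverting the check, and the shorter Javadoc) point toward a shape roughly like the following. The explicit array-type branch is an assumption, since the quoted diff is cut off before the other side of the ternary; this is a sketch of the suggested readability change, not the code that was merged:

```java
/**
 * Checks whether the given type can be used as a key in {@code DataStream.keyBy()}.
 *
 * @return true if the type provides a content-based hashCode implementation.
 */
private boolean validateKeyTypeIsHashable(TypeInformation<?> type) {
    try {
        if (type instanceof PojoTypeInfo) {
            // a POJO key is only usable if it overrides Object.hashCode()
            return !type.getTypeClass()
                    .getMethod("hashCode")
                    .getDeclaringClass()
                    .equals(Object.class);
        }

        if (type instanceof PrimitiveArrayTypeInfo
                || type instanceof BasicArrayTypeInfo
                || type instanceof ObjectArrayTypeInfo) {
            // arrays hash by identity and are therefore rejected as keys
            return false;
        }

        // everything else is accepted by default
        return true;
    } catch (NoSuchMethodException e) {
        // cannot happen in practice: Object declares hashCode(), so getMethod() finds it
        return true;
    }
}
```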
[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105239060 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } / + // KeyBy testing + / + + @Rule + public ExpectedException expectedException = ExpectedException.none(); --- End diff -- First time I'm seeing this pattern and i can't help but feel that a simple try-catch block would be more readable. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105239799 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java --- @@ -114,9 +121,53 @@ public KeyedStream(DataStream dataStream, KeySelector keySelector, Ty dataStream.getTransformation(), new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM))); this.keySelector = keySelector; - this.keyType = keyType; + this.keyType = validateKeyType(keyType); } - + + private TypeInformation validateKeyType(TypeInformation keyType) { + Stack> stack = new Stack<>(); + stack.push(keyType); + + while (!stack.isEmpty()) { + TypeInformation typeInfo = stack.pop(); + + if (!validateKeyTypeIsHashable(typeInfo)) { + throw new InvalidProgramException("This type (" + keyType + ") cannot be used as key."); + } + + if (typeInfo instanceof TupleTypeInfoBase) { + for (int i = 0; i < typeInfo.getArity(); i++) { + stack.push(((TupleTypeInfoBase) typeInfo).getTypeAt(i)); + } + } + } + return keyType; + } + + /** +* Validates that a given type of element (as encoded by the provided {@link TypeInformation}) can be +* used as a key in the {@code DataStream.keyBy()} operation. +* +* @return {@code false} if: +* +* it is a POJO type but does not override the {@link #hashCode()} method and relies on +* the {@link Object#hashCode()} implementation. +* it is an array of any type (see {@link PrimitiveArrayTypeInfo}, {@link BasicArrayTypeInfo}, +* {@link ObjectArrayTypeInfo}). +* , +* {@code true} otherwise. +*/ + private boolean validateKeyTypeIsHashable(TypeInformation type) { + try { + return (type instanceof PojoTypeInfo) ? --- End diff -- ? and : are typically at the start of a new line. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3501#discussion_r105240820

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---
+	/**
+	 * Validates that a given type of element (as encoded by the provided {@link TypeInformation}) can be
+	 * used as a key in the {@code DataStream.keyBy()} operation.
+	 *
+	 * @return {@code false} if:
--- End diff --

I would shorten this to read ```returns true if the type overrides the hashcode implementation```. The details can be contained in the general javadoc of the method.

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
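One possible shape of the shortened javadoc, keeping the details in the method description rather than in the `@return` tag (a sketch of the suggestion, not the PR's final wording):

```java
/**
 * Validates that a given type of element (as encoded by the provided {@link TypeInformation}) can be
 * used as a key in the {@code DataStream.keyBy()} operation. POJO keys must override
 * {@link Object#hashCode()}; array types of any kind are rejected.
 *
 * @return {@code true} if the type overrides the {@code hashCode()} implementation, {@code false} otherwise.
 */
```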
[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3501#discussion_r105242176

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java ---
+	@Test
+	public void testNestedArrayWorkArround() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<POJOwithHashCode> input = env.fromElements(
--- End diff --

The focus of this test appears to be to verify that you can use an array as a key by wrapping it in a Pojo that implements hashCode() (based on the naming). We should probably focus more on the Pojo-with-hashCode i
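The comment is cut off here. One way to put the emphasis on the POJO-with-hashCode itself rather than on the nested array could be a test along the following lines (an illustrative sketch with a made-up test name, not the PR's actual test):

```java
@Test
public void testPOJOWithHashCodeKeyAcceptance() {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

	DataStream<POJOwithHashCode> input = env.fromElements(
			new POJOwithHashCode(new int[] {1, 2}));

	// keying by the whole POJO must be accepted, because it overrides hashCode()
	KeyedStream<POJOwithHashCode, POJOwithHashCode> keyed = input.keyBy(
			new KeySelector<POJOwithHashCode, POJOwithHashCode>() {
				@Override
				public POJOwithHashCode getKey(POJOwithHashCode value) throws Exception {
					return value;
				}
			});

	// constructing the KeyedStream must not throw InvalidProgramException
	Assert.assertNotNull(keyed);
}
```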
[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3501#discussion_r105240608

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---
+	private boolean validateKeyTypeIsHashable(TypeInformation<?> type) {
+		try {
+			return (type instanceof PojoTypeInfo) ?
+				!type.getTypeClass().getMethod("hashCode").getDeclaringClass().equals(Object.class) :
--- End diff --

This method would be more readable by not inverting here, and returning true as the default.

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
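Read that way, the method could be restructured roughly as follows. The array checks and the catch clause are inferred from the javadoc quoted above and are not part of the quoted hunk, so treat this as a sketch rather than the PR's implementation.

```java
private boolean validateKeyTypeIsHashable(TypeInformation<?> type) {
	try {
		if (type instanceof PojoTypeInfo) {
			// a POJO key is usable only if it overrides Object#hashCode()
			return !type.getTypeClass().getMethod("hashCode").getDeclaringClass().equals(Object.class);
		}
		if (type instanceof PrimitiveArrayTypeInfo
				|| type instanceof BasicArrayTypeInfo
				|| type instanceof ObjectArrayTypeInfo) {
			// arrays fall back to identity hashCode and are rejected
			return false;
		}
		// everything else is accepted by default
		return true;
	} catch (NoSuchMethodException ignored) {
		// getMethod("hashCode") always resolves, but the checked exception must be handled
		return false;
	}
}
```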
[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3501#discussion_r105241562

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java ---
[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3501#discussion_r105240936

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java ---