[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904917#comment-15904917 ]
ASF GitHub Bot commented on FLINK-5874:
---------------------------------------
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/3501#discussion_r105377789
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---
@@ -114,9 +121,53 @@ public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, Ty
dataStream.getTransformation(),
new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)));
this.keySelector = keySelector;
- this.keyType = keyType;
+ this.keyType = validateKeyType(keyType);
}
-
+
+ private TypeInformation<KEY> validateKeyType(TypeInformation<KEY> keyType) {
+ Stack<TypeInformation<?>> stack = new Stack<>();
+ stack.push(keyType);
+
+ while (!stack.isEmpty()) {
+ TypeInformation<?> typeInfo = stack.pop();
+
+ if (!validateKeyTypeIsHashable(typeInfo)) {
+ throw new InvalidProgramException("This type (" + keyType + ") cannot be used as key.");
+ }
+
+ if (typeInfo instanceof TupleTypeInfoBase) {
+ for (int i = 0; i < typeInfo.getArity(); i++) {
+ stack.push(((TupleTypeInfoBase) typeInfo).getTypeAt(i));
+ }
+ }
+ }
+ return keyType;
+ }
+
+ /**
+ * Validates that a given type of element (as encoded by the provided {@link TypeInformation}) can be
+ * used as a key in the {@code DataStream.keyBy()} operation.
+ *
+ * @return {@code false} if:
--- End diff --
I think it is worth having it here as well, for users of the method. The more
places where users can find the required information, the better.
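For readers less familiar with the check in the diff, the following is a minimal, Flink-free sketch of the same idea: an iterative depth-first walk over a nested key type that flags every array it reaches. KeyTypeNode and KeyTypeCheck are hypothetical stand-ins for TypeInformation and validateKeyType, and treating arrays as the only unhashable types is an assumption made for the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical, Flink-free model of the key-type validation in the diff. */
final class KeyTypeCheck {

    /** Stand-in for a (possibly nested) key type; not a Flink class. */
    static final class KeyTypeNode {
        final String name;
        final boolean isArray;
        final List<KeyTypeNode> fields; // non-empty only for tuple-like types

        KeyTypeNode(String name, boolean isArray, List<KeyTypeNode> fields) {
            this.name = name;
            this.isArray = isArray;
            this.fields = fields;
        }

        static KeyTypeNode atomic(String name) { return new KeyTypeNode(name, false, List.of()); }
        static KeyTypeNode array(String name) { return new KeyTypeNode(name, true, List.of()); }
        static KeyTypeNode tuple(String name, KeyTypeNode... fields) {
            return new KeyTypeNode(name, false, List.of(fields));
        }
    }

    /** Returns the names of all nested types assumed unhashable (here: arrays). */
    static List<String> findUnhashable(KeyTypeNode root) {
        List<String> rejected = new ArrayList<>();
        Deque<KeyTypeNode> stack = new ArrayDeque<>();
        stack.push(root);
        while (!stack.isEmpty()) {
            KeyTypeNode node = stack.pop();
            if (node.isArray) {
                rejected.add(node.name);
            }
            // Descend into tuple fields, mirroring the TupleTypeInfoBase branch.
            for (KeyTypeNode field : node.fields) {
                stack.push(field);
            }
        }
        return rejected;
    }

    /** Throws if any nested type is unhashable; otherwise returns the type unchanged. */
    static KeyTypeNode validate(KeyTypeNode keyType) {
        List<String> rejected = findUnhashable(keyType);
        if (!rejected.isEmpty()) {
            throw new IllegalArgumentException(
                    "This type (" + keyType.name + ") cannot be used as key; offending types: " + rejected);
        }
        return keyType;
    }

    public static void main(String[] args) {
        KeyTypeNode bad = KeyTypeNode.tuple("Tuple2<String, int[]>",
                KeyTypeNode.atomic("String"), KeyTypeNode.array("int[]"));
        System.out.println(findUnhashable(bad));
    }
}
```

Two design choices differ from the diff and are only suggestions: the sketch uses ArrayDeque rather than the legacy, synchronized java.util.Stack, and it collects every offending nested type before throwing, so the error message can name the actual culprit instead of only the top-level key type.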
> Reject arrays as keys in DataStream API to avoid inconsistent hashing
> ---------------------------------------------------------------------
>
> Key: FLINK-5874
> URL: https://issues.apache.org/jira/browse/FLINK-5874
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.2.0, 1.1.4
> Reporter: Robert Metzger
> Assignee: Kostas Kloudas
> Priority: Blocker
>
> This issue has been reported on the mailing list twice:
> - http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html
> - http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html
> The problem is the following: we only use Key[].hashCode() to compute
> the hash when shuffling data. For arrays, Java's default hashCode()
> implementation does not take the array's contents into account; it is based
> on object identity (typically the memory address). This leads to different
> hash codes on the sender and receiver side.
> In Flink 1.1 this means that the data is shuffled randomly rather than keyed;
> in Flink 1.2 the key groups code detects a violation of the hashing.
> The proper fix would be to rely on Flink's {{TypeComparator}}
> class, which has a type-specific hashing function. However, introducing this
> change would break compatibility with existing code.
> I'll file a JIRA for the 2.0 changes for that fix.
> For 1.2.1 and 1.3.0 we should at least reject arrays as keys.
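The hashing problem described in the issue can be reproduced in plain Java. This is an illustrative sketch only: ArrayKeyHashing, channelFor and isArrayKey are hypothetical helpers, not Flink APIs, and clone() stands in for the copy of the key that the receiver obtains by deserialization. Calling hashCode() on an array uses the identity hash inherited from Object, while Arrays.hashCode() hashes by contents, so only the latter is guaranteed to agree for the two copies.

```java
import java.util.Arrays;

/** Illustration of FLINK-5874: identity-based vs. content-based hashing of array keys. */
final class ArrayKeyHashing {

    /** Hypothetical stand-in for choosing a downstream channel from a hash. */
    static int channelFor(int hash, int parallelism) {
        return Math.floorMod(hash, parallelism); // non-negative even for negative hashes
    }

    /** Identity-based: what calling hashCode() on an array effectively does. */
    static int identityHash(int[] key) {
        return key.hashCode();
    }

    /** Content-based: equal contents always yield equal hashes. */
    static int contentHash(int[] key) {
        return Arrays.hashCode(key);
    }

    /** Hypothetical check matching the short-term fix: reject arrays as keys. */
    static boolean isArrayKey(Object key) {
        return key != null && key.getClass().isArray();
    }

    public static void main(String[] args) {
        int[] senderKey = {1, 2, 3};
        int[] receiverKey = senderKey.clone(); // simulates the deserialized copy

        System.out.println("identity: " + identityHash(senderKey) + " vs " + identityHash(receiverKey));
        System.out.println("content:  " + contentHash(senderKey) + " vs " + contentHash(receiverKey));
        System.out.println("channel (content, p=4): " + channelFor(contentHash(senderKey), 4)
                + " vs " + channelFor(contentHash(receiverKey), 4));
        System.out.println("reject as key: " + isArrayKey(senderKey));
    }
}
```

The identity hashes of the two copies will almost always differ, so they usually land on different channels; the content hashes always agree. Content-aware hashing of this kind is roughly what a type-specific hash such as the one in {{TypeComparator}} would provide, while isArrayKey mirrors the mitigation of simply rejecting arrays.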
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)