[
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905176#comment-15905176
]
ASF GitHub Bot commented on FLINK-5874:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/3501#discussion_r105406561
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java ---
@@ -906,6 +919,243 @@ public void testChannelSelectors() {
}
/////////////////////////////////////////////////////////////
+ // KeyBy testing
+ /////////////////////////////////////////////////////////////
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Test
+ public void testPrimitiveArrayKeyRejection() {
+
+ KeySelector<Tuple2<Integer[], String>, int[]> keySelector =
+ new KeySelector<Tuple2<Integer[], String>, int[]>() {
+
+ @Override
+ public int[] getKey(Tuple2<Integer[], String> value) throws Exception {
+ int[] ks = new int[value.f0.length];
+ for (int i = 0; i < ks.length; i++) {
+ ks[i] = value.f0[i];
+ }
+ return ks;
+ }
+ };
+
+ testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+ }
+
+ @Test
+ public void testBasicArrayKeyRejection() {
+
+ KeySelector<Tuple2<Integer[], String>, Integer[]> keySelector =
+ new KeySelector<Tuple2<Integer[], String>, Integer[]>() {
+
+ @Override
+ public Integer[] getKey(Tuple2<Integer[], String> value) throws Exception {
+ return value.f0;
+ }
+ };
+
+ testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO);
+ }
+
+ @Test
+ public void testObjectArrayKeyRejection() {
+
+ KeySelector<Tuple2<Integer[], String>, Object[]> keySelector =
+ new KeySelector<Tuple2<Integer[], String>, Object[]>() {
+
+ @Override
+ public Object[] getKey(Tuple2<Integer[], String> value) throws Exception {
+ Object[] ks = new TestClass[value.f0.length];
--- End diff --
Can we use plain objects here? Then we wouldn't need the TestClass at all.
> Reject arrays as keys in DataStream API to avoid inconsistent hashing
> ---------------------------------------------------------------------
>
> Key: FLINK-5874
> URL: https://issues.apache.org/jira/browse/FLINK-5874
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.2.0, 1.1.4
> Reporter: Robert Metzger
> Assignee: Kostas Kloudas
> Priority: Blocker
>
> This issue has been reported on the mailing list twice:
> - http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html
> - http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html
> The problem is the following: We use just Key[].hashCode() to compute
> the hash when shuffling data. Java's default hashCode() implementation
> does not take the array's contents into account; it is identity-based
> (typically derived from the memory address).
> This leads to different hash codes on the sender and receiver side.
> In Flink 1.1 this means that the data is shuffled randomly and not keyed, and
> in Flink 1.2 the key-groups code detects a violation of the hashing.
> The proper fix of the problem would be to rely on Flink's {{TypeComparator}}
> class, which has a type-specific hashing function. But introducing this
> change would break compatibility with existing code.
> I'll file a JIRA for the 2.0 changes for that fix.
> For 1.2.1 and 1.3.0 we should at least reject arrays as keys.
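
To illustrate the hashing problem described above, here is a minimal plain-Java sketch (no Flink dependencies). It contrasts the identity-based hashCode() of arrays with the content-based Arrays.hashCode(), and shows one way an array key type could be rejected up front. The class ArrayKeyHashDemo and the helpers validateKeyClass and isRejected are hypothetical names made up for this example; they are not Flink API and not the check implemented in the pull request.

```java
import java.util.Arrays;

public class ArrayKeyHashDemo {

    // Hypothetical helper (not Flink API): refuse array classes as key types,
    // because their hashCode() does not depend on the array contents.
    public static void validateKeyClass(Class<?> keyClass) {
        if (keyClass.isArray()) {
            throw new IllegalArgumentException(
                "Type " + keyClass.getSimpleName() + " cannot be used as key: "
                    + "array hashCode() is identity-based.");
        }
    }

    // Convenience wrapper that reports whether validateKeyClass rejects the type.
    public static boolean isRejected(Class<?> keyClass) {
        try {
            validateKeyClass(keyClass);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        int[] a = {1, 2, 3};
        int[] b = {1, 2, 3};

        // Identity-based hash: two arrays with equal contents will usually
        // produce different values, and the values can change between JVMs.
        System.out.println("a.hashCode() == b.hashCode(): "
            + (a.hashCode() == b.hashCode()));

        // Content-based hash: equal contents always give equal hash codes.
        System.out.println("Arrays.hashCode(a) == Arrays.hashCode(b): "
            + (Arrays.hashCode(a) == Arrays.hashCode(b)));

        System.out.println("int[] rejected: " + isRejected(int[].class));
        System.out.println("String rejected: " + isRejected(String.class));
    }
}
```

Because the sender and the receiver run in different JVMs, an identity-based hash gives no guarantee that the same logical key maps to the same value on both sides, which is why rejecting such key types early is the safer short-term option.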