[
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903635#comment-15903635
]
ASF GitHub Bot commented on FLINK-5874:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/3501#discussion_r105241450
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
---
@@ -906,6 +919,256 @@ public void testChannelSelectors() {
}
/////////////////////////////////////////////////////////////
+ // KeyBy testing
+ /////////////////////////////////////////////////////////////
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Test
+ public void testPrimitiveArrayKeyRejection() {
+
+ KeySelector<Tuple2<Integer[], String>, int[]> keySelector =
+ new KeySelector<Tuple2<Integer[], String>,
int[]>() {
+
+ @Override
+ public int[] getKey(Tuple2<Integer[], String> value)
throws Exception {
+ int[] ks = new int[value.f0.length];
+ for (int i = 0; i < ks.length; i++) {
+ ks[i] = value.f0[i];
+ }
+ return ks;
+ }
+ };
+
+ testKeyRejection(keySelector,
PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+ }
+
+ @Test
+ public void testBasicArrayKeyRejection() {
+
+ KeySelector<Tuple2<Integer[], String>, Integer[]> keySelector =
+ new KeySelector<Tuple2<Integer[], String>,
Integer[]>() {
+
+ @Override
+ public Integer[] getKey(Tuple2<Integer[], String>
value) throws Exception {
+ return value.f0;
+ }
+ };
+
+ testKeyRejection(keySelector,
BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO);
+ }
+
+ @Test
+ public void testObjectArrayKeyRejection() {
+
+ KeySelector<Tuple2<Integer[], String>, TestClass[]> keySelector
=
+ new KeySelector<Tuple2<Integer[], String>,
TestClass[]>() {
+
+ @Override
+ public TestClass[]
getKey(Tuple2<Integer[], String> value) throws Exception {
+ TestClass[] ks = new
TestClass[value.f0.length];
+ for (int i = 0; i < ks.length;
i++) {
+ ks[i] = new
TestClass(value.f0[i]);
+ }
+ return ks;
+ }
+ };
+
+ ObjectArrayTypeInfo<TestClass[], TestClass> keyTypeInfo =
ObjectArrayTypeInfo.getInfoFor(
+ TestClass[].class, new
GenericTypeInfo<>(TestClass.class));
+
+ testKeyRejection(keySelector, keyTypeInfo);
+ }
+
+ private <K> void testKeyRejection(KeySelector<Tuple2<Integer[],
String>, K> keySelector, TypeInformation<K> expectedKeyType) {
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Tuple2<Integer[], String>> input = env.fromElements(
+ new Tuple2<>(new Integer[] {1, 2}, "barfoo")
+ );
+
+ Assert.assertEquals(expectedKeyType,
TypeExtractor.getKeySelectorTypes(keySelector, input.getType()));
+
+ // adjust the rule
+ expectedException.expect(InvalidProgramException.class);
+ expectedException.expectMessage("This type (" + expectedKeyType
+ ") cannot be used as key.");
+
+ input.keyBy(keySelector);
+ }
+
+ // composite key tests : POJOs
+
+ @Test
+ public void testPOJONestedArrayKeyRejection() {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<POJOwithHashCode> input = env.fromElements(
+ new POJOwithHashCode(new int[] {1, 2}));
+
+ TypeInformation<?> expectedTypeInfo = new
TupleTypeInfo<Tuple1<int[]>>(
+
PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+
+ // adjust the rule
+ expectedException.expect(InvalidProgramException.class);
+ expectedException.expectMessage("This type (" +
expectedTypeInfo + ") cannot be used as key.");
+
+ input.keyBy("id");
+ }
+
+ @Test
+ public void testNestedArrayWorkArround() {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<POJOwithHashCode> input = env.fromElements(
+ new POJOwithHashCode(new int[] {1, 2}));
+
+ input.keyBy(new KeySelector<POJOwithHashCode,
POJOwithHashCode>() {
+ @Override
+ public POJOwithHashCode getKey(POJOwithHashCode value)
throws Exception {
+ return value;
+ }
+ }).addSink(new SinkFunction<POJOwithHashCode>() {
+ @Override
+ public void invoke(POJOwithHashCode value) throws
Exception {
+ Assert.assertEquals(value.getId(), new int[]{1,
2});
+ }
+ });
+ }
+
+ @Test
+ public void testPOJOnoHashCodeKeyRejection() {
+
+ KeySelector<POJOnoHashCode, POJOnoHashCode> keySelector =
+ new KeySelector<POJOnoHashCode,
POJOnoHashCode>() {
+ @Override
+ public POJOnoHashCode
getKey(POJOnoHashCode value) throws Exception {
+ return value;
+ }
+ };
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<POJOnoHashCode> input = env.fromElements(
+ new POJOnoHashCode(new int[] {1, 2}));
+
+ // adjust the rule
+ expectedException.expect(InvalidProgramException.class);
+
+ input.keyBy(keySelector);
+ }
+
+ // composite key tests : Tuples
+
+ @Test
+ public void testTupleNestedArrayKeyRejection() {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Tuple2<Integer[], String>> input = env.fromElements(
+ new Tuple2<>(new Integer[] {1, 2},
"test-test"));
+
+ TypeInformation<?> expectedTypeInfo = new
TupleTypeInfo<Tuple2<Integer[], String>>(
+ BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO);
+
+ // adjust the rule
+ expectedException.expect(InvalidProgramException.class);
+ expectedException.expectMessage("This type (" +
expectedTypeInfo + ") cannot be used as key.");
+
+ input.keyBy(new KeySelector<Tuple2<Integer[],String>,
Tuple2<Integer[],String>>() {
+ @Override
+ public Tuple2<Integer[], String>
getKey(Tuple2<Integer[], String> value) throws Exception {
+ return value;
+ }
+ });
+ }
+
+ @Test
+ public void testPrimitiveKeyRejection() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.setMaxParallelism(1);
+
+ DataStream<Integer> input = env.fromElements(new
Integer(10000));
+
+ TypeInformation<?> expectedTypeInfo =
IntegerTypeInfo.INT_TYPE_INFO;
+
+ KeyedStream<Integer, Integer> keyedStream = input.keyBy(new
KeySelector<Integer, Integer>() {
+ @Override
+ public Integer getKey(Integer value) throws Exception {
+ return value;
+ }
+ });
+
+ Assert.assertEquals(expectedTypeInfo, keyedStream.getKeyType());
+
+ keyedStream.addSink(new SinkFunction<Integer>() {
+ @Override
+ public void invoke(Integer value) throws Exception {
+ Assert.assertEquals(10000L, (long) value);
+ }
+ });
+
+ env.execute();
+ }
+
+ private static class TestClass {
+
+ private final int id;
+
+ TestClass(int id) {
+ this.id = id;
+ }
+ }
+
+ public static class POJOnoHashCode {
--- End diff --
The naming is also inconsistent with the other class (POJOwithHashCode):
this class name doesn't contain "with".
> Reject arrays as keys in DataStream API to avoid inconsistent hashing
> ---------------------------------------------------------------------
>
> Key: FLINK-5874
> URL: https://issues.apache.org/jira/browse/FLINK-5874
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.2.0, 1.1.4
> Reporter: Robert Metzger
> Assignee: Kostas Kloudas
> Priority: Blocker
>
> This issue has been reported on the mailing list twice:
> -
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html
> -
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html
> The problem is the following: we only use Key[].hashCode() to compute
> the hash when shuffling data. Java's default hashCode() implementation for
> arrays doesn't take the array's contents into account, only the object's
> identity (its memory address). Because the sender and the receiver hold
> different array instances for the same key, this leads to different hash
> codes on the sender and receiver side.
> In Flink 1.1 this means that the data is shuffled randomly instead of being
> keyed, and in Flink 1.2 the key-group code detects a violation of the hashing.
> The proper fix would be to rely on Flink's {{TypeComparator}}
> class, which provides a type-specific hashing function. However, introducing
> this change would break compatibility with existing code, so I'll file a
> separate JIRA for that fix as part of the 2.0 changes.
> For 1.2.1 and 1.3.0 we should at least reject arrays as keys.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)