[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-29 Thread dalongliu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842218#comment-17842218
 ] 

dalongliu commented on FLINK-35184:
---

Merged in master: f543cc543e9b0eb05415095190e86d3b22cdf1a4

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Priority: Major
>  Labels: pull-request-available
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-29 Thread dalongliu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842219#comment-17842219
 ] 

dalongliu commented on FLINK-35184:
---

[~rovboyko] Can you help create a backport pr to release-1.19?

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Assignee: Roman Boyko
>Priority: Major
>  Labels: pull-request-available
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-29 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842232#comment-17842232
 ] 

Roman Boyko commented on FLINK-35184:
-

[~lsy] , done - https://github.com/apache/flink/pull/24749

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Assignee: Roman Boyko
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-05-09 Thread dalongliu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845187#comment-17845187
 ] 

dalongliu commented on FLINK-35184:
---

Release-1.19: 17e7c3eaf14b6c63f55d28a308e30ad6a3a80c95

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Assignee: Roman Boyko
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-21 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839382#comment-17839382
 ] 

Roman Boyko commented on FLINK-35184:
-

please assign this bug on me, I'm working on it.

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Priority: Major
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-21 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839464#comment-17839464
 ] 

Shuai Xu commented on FLINK-35184:
--

Hi [~rovboyko] , thx for reporting this bug which is caused by the hashcode() 
in GenericRowData. 
Could you please give a rough explanation of your solutions before implementing 
it?

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Priority: Major
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-22 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839540#comment-17839540
 ] 

Roman Boyko commented on FLINK-35184:
-

Most probably the easiest way to fix it would be to replace the Map> to Map> inside 
InputSideHasNoUniqueKeyBundle. In such attempt we should store every record as 
map key with constant rowKind=+I, while rowkinds would be stored as values. And 
before providing the record to processing we should set the proper rowKind back 
to it.

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Priority: Major
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-22 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839578#comment-17839578
 ] 

Roman Boyko commented on FLINK-35184:
-

"bug which is caused by the hashcode() in GenericRowData"

And by the way - why do you think that BinaryRowData can't produce the same 
collision?

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Priority: Major
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-22 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839920#comment-17839920
 ] 

Shuai Xu commented on FLINK-35184:
--

Hi [~rovboyko] , actually it can't be avoid hash collision even if using 
BinaryRowData which can only reduce the probability to some extent. And the 
solution you mentioned works for me.

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Priority: Major
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-22 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839928#comment-17839928
 ] 

Roman Boyko commented on FLINK-35184:
-

[~xu_shuai_] , Ok agree with you. So may I start the implementation?

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Priority: Major
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-22 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839947#comment-17839947
 ] 

Shuai Xu commented on FLINK-35184:
--

Absolutely, please feel free to start the implementation.

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Priority: Major
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-23 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839978#comment-17839978
 ] 

Roman Boyko commented on FLINK-35184:
-

I've found even simplier solution without changing the storage schema - 
https://github.com/apache/flink/pull/24703

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Priority: Major
>  Labels: pull-request-available
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)