This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-hbase.git
The following commit(s) were added to refs/heads/main by this push: new b7850a5 [FLINK-35233] Fix lookup cache reuse RowData object problem (#47) b7850a5 is described below commit b7850a54d9f3958cc67e5985283ce077b98d973b Author: Tan-JiaLiang <tanjialiang1...@gmail.com> AuthorDate: Mon Apr 29 22:35:22 2024 +0800 [FLINK-35233] Fix lookup cache reuse RowData object problem (#47) * fix: the lookup function currently returns the result of convertToReusedRow() by default, and that result is a reused object. If lookup.cache is enabled, this reused object is what gets cached externally, so all cached values end up being the same object * [FLINK-35233] Fix lookup cache reuse RowData object problem --------- Co-authored-by: xiekunyuan <xiekuny...@meizu.com> --- .../connector/hbase2/HBaseConnectorITCase.java | 39 ++++++++++++++++++---- .../hbase/source/HBaseRowDataLookupFunction.java | 2 +- 2 files changed, 33 insertions(+), 8 deletions(-) diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java index a946edd..9a2736e 100644 --- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java +++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java @@ -48,6 +48,8 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.util.Bytes; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.io.IOException; import java.util.ArrayList; @@ -591,14 +593,16 @@ class HBaseConnectorITCase extends HBaseTestBase { assertThat(result).isEqualTo(expected); } - @Test - void testHBaseLookupTableSource() { - verifyHBaseLookupJoin(false); + @ParameterizedTest + @EnumSource(Caching.class) + 
void testHBaseLookupTableSource(Caching caching) { + verifyHBaseLookupJoin(caching, false); } - @Test - void testHBaseAsyncLookupTableSource() { - verifyHBaseLookupJoin(true); + @ParameterizedTest + @EnumSource(Caching.class) + void testHBaseAsyncLookupTableSource(Caching caching) { + verifyHBaseLookupJoin(caching, true); } @Test @@ -661,10 +665,22 @@ class HBaseConnectorITCase extends HBaseTestBase { sinkFunction.close(); } - private void verifyHBaseLookupJoin(boolean async) { + private void verifyHBaseLookupJoin(Caching caching, boolean async) { StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings); + String cacheOptions = ""; + if (caching == Caching.ENABLE_CACHE) { + cacheOptions = + "," + + String.join( + ",", + Arrays.asList( + "'lookup.cache' = 'PARTIAL'", + "'lookup.partial-cache.max-rows' = '1000'", + "'lookup.partial-cache.expire-after-write' = '10min'")); + } + tEnv.executeSql( "CREATE TABLE " + TEST_TABLE_1 @@ -686,6 +702,7 @@ class HBaseConnectorITCase extends HBaseTestBase { + " 'zookeeper.quorum' = '" + getZookeeperQuorum() + "'" + + cacheOptions + ")"); // prepare a source table @@ -722,6 +739,8 @@ class HBaseConnectorITCase extends HBaseTestBase { .collect(Collectors.toList()); List<String> expected = new ArrayList<>(); + expected.add( + "+I[1, 1, 10, Hello-1, 100, 1.01, false, Welt-1, 2019-08-18T19:00, 2019-08-18, 19:00, 12345678.0001]"); expected.add( "+I[1, 1, 10, Hello-1, 100, 1.01, false, Welt-1, 2019-08-18T19:00, 2019-08-18, 19:00, 12345678.0001]"); expected.add( @@ -750,6 +769,12 @@ class HBaseConnectorITCase extends HBaseTestBase { testData.add(Row.of(2, 2L, "Hello")); testData.add(Row.of(3, 2L, "Hello world")); testData.add(Row.of(3, 3L, "Hello world!")); + testData.add(Row.of(1, 1L, "Hi")); // lookup one more time + } + + private enum Caching { + ENABLE_CACHE, + DISABLE_CACHE } // ------------------------------- 
Utilities ------------------------------------------------- diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java index 0e1ba54..59edf30 100644 --- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java +++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java @@ -97,7 +97,7 @@ public class HBaseRowDataLookupFunction extends LookupFunction { if (get != null) { Result result = table.get(get); if (!result.isEmpty()) { - return Collections.singletonList(serde.convertToReusedRow(result)); + return Collections.singletonList(serde.convertToNewRow(result)); } } break;