This is an automated email from the ASF dual-hosted git repository. fcsaky pushed a commit to branch v3.0 in repository https://gitbox.apache.org/repos/asf/flink-connector-hbase.git
The following commit(s) were added to refs/heads/v3.0 by this push: new d199ce7 [FLINK-36188] Fix disable buffer flush lose efficacy (#49) d199ce7 is described below commit d199ce7ad2a196a9893dd1966a8bd1d5f1e6fc82 Author: MOBIN <18814118...@163.com> AuthorDate: Wed Sep 25 18:22:35 2024 +0800 [FLINK-36188] Fix disable buffer flush lose efficacy (#49) --- .../connector/hbase2/HBaseConnectorITCase.java | 60 ++++++++++++++++++++++ .../flink/connector/hbase2/util/HBaseTestBase.java | 9 ++++ .../connector/hbase/sink/HBaseSinkFunction.java | 2 + 3 files changed, 71 insertions(+) diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java index 9a2736e..9f1109d 100644 --- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java +++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java @@ -665,6 +665,66 @@ class HBaseConnectorITCase extends HBaseTestBase { sinkFunction.close(); } + @Test + void testTableSinkDisabledBufferFlush() throws Exception { + StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings); + + tEnv.executeSql( + "CREATE TABLE hTableForSink (" + + " rowkey INT PRIMARY KEY NOT ENFORCED," + + " family1 ROW<col1 INT>" + + ") WITH (" + + " 'connector' = 'hbase-2.2'," + + " 'sink.buffer-flush.max-size' = '0'," + + " 'sink.buffer-flush.max-rows' = '0'," + + " 'table-name' = '" + + TEST_TABLE_7 + + "'," + + " 'zookeeper.quorum' = '" + + getZookeeperQuorum() + + "'" + + ")"); + + String insert = "INSERT INTO hTableForSink VALUES(1, ROW(1))"; + tEnv.executeSql(insert).await(); + + tEnv.executeSql( + "CREATE VIEW user_click AS " + + " SELECT user_id, proctime() AS proc_time" + + " FROM ( " + + " VALUES(1), (1), (1), (1), (1)" + + " ) AS t (user_id);"); + + tEnv.executeSql( + "INSERT INTO hTableForSink SELECT " + + " user_id as rowkey," + + " ROW(CAST(family1.col1 + 1 AS INT))" + + " FROM user_click INNER JOIN hTableForSink" + + " FOR SYSTEM_TIME AS OF user_click.proc_time" + + " ON hTableForSink.rowkey = user_click.user_id;"); + + tEnv.executeSql( + "CREATE TABLE hTableForQuery (" + + " rowkey INT PRIMARY KEY NOT ENFORCED," + + " family1 ROW<col1 INT>" + + ") WITH (" + + " 'connector' = 'hbase-2.2'," + + " 'table-name' = '" + + TEST_TABLE_7 + + "'," + + " 'zookeeper.quorum' = '" + + getZookeeperQuorum() + + "'" + + ")"); + String query = "SELECT rowkey, family1.col1 FROM hTableForQuery"; + + TableResult firstResult = tEnv.executeSql(query); + List<Row> firstResults = CollectionUtil.iteratorToList(firstResult.collect()); + String firstExpected = "+I[1, 6]"; + TestBaseUtils.compareResultAsText(firstResults, firstExpected); + } + private void verifyHBaseLookupJoin(Caching caching, boolean async) { StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings); diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java index 6ea08bb..0195900 100644 --- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java +++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java @@ -46,6 +46,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { protected static final String TEST_TABLE_4 = "testTable4"; protected static final String TEST_TABLE_5 = "testTable5"; protected static final String TEST_TABLE_6 = "testTable6"; + protected static final String TEST_TABLE_7 = "testTable7"; protected static final String TEST_EMPTY_TABLE = "testEmptyTable"; protected static final String TEST_NOT_EXISTS_TABLE = "notExistsTable"; @@ -100,6 +101,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { createHBaseTable4(); createHBaseTable5(); createHBaseTable6(); + createHBaseTable7(); createEmptyHBaseTable(); } @@ -262,6 +264,13 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { createTable(tableName, families, SPLIT_KEYS); } + private static void createHBaseTable7() { + // create a table + byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)}; + TableName tableName = TableName.valueOf(TEST_TABLE_7); + createTable(tableName, families, SPLIT_KEYS); + } + private static void createEmptyHBaseTable() { // create a table byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)}; diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java index 0ffad05..fbe8dcd 100644 --- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java +++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java @@ -209,6 +209,8 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T> if (bufferFlushMaxMutations > 0 && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) { flush(); + } else if (bufferFlushMaxMutations == 0 && bufferFlushMaxSizeInBytes == 0) { + flush(); } }