This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch v3.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-hbase.git


The following commit(s) were added to refs/heads/v3.0 by this push:
     new d199ce7  [FLINK-36188] Fix disable buffer flush lose efficacy (#49)
d199ce7 is described below

commit d199ce7ad2a196a9893dd1966a8bd1d5f1e6fc82
Author: MOBIN <18814118...@163.com>
AuthorDate: Wed Sep 25 18:22:35 2024 +0800

    [FLINK-36188] Fix disable buffer flush lose efficacy (#49)
---
 .../connector/hbase2/HBaseConnectorITCase.java     | 60 ++++++++++++++++++++++
 .../flink/connector/hbase2/util/HBaseTestBase.java |  9 ++++
 .../connector/hbase/sink/HBaseSinkFunction.java    |  2 +
 3 files changed, 71 insertions(+)

diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
index 9a2736e..9f1109d 100644
--- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
+++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
@@ -665,6 +665,66 @@ class HBaseConnectorITCase extends HBaseTestBase {
         sinkFunction.close();
     }
 
+    @Test
+    void testTableSinkDisabledBufferFlush() throws Exception {
+        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
+
+        tEnv.executeSql(
+                "CREATE TABLE hTableForSink ("
+                        + " rowkey INT PRIMARY KEY NOT ENFORCED,"
+                        + " family1 ROW<col1 INT>"
+                        + ") WITH ("
+                        + " 'connector' = 'hbase-2.2',"
+                        + " 'sink.buffer-flush.max-size' = '0',"
+                        + " 'sink.buffer-flush.max-rows' = '0',"
+                        + " 'table-name' = '"
+                        + TEST_TABLE_7
+                        + "',"
+                        + " 'zookeeper.quorum' = '"
+                        + getZookeeperQuorum()
+                        + "'"
+                        + ")");
+
+        String insert = "INSERT INTO hTableForSink VALUES(1, ROW(1))";
+        tEnv.executeSql(insert).await();
+
+        tEnv.executeSql(
+                "CREATE VIEW user_click AS "
+                        + " SELECT user_id, proctime() AS proc_time"
+                        + " FROM ( "
+                        + " VALUES(1), (1), (1), (1), (1)"
+                        + " ) AS t (user_id);");
+
+        tEnv.executeSql(
+                "INSERT INTO hTableForSink SELECT "
+                        + "    user_id as rowkey,"
+                        + "    ROW(CAST(family1.col1 + 1 AS INT))"
+                        + " FROM user_click INNER JOIN hTableForSink"
+                        + " FOR SYSTEM_TIME AS OF user_click.proc_time"
+                        + " ON hTableForSink.rowkey = user_click.user_id;");
+
+        tEnv.executeSql(
+                "CREATE TABLE hTableForQuery ("
+                        + " rowkey INT PRIMARY KEY NOT ENFORCED,"
+                        + " family1 ROW<col1 INT>"
+                        + ") WITH ("
+                        + " 'connector' = 'hbase-2.2',"
+                        + " 'table-name' = '"
+                        + TEST_TABLE_7
+                        + "',"
+                        + " 'zookeeper.quorum' = '"
+                        + getZookeeperQuorum()
+                        + "'"
+                        + ")");
+        String query = "SELECT rowkey, family1.col1 FROM hTableForQuery";
+
+        TableResult firstResult = tEnv.executeSql(query);
+        List<Row> firstResults = CollectionUtil.iteratorToList(firstResult.collect());
+        String firstExpected = "+I[1, 6]";
+        TestBaseUtils.compareResultAsText(firstResults, firstExpected);
+    }
+
     private void verifyHBaseLookupJoin(Caching caching, boolean async) {
         StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
         StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
index 6ea08bb..0195900 100644
--- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
+++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
@@ -46,6 +46,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
     protected static final String TEST_TABLE_4 = "testTable4";
     protected static final String TEST_TABLE_5 = "testTable5";
     protected static final String TEST_TABLE_6 = "testTable6";
+    protected static final String TEST_TABLE_7 = "testTable7";
     protected static final String TEST_EMPTY_TABLE = "testEmptyTable";
     protected static final String TEST_NOT_EXISTS_TABLE = "notExistsTable";
 
@@ -100,6 +101,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
         createHBaseTable4();
         createHBaseTable5();
         createHBaseTable6();
+        createHBaseTable7();
         createEmptyHBaseTable();
     }
 
@@ -262,6 +264,13 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
         createTable(tableName, families, SPLIT_KEYS);
     }
 
+    private static void createHBaseTable7() {
+        // create a table
+        byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
+        TableName tableName = TableName.valueOf(TEST_TABLE_7);
+        createTable(tableName, families, SPLIT_KEYS);
+    }
+
     private static void createEmptyHBaseTable() {
         // create a table
         byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
index 0ffad05..fbe8dcd 100644
--- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
@@ -209,6 +209,8 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
         if (bufferFlushMaxMutations > 0
                 && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) {
             flush();
+        } else if (bufferFlushMaxMutations == 0 && bufferFlushMaxSizeInBytes == 0) {
+            flush();
         }
     }
 

Reply via email to