This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch v3.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-hbase.git


The following commit(s) were added to refs/heads/v3.0 by this push:
     new 870ff07  [FLINK-33207] Return empty split when the HBase table is empty. This closes #23
870ff07 is described below

commit 870ff0749d617ba03ecbd48632107793112e594e
Author: Tan-JiaLiang <55388933+tan-jiali...@users.noreply.github.com>
AuthorDate: Fri Nov 10 18:09:26 2023 +0800

    [FLINK-33207] Return empty split when the HBase table is empty. This closes #23
    
    Co-authored-by: tanjialiang <tanjiali...@52tt.com>
    (cherry picked from commit 4b33c32a7f40b7e4fb469facf436017f2cdd8485)
---
 .../hbase1/source/AbstractTableInputFormat.java    |  9 +-------
 .../connector/hbase1/HBaseConnectorITCase.java     | 25 ++++++++++++++++++++++
 .../flink/connector/hbase1/util/HBaseTestBase.java |  9 ++++++++
 .../hbase2/source/AbstractTableInputFormat.java    |  9 +-------
 .../connector/hbase2/HBaseConnectorITCase.java     | 25 ++++++++++++++++++++++
 .../flink/connector/hbase2/util/HBaseTestBase.java |  9 ++++++++
 6 files changed, 70 insertions(+), 16 deletions(-)

diff --git a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/source/AbstractTableInputFormat.java b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/source/AbstractTableInputFormat.java
index 6b60adc..7cb5331 100644
--- a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/source/AbstractTableInputFormat.java
+++ b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/source/AbstractTableInputFormat.java
@@ -230,14 +230,7 @@ public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, Tab
             // Get the starting and ending row keys for every region in the currently open table
             final Pair<byte[][], byte[][]> keys = table.getRegionLocator().getStartEndKeys();
             if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
-                LOG.warn(
-                        "Unexpected region keys: {} appeared in HBase table: {}, all region information are: {}.",
-                        keys,
-                        table,
-                        table.getRegionLocator().getAllRegionLocations());
-                throw new IOException(
-                        "HBase Table expects at least one region in scan,"
-                                + " please check the HBase table status in HBase cluster");
+                return new TableInputSplit[] {};
             }
             final byte[] startRow = scan.getStartRow();
             final byte[] stopRow = scan.getStopRow();
diff --git a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
index 399238d..31290a7 100644
--- a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
+++ b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
@@ -116,6 +116,31 @@ public class HBaseConnectorITCase extends HBaseTestBase {
         TestBaseUtils.compareResultAsText(results, expected);
     }
 
+    @Test
+    public void testTableSourceEmptyTableScan() {
+        TableEnvironment tEnv = TableEnvironment.create(batchSettings);
+
+        tEnv.executeSql(
+                "CREATE TABLE hTable ("
+                        + " family1 ROW<col1 INT>,"
+                        + " rowkey INT,"
+                        + " PRIMARY KEY (rowkey) NOT ENFORCED"
+                        + ") WITH ("
+                        + " 'connector' = 'hbase-1.4',"
+                        + " 'table-name' = '"
+                        + TEST_EMPTY_TABLE
+                        + "',"
+                        + " 'zookeeper.quorum' = '"
+                        + getZookeeperQuorum()
+                        + "'"
+                        + ")");
+
+        Table table = tEnv.sqlQuery("SELECT rowkey, h.family1.col1 FROM hTable AS h");
+        List<Row> results = CollectionUtil.iteratorToList(table.execute().collect());
+
+        assertThat(results).isEmpty();
+    }
+
     @Test
     public void testTableSourceProjection() {
         TableEnvironment tEnv = TableEnvironment.create(batchSettings);
diff --git a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
index 1576f80..530285d 100644
--- a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
+++ b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
@@ -44,6 +44,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
     protected static final String TEST_TABLE_2 = "testTable2";
     protected static final String TEST_TABLE_3 = "testTable3";
     protected static final String TEST_TABLE_4 = "testTable4";
+    protected static final String TEST_EMPTY_TABLE = "testEmptyTable";
     protected static final String TEST_NOT_EXISTS_TABLE = "notExistsTable";
 
     protected static final String ROW_KEY = "rowkey";
@@ -95,6 +96,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
         createHBaseTable2();
         createHBaseTable3();
         createHBaseTable4();
+        createEmptyHBaseTable();
     }
 
     private static void createHBaseTable1() throws IOException {
@@ -242,6 +244,13 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
         createTable(tableName, families, SPLIT_KEYS);
     }
 
+    private static void createEmptyHBaseTable() {
+        // create a table
+        byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
+        TableName tableName = TableName.valueOf(TEST_EMPTY_TABLE);
+        createTable(tableName, families, SPLIT_KEYS);
+    }
+
     private static Put putRow(
             int rowKey,
             int f1c1,
diff --git a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/AbstractTableInputFormat.java b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/AbstractTableInputFormat.java
index d340fe8..122a223 100644
--- a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/AbstractTableInputFormat.java
+++ b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/AbstractTableInputFormat.java
@@ -230,14 +230,7 @@ public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, Tab
             // Get the starting and ending row keys for every region in the currently open table
             final Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys();
             if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
-                LOG.warn(
-                        "Unexpected region keys: {} appeared in HBase table: {}, all region information are: {}.",
-                        keys,
-                        table,
-                        regionLocator.getAllRegionLocations());
-                throw new IOException(
-                        "HBase Table expects at least one region in scan,"
-                                + " please check the HBase table status in HBase cluster");
+                return new TableInputSplit[] {};
             }
             final byte[] startRow = scan.getStartRow();
             final byte[] stopRow = scan.getStopRow();
diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
index 4ebc50f..df72146 100644
--- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
+++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
@@ -130,6 +130,31 @@ public class HBaseConnectorITCase extends HBaseTestBase {
         TestBaseUtils.compareResultAsText(results, expected);
     }
 
+    @Test
+    public void testTableSourceEmptyTableScan() {
+        TableEnvironment tEnv = TableEnvironment.create(batchSettings);
+
+        tEnv.executeSql(
+                "CREATE TABLE hTable ("
+                        + " family1 ROW<col1 INT>,"
+                        + " rowkey INT,"
+                        + " PRIMARY KEY (rowkey) NOT ENFORCED"
+                        + ") WITH ("
+                        + " 'connector' = 'hbase-2.2',"
+                        + " 'table-name' = '"
+                        + TEST_EMPTY_TABLE
+                        + "',"
+                        + " 'zookeeper.quorum' = '"
+                        + getZookeeperQuorum()
+                        + "'"
+                        + ")");
+
+        Table table = tEnv.sqlQuery("SELECT rowkey, h.family1.col1 FROM hTable AS h");
+        List<Row> results = CollectionUtil.iteratorToList(table.execute().collect());
+
+        assertThat(results).isEmpty();
+    }
+
     @Test
     public void testTableSourceProjection() {
         TableEnvironment tEnv = TableEnvironment.create(batchSettings);
diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
index 621143d..1bf60d5 100644
--- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
+++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
@@ -44,6 +44,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
     protected static final String TEST_TABLE_2 = "testTable2";
     protected static final String TEST_TABLE_3 = "testTable3";
     protected static final String TEST_TABLE_4 = "testTable4";
+    protected static final String TEST_EMPTY_TABLE = "testEmptyTable";
     protected static final String TEST_NOT_EXISTS_TABLE = "notExistsTable";
 
     protected static final String ROW_KEY = "rowkey";
@@ -95,6 +96,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
         createHBaseTable2();
         createHBaseTable3();
         createHBaseTable4();
+        createEmptyHBaseTable();
     }
 
     private static void createHBaseTable1() throws IOException {
@@ -242,6 +244,13 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
         createTable(tableName, families, SPLIT_KEYS);
     }
 
+    private static void createEmptyHBaseTable() {
+        // create a table
+        byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
+        TableName tableName = TableName.valueOf(TEST_EMPTY_TABLE);
+        createTable(tableName, families, SPLIT_KEYS);
+    }
+
     private static Put putRow(
             int rowKey,
             int f1c1,

Reply via email to