This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch v3.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-hbase.git
The following commit(s) were added to refs/heads/v3.0 by this push: new 870ff07 [FLINK-33207] Return empty split when the HBase table is empty. This closes #23 870ff07 is described below commit 870ff0749d617ba03ecbd48632107793112e594e Author: Tan-JiaLiang <55388933+tan-jiali...@users.noreply.github.com> AuthorDate: Fri Nov 10 18:09:26 2023 +0800 [FLINK-33207] Return empty split when the HBase table is empty. This closes #23 Co-authored-by: tanjialiang <tanjiali...@52tt.com> (cherry picked from commit 4b33c32a7f40b7e4fb469facf436017f2cdd8485) --- .../hbase1/source/AbstractTableInputFormat.java | 9 +------- .../connector/hbase1/HBaseConnectorITCase.java | 25 ++++++++++++++++++++++ .../flink/connector/hbase1/util/HBaseTestBase.java | 9 ++++++++ .../hbase2/source/AbstractTableInputFormat.java | 9 +------- .../connector/hbase2/HBaseConnectorITCase.java | 25 ++++++++++++++++++++++ .../flink/connector/hbase2/util/HBaseTestBase.java | 9 ++++++++ 6 files changed, 70 insertions(+), 16 deletions(-) diff --git a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/source/AbstractTableInputFormat.java b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/source/AbstractTableInputFormat.java index 6b60adc..7cb5331 100644 --- a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/source/AbstractTableInputFormat.java +++ b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/source/AbstractTableInputFormat.java @@ -230,14 +230,7 @@ public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, Tab // Get the starting and ending row keys for every region in the currently open table final Pair<byte[][], byte[][]> keys = table.getRegionLocator().getStartEndKeys(); if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { - LOG.warn( - "Unexpected region keys: {} appeared in HBase table: {}, all region information are: {}.", - keys, - table, - 
table.getRegionLocator().getAllRegionLocations()); - throw new IOException( - "HBase Table expects at least one region in scan," - + " please check the HBase table status in HBase cluster"); + return new TableInputSplit[] {}; } final byte[] startRow = scan.getStartRow(); final byte[] stopRow = scan.getStopRow(); diff --git a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java index 399238d..31290a7 100644 --- a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java +++ b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java @@ -116,6 +116,31 @@ public class HBaseConnectorITCase extends HBaseTestBase { TestBaseUtils.compareResultAsText(results, expected); } + @Test + public void testTableSourceEmptyTableScan() { + TableEnvironment tEnv = TableEnvironment.create(batchSettings); + + tEnv.executeSql( + "CREATE TABLE hTable (" + + " family1 ROW<col1 INT>," + + " rowkey INT," + + " PRIMARY KEY (rowkey) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'hbase-1.4'," + + " 'table-name' = '" + + TEST_EMPTY_TABLE + + "'," + + " 'zookeeper.quorum' = '" + + getZookeeperQuorum() + + "'" + + ")"); + + Table table = tEnv.sqlQuery("SELECT rowkey, h.family1.col1 FROM hTable AS h"); + List<Row> results = CollectionUtil.iteratorToList(table.execute().collect()); + + assertThat(results).isEmpty(); + } + @Test public void testTableSourceProjection() { TableEnvironment tEnv = TableEnvironment.create(batchSettings); diff --git a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java index 1576f80..530285d 100644 --- 
a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java +++ b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java @@ -44,6 +44,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { protected static final String TEST_TABLE_2 = "testTable2"; protected static final String TEST_TABLE_3 = "testTable3"; protected static final String TEST_TABLE_4 = "testTable4"; + protected static final String TEST_EMPTY_TABLE = "testEmptyTable"; protected static final String TEST_NOT_EXISTS_TABLE = "notExistsTable"; protected static final String ROW_KEY = "rowkey"; @@ -95,6 +96,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { createHBaseTable2(); createHBaseTable3(); createHBaseTable4(); + createEmptyHBaseTable(); } private static void createHBaseTable1() throws IOException { @@ -242,6 +244,13 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { createTable(tableName, families, SPLIT_KEYS); } + private static void createEmptyHBaseTable() { + // create a table + byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)}; + TableName tableName = TableName.valueOf(TEST_EMPTY_TABLE); + createTable(tableName, families, SPLIT_KEYS); + } + private static Put putRow( int rowKey, int f1c1, diff --git a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/AbstractTableInputFormat.java b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/AbstractTableInputFormat.java index d340fe8..122a223 100644 --- a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/AbstractTableInputFormat.java +++ b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/AbstractTableInputFormat.java @@ -230,14 +230,7 @@ public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, Tab // Get the starting and ending row keys 
for every region in the currently open table final Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys(); if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { - LOG.warn( - "Unexpected region keys: {} appeared in HBase table: {}, all region information are: {}.", - keys, - table, - regionLocator.getAllRegionLocations()); - throw new IOException( - "HBase Table expects at least one region in scan," - + " please check the HBase table status in HBase cluster"); + return new TableInputSplit[] {}; } final byte[] startRow = scan.getStartRow(); final byte[] stopRow = scan.getStopRow(); diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java index 4ebc50f..df72146 100644 --- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java +++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java @@ -130,6 +130,31 @@ public class HBaseConnectorITCase extends HBaseTestBase { TestBaseUtils.compareResultAsText(results, expected); } + @Test + public void testTableSourceEmptyTableScan() { + TableEnvironment tEnv = TableEnvironment.create(batchSettings); + + tEnv.executeSql( + "CREATE TABLE hTable (" + + " family1 ROW<col1 INT>," + + " rowkey INT," + + " PRIMARY KEY (rowkey) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'hbase-2.2'," + + " 'table-name' = '" + + TEST_EMPTY_TABLE + + "'," + + " 'zookeeper.quorum' = '" + + getZookeeperQuorum() + + "'" + + ")"); + + Table table = tEnv.sqlQuery("SELECT rowkey, h.family1.col1 FROM hTable AS h"); + List<Row> results = CollectionUtil.iteratorToList(table.execute().collect()); + + assertThat(results).isEmpty(); + } + @Test public void testTableSourceProjection() { TableEnvironment tEnv = TableEnvironment.create(batchSettings); diff --git 
a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java index 621143d..1bf60d5 100644 --- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java +++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java @@ -44,6 +44,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { protected static final String TEST_TABLE_2 = "testTable2"; protected static final String TEST_TABLE_3 = "testTable3"; protected static final String TEST_TABLE_4 = "testTable4"; + protected static final String TEST_EMPTY_TABLE = "testEmptyTable"; protected static final String TEST_NOT_EXISTS_TABLE = "notExistsTable"; protected static final String ROW_KEY = "rowkey"; @@ -95,6 +96,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { createHBaseTable2(); createHBaseTable3(); createHBaseTable4(); + createEmptyHBaseTable(); } private static void createHBaseTable1() throws IOException { @@ -242,6 +244,13 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { createTable(tableName, families, SPLIT_KEYS); } + private static void createEmptyHBaseTable() { + // create a table + byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)}; + TableName tableName = TableName.valueOf(TEST_EMPTY_TABLE); + createTable(tableName, families, SPLIT_KEYS); + } + private static Put putRow( int rowKey, int f1c1,