This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-hbase.git
The following commit(s) were added to refs/heads/main by this push: new 0014377 [FLINK-33206] Verify the existence of HBase table before read/write. This closes #22 0014377 is described below commit 00143773ba3f647099b7f53c17133fef99ab8fed Author: Tan-JiaLiang <55388933+tan-jiali...@users.noreply.github.com> AuthorDate: Fri Nov 10 16:21:55 2023 +0800 [FLINK-33206] Verify the existence of HBase table before read/write. This closes #22 Co-authored-by: Tan-JiaLiang <tanjialiang1...@gmail.com> --- .../hbase1/source/HBaseRowDataInputFormat.java | 11 +++-- .../connector/hbase1/HBaseConnectorITCase.java | 48 ++++++++++++++++++++++ .../flink/connector/hbase1/util/HBaseTestBase.java | 1 + .../hbase2/source/HBaseRowDataInputFormat.java | 18 ++++---- .../connector/hbase2/HBaseConnectorITCase.java | 47 +++++++++++++++++++++ .../flink/connector/hbase2/util/HBaseTestBase.java | 1 + .../connector/hbase/sink/HBaseSinkFunction.java | 9 +++- 7 files changed, 117 insertions(+), 18 deletions(-) diff --git a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/source/HBaseRowDataInputFormat.java b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/source/HBaseRowDataInputFormat.java index d48c2fa..21ce8a4 100644 --- a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/source/HBaseRowDataInputFormat.java +++ b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/source/HBaseRowDataInputFormat.java @@ -86,12 +86,11 @@ public class HBaseRowDataInputFormat extends AbstractTableInputFormat<RowData> { } private void connectToTable() throws IOException { - try { - connection = ConnectionFactory.createConnection(getHadoopConfiguration()); - table = (HTable) connection.getTable(TableName.valueOf(tableName)); - } catch (TableNotFoundException tnfe) { - LOG.error("The table " + tableName + " not found ", tnfe); - throw new RuntimeException("HBase table '" + tableName + "' not found.", tnfe); + connection = ConnectionFactory.createConnection(getHadoopConfiguration()); + TableName name = TableName.valueOf(tableName); + if (!connection.getAdmin().tableExists(name)) { + throw new TableNotFoundException("HBase table '" + tableName + "' not found."); } + table = (HTable) connection.getTable(name); } } diff --git a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java index 0ea0002..399238d 100644 --- a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java +++ b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java @@ -21,6 +21,10 @@ package org.apache.flink.connector.hbase1; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.hbase.sink.HBaseSinkFunction; +import org.apache.flink.connector.hbase.sink.RowDataToMutationConverter; +import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil; import org.apache.flink.connector.hbase.util.HBaseTableSchema; import org.apache.flink.connector.hbase1.source.AbstractTableInputFormat; import org.apache.flink.connector.hbase1.source.HBaseRowDataInputFormat; @@ -31,6 +35,7 @@ import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.data.RowData; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.planner.factories.TestValuesTableFactory; import org.apache.flink.test.util.TestBaseUtils; @@ -38,7 +43,9 @@ import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; import org.apache.flink.util.CollectionUtil; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Test; @@ -52,6 +59,7 @@ import java.util.stream.Collectors; import static org.apache.flink.table.api.Expressions.$; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -493,6 +501,46 @@ public class HBaseConnectorITCase extends HBaseTestBase { assertNull(inputFormat.getConnection()); } + @Test + public void testTableInputFormatTableExistence() throws IOException { + HBaseTableSchema tableSchema = new HBaseTableSchema(); + tableSchema.addColumn(FAMILY1, F1COL1, byte[].class); + AbstractTableInputFormat<?> inputFormat = + new HBaseRowDataInputFormat(getConf(), TEST_NOT_EXISTS_TABLE, tableSchema, "null"); + + assertThatThrownBy(() -> inputFormat.createInputSplits(1)) + .isExactlyInstanceOf(TableNotFoundException.class); + + inputFormat.close(); + assertNull(inputFormat.getConnection()); + } + + @Test + public void testHBaseSinkFunctionTableExistence() throws Exception { + org.apache.hadoop.conf.Configuration hbaseConf = + HBaseConfigurationUtil.getHBaseConfiguration(); + hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, getZookeeperQuorum()); + hbaseConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase"); + + HBaseTableSchema tableSchema = new HBaseTableSchema(); + tableSchema.addColumn(FAMILY1, F1COL1, byte[].class); + + HBaseSinkFunction<RowData> sinkFunction = + new HBaseSinkFunction<>( + TEST_NOT_EXISTS_TABLE, + hbaseConf, + new RowDataToMutationConverter(tableSchema, "null", false), + 2 * 1024 * 1024, + 1000, + 1000); + + assertThatThrownBy(() -> sinkFunction.open(new Configuration())) + .getRootCause() + .isExactlyInstanceOf(TableNotFoundException.class); + + sinkFunction.close(); + } + // ------------------------------------------------------------------------------------- // HBase lookup source tests // ------------------------------------------------------------------------------------- diff --git a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java index 86110c8..1576f80 100644 --- a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java +++ b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java @@ -44,6 +44,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { protected static final String TEST_TABLE_2 = "testTable2"; protected static final String TEST_TABLE_3 = "testTable3"; protected static final String TEST_TABLE_4 = "testTable4"; + protected static final String TEST_NOT_EXISTS_TABLE = "notExistsTable"; protected static final String ROW_KEY = "rowkey"; diff --git a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataInputFormat.java b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataInputFormat.java index 1bd9a6a..f74eb85 100644 --- a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataInputFormat.java +++ b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataInputFormat.java @@ -85,16 +85,14 @@ public class HBaseRowDataInputFormat extends AbstractTableInputFormat<RowData> { } private void connectToTable() throws IOException { - try { - if (connection == null) { - connection = ConnectionFactory.createConnection(getHadoopConfiguration()); - } - TableName name = TableName.valueOf(getTableName()); - table = connection.getTable(name); - regionLocator = connection.getRegionLocator(name); - } catch (TableNotFoundException tnfe) { - LOG.error("The table " + tableName + " not found ", tnfe); - throw new RuntimeException("HBase table '" + tableName + "' not found.", tnfe); + if (connection == null) { + connection = ConnectionFactory.createConnection(getHadoopConfiguration()); } + TableName name = TableName.valueOf(tableName); + if (!connection.getAdmin().tableExists(name)) { + throw new TableNotFoundException("HBase table '" + tableName + "' not found."); + } + table = connection.getTable(name); + regionLocator = connection.getRegionLocator(name); } } diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java index 64a9875..4ebc50f 100644 --- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java +++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java @@ -22,6 +22,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.hbase.sink.HBaseSinkFunction; +import org.apache.flink.connector.hbase.sink.RowDataToMutationConverter; +import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil; import org.apache.flink.connector.hbase.util.HBaseTableSchema; import org.apache.flink.connector.hbase2.source.AbstractTableInputFormat; import org.apache.flink.connector.hbase2.source.HBaseRowDataInputFormat; @@ -33,6 +36,7 @@ import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.data.RowData; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.planner.factories.TestValuesTableFactory; import org.apache.flink.test.util.MiniClusterWithClientResource; @@ -41,7 +45,9 @@ import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; import org.apache.flink.util.CollectionUtil; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.util.Bytes; import org.junit.ClassRule; import org.junit.Test; @@ -59,6 +65,7 @@ import java.util.stream.StreamSupport; import static org.apache.flink.table.api.Expressions.$; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -478,6 +485,46 @@ public class HBaseConnectorITCase extends HBaseTestBase { assertNull(inputFormat.getConnection()); } + @Test + public void testTableInputFormatTableExistence() throws IOException { + HBaseTableSchema tableSchema = new HBaseTableSchema(); + tableSchema.addColumn(FAMILY1, F1COL1, byte[].class); + AbstractTableInputFormat<?> inputFormat = + new HBaseRowDataInputFormat(getConf(), TEST_NOT_EXISTS_TABLE, tableSchema, "null"); + + assertThatThrownBy(() -> inputFormat.createInputSplits(1)) + .isExactlyInstanceOf(TableNotFoundException.class); + + inputFormat.close(); + assertNull(inputFormat.getConnection()); + } + + @Test + public void testHBaseSinkFunctionTableExistence() throws Exception { + org.apache.hadoop.conf.Configuration hbaseConf = + HBaseConfigurationUtil.getHBaseConfiguration(); + hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, getZookeeperQuorum()); + hbaseConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase"); + + HBaseTableSchema tableSchema = new HBaseTableSchema(); + tableSchema.addColumn(FAMILY1, F1COL1, byte[].class); + + HBaseSinkFunction<RowData> sinkFunction = + new HBaseSinkFunction<>( + TEST_NOT_EXISTS_TABLE, + hbaseConf, + new RowDataToMutationConverter(tableSchema, "null", false), + 2 * 1024 * 1024, + 1000, + 1000); + + assertThatThrownBy(() -> sinkFunction.open(new Configuration())) + .getRootCause() + .isExactlyInstanceOf(TableNotFoundException.class); + + sinkFunction.close(); + } + private void verifyHBaseLookupJoin(boolean async) { StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings); diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java index 1301ee1..621143d 100644 --- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java +++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java @@ -44,6 +44,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { protected static final String TEST_TABLE_2 = "testTable2"; protected static final String TEST_TABLE_3 = "testTable3"; protected static final String TEST_TABLE_4 = "testTable4"; + protected static final String TEST_NOT_EXISTS_TABLE = "notExistsTable"; protected static final String ROW_KEY = "rowkey"; diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java index 0c4de1a..0ffad05 100644 --- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java +++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java @@ -120,9 +120,14 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T> if (null == connection) { this.connection = ConnectionFactory.createConnection(config); } + + TableName tableName = TableName.valueOf(hTableName); + if (!connection.getAdmin().tableExists(tableName)) { + throw new TableNotFoundException(tableName); + } + // create a parameter instance, set the table name and custom listener reference. - BufferedMutatorParams params = - new BufferedMutatorParams(TableName.valueOf(hTableName)).listener(this); + BufferedMutatorParams params = new BufferedMutatorParams(tableName).listener(this); if (bufferFlushMaxSizeInBytes > 0) { params.writeBufferSize(bufferFlushMaxSizeInBytes); }