This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-hbase.git


The following commit(s) were added to refs/heads/main by this push:
     new 0014377  [FLINK-33206] Verify the existence of HBase table before read/write. This closes #22
0014377 is described below

commit 00143773ba3f647099b7f53c17133fef99ab8fed
Author: Tan-JiaLiang <55388933+tan-jiali...@users.noreply.github.com>
AuthorDate: Fri Nov 10 16:21:55 2023 +0800

    [FLINK-33206] Verify the existence of HBase table before read/write. This closes #22
    
    Co-authored-by: Tan-JiaLiang <tanjialiang1...@gmail.com>
---
 .../hbase1/source/HBaseRowDataInputFormat.java     | 11 +++--
 .../connector/hbase1/HBaseConnectorITCase.java     | 48 ++++++++++++++++++++++
 .../flink/connector/hbase1/util/HBaseTestBase.java |  1 +
 .../hbase2/source/HBaseRowDataInputFormat.java     | 18 ++++----
 .../connector/hbase2/HBaseConnectorITCase.java     | 47 +++++++++++++++++++++
 .../flink/connector/hbase2/util/HBaseTestBase.java |  1 +
 .../connector/hbase/sink/HBaseSinkFunction.java    |  9 +++-
 7 files changed, 117 insertions(+), 18 deletions(-)

diff --git 
a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/source/HBaseRowDataInputFormat.java
 
b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/source/HBaseRowDataInputFormat.java
index d48c2fa..21ce8a4 100644
--- 
a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/source/HBaseRowDataInputFormat.java
+++ 
b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/source/HBaseRowDataInputFormat.java
@@ -86,12 +86,11 @@ public class HBaseRowDataInputFormat extends 
AbstractTableInputFormat<RowData> {
     }
 
     private void connectToTable() throws IOException {
-        try {
-            connection = 
ConnectionFactory.createConnection(getHadoopConfiguration());
-            table = (HTable) connection.getTable(TableName.valueOf(tableName));
-        } catch (TableNotFoundException tnfe) {
-            LOG.error("The table " + tableName + " not found ", tnfe);
-            throw new RuntimeException("HBase table '" + tableName + "' not 
found.", tnfe);
+        connection = 
ConnectionFactory.createConnection(getHadoopConfiguration());
+        TableName name = TableName.valueOf(tableName);
+        if (!connection.getAdmin().tableExists(name)) {
+            throw new TableNotFoundException("HBase table '" + tableName + "' 
not found.");
         }
+        table = (HTable) connection.getTable(name);
     }
 }
diff --git 
a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
 
b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
index 0ea0002..399238d 100644
--- 
a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
+++ 
b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
@@ -21,6 +21,10 @@ package org.apache.flink.connector.hbase1;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.hbase.sink.HBaseSinkFunction;
+import org.apache.flink.connector.hbase.sink.RowDataToMutationConverter;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.connector.hbase1.source.AbstractTableInputFormat;
 import org.apache.flink.connector.hbase1.source.HBaseRowDataInputFormat;
@@ -31,6 +35,7 @@ import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.test.util.TestBaseUtils;
@@ -38,7 +43,9 @@ import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CollectionUtil;
 
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
 
@@ -52,6 +59,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.flink.table.api.Expressions.$;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -493,6 +501,46 @@ public class HBaseConnectorITCase extends HBaseTestBase {
         assertNull(inputFormat.getConnection());
     }
 
+    @Test
+    public void testTableInputFormatTableExistence() throws IOException {
+        HBaseTableSchema tableSchema = new HBaseTableSchema();
+        tableSchema.addColumn(FAMILY1, F1COL1, byte[].class);
+        AbstractTableInputFormat<?> inputFormat =
+                new HBaseRowDataInputFormat(getConf(), TEST_NOT_EXISTS_TABLE, 
tableSchema, "null");
+
+        assertThatThrownBy(() -> inputFormat.createInputSplits(1))
+                .isExactlyInstanceOf(TableNotFoundException.class);
+
+        inputFormat.close();
+        assertNull(inputFormat.getConnection());
+    }
+
+    @Test
+    public void testHBaseSinkFunctionTableExistence() throws Exception {
+        org.apache.hadoop.conf.Configuration hbaseConf =
+                HBaseConfigurationUtil.getHBaseConfiguration();
+        hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, getZookeeperQuorum());
+        hbaseConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase");
+
+        HBaseTableSchema tableSchema = new HBaseTableSchema();
+        tableSchema.addColumn(FAMILY1, F1COL1, byte[].class);
+
+        HBaseSinkFunction<RowData> sinkFunction =
+                new HBaseSinkFunction<>(
+                        TEST_NOT_EXISTS_TABLE,
+                        hbaseConf,
+                        new RowDataToMutationConverter(tableSchema, "null", 
false),
+                        2 * 1024 * 1024,
+                        1000,
+                        1000);
+
+        assertThatThrownBy(() -> sinkFunction.open(new Configuration()))
+                .getRootCause()
+                .isExactlyInstanceOf(TableNotFoundException.class);
+
+        sinkFunction.close();
+    }
+
     // 
-------------------------------------------------------------------------------------
     // HBase lookup source tests
     // 
-------------------------------------------------------------------------------------
diff --git 
a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
 
b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
index 86110c8..1576f80 100644
--- 
a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
+++ 
b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
@@ -44,6 +44,7 @@ public abstract class HBaseTestBase extends 
HBaseTestingClusterAutoStarter {
     protected static final String TEST_TABLE_2 = "testTable2";
     protected static final String TEST_TABLE_3 = "testTable3";
     protected static final String TEST_TABLE_4 = "testTable4";
+    protected static final String TEST_NOT_EXISTS_TABLE = "notExistsTable";
 
     protected static final String ROW_KEY = "rowkey";
 
diff --git 
a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataInputFormat.java
 
b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataInputFormat.java
index 1bd9a6a..f74eb85 100644
--- 
a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataInputFormat.java
+++ 
b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataInputFormat.java
@@ -85,16 +85,14 @@ public class HBaseRowDataInputFormat extends 
AbstractTableInputFormat<RowData> {
     }
 
     private void connectToTable() throws IOException {
-        try {
-            if (connection == null) {
-                connection = 
ConnectionFactory.createConnection(getHadoopConfiguration());
-            }
-            TableName name = TableName.valueOf(getTableName());
-            table = connection.getTable(name);
-            regionLocator = connection.getRegionLocator(name);
-        } catch (TableNotFoundException tnfe) {
-            LOG.error("The table " + tableName + " not found ", tnfe);
-            throw new RuntimeException("HBase table '" + tableName + "' not 
found.", tnfe);
+        if (connection == null) {
+            connection = 
ConnectionFactory.createConnection(getHadoopConfiguration());
         }
+        TableName name = TableName.valueOf(tableName);
+        if (!connection.getAdmin().tableExists(name)) {
+            throw new TableNotFoundException("HBase table '" + tableName + "' 
not found.");
+        }
+        table = connection.getTable(name);
+        regionLocator = connection.getRegionLocator(name);
     }
 }
diff --git 
a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
 
b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
index 64a9875..4ebc50f 100644
--- 
a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
+++ 
b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
@@ -22,6 +22,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.hbase.sink.HBaseSinkFunction;
+import org.apache.flink.connector.hbase.sink.RowDataToMutationConverter;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.connector.hbase2.source.AbstractTableInputFormat;
 import org.apache.flink.connector.hbase2.source.HBaseRowDataInputFormat;
@@ -33,6 +36,7 @@ import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
@@ -41,7 +45,9 @@ import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CollectionUtil;
 
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -59,6 +65,7 @@ import java.util.stream.StreamSupport;
 
 import static org.apache.flink.table.api.Expressions.$;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -478,6 +485,46 @@ public class HBaseConnectorITCase extends HBaseTestBase {
         assertNull(inputFormat.getConnection());
     }
 
+    @Test
+    public void testTableInputFormatTableExistence() throws IOException {
+        HBaseTableSchema tableSchema = new HBaseTableSchema();
+        tableSchema.addColumn(FAMILY1, F1COL1, byte[].class);
+        AbstractTableInputFormat<?> inputFormat =
+                new HBaseRowDataInputFormat(getConf(), TEST_NOT_EXISTS_TABLE, 
tableSchema, "null");
+
+        assertThatThrownBy(() -> inputFormat.createInputSplits(1))
+                .isExactlyInstanceOf(TableNotFoundException.class);
+
+        inputFormat.close();
+        assertNull(inputFormat.getConnection());
+    }
+
+    @Test
+    public void testHBaseSinkFunctionTableExistence() throws Exception {
+        org.apache.hadoop.conf.Configuration hbaseConf =
+                HBaseConfigurationUtil.getHBaseConfiguration();
+        hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, getZookeeperQuorum());
+        hbaseConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase");
+
+        HBaseTableSchema tableSchema = new HBaseTableSchema();
+        tableSchema.addColumn(FAMILY1, F1COL1, byte[].class);
+
+        HBaseSinkFunction<RowData> sinkFunction =
+                new HBaseSinkFunction<>(
+                        TEST_NOT_EXISTS_TABLE,
+                        hbaseConf,
+                        new RowDataToMutationConverter(tableSchema, "null", 
false),
+                        2 * 1024 * 1024,
+                        1000,
+                        1000);
+
+        assertThatThrownBy(() -> sinkFunction.open(new Configuration()))
+                .getRootCause()
+                .isExactlyInstanceOf(TableNotFoundException.class);
+
+        sinkFunction.close();
+    }
+
     private void verifyHBaseLookupJoin(boolean async) {
         StreamExecutionEnvironment execEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
         StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, 
streamSettings);
diff --git 
a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
 
b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
index 1301ee1..621143d 100644
--- 
a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
+++ 
b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
@@ -44,6 +44,7 @@ public abstract class HBaseTestBase extends 
HBaseTestingClusterAutoStarter {
     protected static final String TEST_TABLE_2 = "testTable2";
     protected static final String TEST_TABLE_3 = "testTable3";
     protected static final String TEST_TABLE_4 = "testTable4";
+    protected static final String TEST_NOT_EXISTS_TABLE = "notExistsTable";
 
     protected static final String ROW_KEY = "rowkey";
 
diff --git 
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
 
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
index 0c4de1a..0ffad05 100644
--- 
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
+++ 
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
@@ -120,9 +120,14 @@ public class HBaseSinkFunction<T> extends 
RichSinkFunction<T>
             if (null == connection) {
                 this.connection = ConnectionFactory.createConnection(config);
             }
+
+            TableName tableName = TableName.valueOf(hTableName);
+            if (!connection.getAdmin().tableExists(tableName)) {
+                throw new TableNotFoundException(tableName);
+            }
+
             // create a parameter instance, set the table name and custom 
listener reference.
-            BufferedMutatorParams params =
-                    new 
BufferedMutatorParams(TableName.valueOf(hTableName)).listener(this);
+            BufferedMutatorParams params = new 
BufferedMutatorParams(tableName).listener(this);
             if (bufferFlushMaxSizeInBytes > 0) {
                 params.writeBufferSize(bufferFlushMaxSizeInBytes);
             }

Reply via email to