This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch revert-6-bugfix_30360 in repository https://gitbox.apache.org/repos/asf/flink-connector-hbase.git
commit 8fa4b1dc8351edda3454bb55ff1fca54c45d8239 Author: MartijnVisser <mart...@2symbols.com> AuthorDate: Fri Apr 28 13:33:36 2023 +0200 Revert "[FLINK-30360][Connectors/HBase] When specified the partial columns, ignore all empty columns" This reverts commit affc2dbbdb0ef1ef8a050a29944fb62292699dba. --- .../connector/hbase1/HBaseConnectorITCase.java | 55 ---------------------- .../flink/connector/hbase1/util/HBaseTestBase.java | 8 ---- .../connector/hbase2/HBaseConnectorITCase.java | 55 ---------------------- .../flink/connector/hbase2/util/HBaseTestBase.java | 8 ---- .../flink/connector/hbase/util/HBaseSerde.java | 3 -- 5 files changed, 129 deletions(-) diff --git a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java index 38046bd..1ad2dd2 100644 --- a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java +++ b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java @@ -280,61 +280,6 @@ public class HBaseConnectorITCase extends HBaseTestBase { TestBaseUtils.compareResultAsText(results, expected); } - @Test - public void testPartialColumnTableSink() throws Exception { - StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); - StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings); - - // register HBase table testTable1 which contains test data - String table1DDL = createHBaseTableDDL(TEST_TABLE_1, false); - tEnv.executeSql(table1DDL); - - String table2DDL = createHBaseTableDDL(TEST_TABLE_4, false); - tEnv.executeSql(table2DDL); - - String query = - "INSERT INTO " - + TEST_TABLE_4 - + " (rowkey, family3) " - + " SELECT" - + " rowkey," - + " family3" - + " FROM " - + TEST_TABLE_1; - - tEnv.executeSql(query).await(); - - // start a batch scan job to verify contents in HBase table - TableEnvironment batchEnv = TableEnvironment.create(batchSettings); - batchEnv.executeSql(table2DDL); - - Table table = - batchEnv.sqlQuery( - "SELECT " - + " h.rowkey, " - + " h.family1.col1, " - + " h.family2.col1, " - + " h.family2.col2, " - + " h.family3.col1, " - + " h.family3.col2, " - + " h.family3.col3 " - + "FROM " - + TEST_TABLE_4 - + " AS h"); - List<Row> results = CollectionUtil.iteratorToList(table.execute().collect()); - String expected = - "+I[1, null, null, null, 1.01, false, Welt-1]\n" - + "+I[2, null, null, null, 2.02, true, Welt-2]\n" - + "+I[3, null, null, null, 3.03, false, Welt-3]\n" - + "+I[4, null, null, null, 4.04, true, Welt-4]\n" - + "+I[5, null, null, null, 5.05, false, Welt-5]\n" - + "+I[6, null, null, null, 6.06, true, Welt-6]\n" - + "+I[7, null, null, null, 7.07, false, Welt-7]\n" - + "+I[8, null, null, null, 8.08, true, Welt-8]\n"; - - TestBaseUtils.compareResultAsText(results, expected); - } - @Test public void testTableSourceSinkWithDDL() throws Exception { StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); diff --git a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java index 7a589d4..cc2de79 100644 --- a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java +++ b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java @@ -43,7 +43,6 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { protected static final String TEST_TABLE_1 = "testTable1"; protected static final String TEST_TABLE_2 = "testTable2"; protected static final String TEST_TABLE_3 = "testTable3"; - protected static final String TEST_TABLE_4 = "testTable4"; protected static final String ROW_KEY = "rowkey"; @@ -93,7 +92,6 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { createHBaseTable1(); createHBaseTable2(); createHBaseTable3(); - createHBaseTable4(); } private static void createHBaseTable1() throws IOException { @@ -234,12 +232,6 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { createTable(tableName, families, SPLIT_KEYS); } - private static void createHBaseTable4() { - // create a table - TableName tableName = TableName.valueOf(TEST_TABLE_4); - createTable(tableName, FAMILIES, SPLIT_KEYS); - } - private static Put putRow( int rowKey, int f1c1, diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java index 572b876..e75d0d0 100644 --- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java +++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java @@ -311,61 +311,6 @@ public class HBaseConnectorITCase extends HBaseTestBase { TestBaseUtils.compareResultAsText(results, String.join("", expected)); } - @Test - public void testPartialColumnTableSink() throws Exception { - StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); - StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings); - - // register HBase table testTable1 which contains test data - String table1DDL = createHBaseTableDDL(TEST_TABLE_1, false); - tEnv.executeSql(table1DDL); - - String table2DDL = createHBaseTableDDL(TEST_TABLE_4, false); - tEnv.executeSql(table2DDL); - - String query = - "INSERT INTO " - + TEST_TABLE_4 - + " (rowkey, family3) " - + " SELECT" - + " rowkey," - + " family3" - + " FROM " - + TEST_TABLE_1; - - tEnv.executeSql(query).await(); - - // start a batch scan job to verify contents in HBase table - TableEnvironment batchEnv = TableEnvironment.create(batchSettings); - batchEnv.executeSql(table2DDL); - - Table table = - batchEnv.sqlQuery( - "SELECT " - + " h.rowkey, " - + " h.family1.col1, " - + " h.family2.col1, " - + " h.family2.col2, " - + " h.family3.col1, " - + " h.family3.col2, " - + " h.family3.col3 " - + "FROM " - + TEST_TABLE_4 - + " AS h"); - List<Row> results = CollectionUtil.iteratorToList(table.execute().collect()); - String expected = - "+I[1, null, null, null, 1.01, false, Welt-1]\n" - + "+I[2, null, null, null, 2.02, true, Welt-2]\n" - + "+I[3, null, null, null, 3.03, false, Welt-3]\n" - + "+I[4, null, null, null, 4.04, true, Welt-4]\n" - + "+I[5, null, null, null, 5.05, false, Welt-5]\n" - + "+I[6, null, null, null, 6.06, true, Welt-6]\n" - + "+I[7, null, null, null, 7.07, false, Welt-7]\n" - + "+I[8, null, null, null, 8.08, true, Welt-8]\n"; - - TestBaseUtils.compareResultAsText(results, expected); - } - @Test public void testTableSourceSinkWithDDL() throws Exception { StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java index 331aabf..1e639ba 100644 --- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java +++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java @@ -43,7 +43,6 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { protected static final String TEST_TABLE_1 = "testTable1"; protected static final String TEST_TABLE_2 = "testTable2"; protected static final String TEST_TABLE_3 = "testTable3"; - protected static final String TEST_TABLE_4 = "testTable4"; protected static final String ROW_KEY = "rowkey"; @@ -93,7 +92,6 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { createHBaseTable1(); createHBaseTable2(); createHBaseTable3(); - createHBaseTable4(); } private static void createHBaseTable1() throws IOException { @@ -234,12 +232,6 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { createTable(tableName, families, SPLIT_KEYS); } - private static void createHBaseTable4() { - // create a table - TableName tableName = TableName.valueOf(TEST_TABLE_4); - createTable(tableName, FAMILIES, SPLIT_KEYS); - } - private static Put putRow( int rowKey, int f1c1, diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java index a8863ff..d21cc4a 100644 --- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java +++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java @@ -140,9 +140,6 @@ public class HBaseSerde { // get family key byte[] familyKey = families[f]; RowData familyRow = row.getRow(i, qualifiers[f].length); - if (familyRow == null) { - continue; - } for (int q = 0; q < this.qualifiers[f].length; q++) { // get quantifier key byte[] qualifier = qualifiers[f][q];