This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push: new 9e20929 [FLINK-18072][hbase] Fix HBaseLookupFunction can not work with new internal data structure RowData 9e20929 is described below commit 9e20929dbda43959950509e2f4534c285bf48f3b Author: Leonard Xu <xbjt...@163.com> AuthorDate: Wed Jun 17 21:14:25 2020 +0800 [FLINK-18072][hbase] Fix HBaseLookupFunction can not work with new internal data structure RowData This closes #12691 --- .../hbase/source/HBaseDynamicTableSource.java | 2 +- .../hbase/source/HBaseLookupFunction.java | 2 +- ...nction.java => HBaseRowDataLookupFunction.java} | 46 +-- .../flink/connector/hbase/util/HBaseSerde.java | 26 ++ .../connector/hbase/HBaseConnectorITCase.java | 423 ++++++++------------- .../hbase/HBaseDynamicTableFactoryTest.java | 6 +- .../flink/connector/hbase/util/HBaseTestBase.java | 68 +++- 7 files changed, 278 insertions(+), 295 deletions(-) diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java index 1dac67ae..537d4b2 100644 --- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java +++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java @@ -77,7 +77,7 @@ public class HBaseDynamicTableSource implements ScanTableSource, LookupTableSour .isPresent(), "Currently, HBase table only supports lookup by rowkey field."); - return TableFunctionProvider.of(new HBaseLookupFunction(conf, tableName, hbaseSchema)); + return TableFunctionProvider.of(new HBaseRowDataLookupFunction(conf, tableName, hbaseSchema, nullStringLiteral)); } @Override diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java index 1d608e9..c84cbe1 100644 --- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java +++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java @@ -45,7 +45,7 @@ import java.io.IOException; /** * The HBaseLookupFunction is a standard user-defined table function, it can be used in tableAPI - * and also useful for temporal table join plan in SQL. + * and also useful for temporal table join plan in SQL. It looks up the result as {@link Row}. */ @Internal public class HBaseLookupFunction extends TableFunction<Row> { diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java similarity index 81% copy from flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java copy to flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java index 1d608e9..7facf58 100644 --- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java +++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java @@ -20,13 +20,12 @@ package org.apache.flink.connector.hbase.source; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil; -import org.apache.flink.connector.hbase.util.HBaseReadWriteHelper; +import 
org.apache.flink.connector.hbase.util.HBaseSerde; import org.apache.flink.connector.hbase.util.HBaseTableSchema; +import org.apache.flink.table.data.RowData; import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.table.functions.TableFunction; -import org.apache.flink.types.Row; import org.apache.flink.util.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -36,6 +35,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.slf4j.Logger; @@ -44,29 +44,33 @@ import org.slf4j.LoggerFactory; import java.io.IOException; /** - * The HBaseLookupFunction is a standard user-defined table function, it can be used in tableAPI - * and also useful for temporal table join plan in SQL. + * The HBaseRowDataLookupFunction is a standard user-defined table function, it can be used in tableAPI + * and also useful for temporal table join plan in SQL. It looks up the result as {@link RowData}. 
*/ @Internal -public class HBaseLookupFunction extends TableFunction<Row> { - private static final Logger LOG = LoggerFactory.getLogger(HBaseLookupFunction.class); +public class HBaseRowDataLookupFunction extends TableFunction<RowData> { + + private static final Logger LOG = LoggerFactory.getLogger(HBaseRowDataLookupFunction.class); private static final long serialVersionUID = 1L; private final String hTableName; private final byte[] serializedConfig; private final HBaseTableSchema hbaseTableSchema; + private final String nullStringLiteral; - private transient HBaseReadWriteHelper readHelper; private transient Connection hConnection; private transient HTable table; + private transient HBaseSerde serde; - public HBaseLookupFunction( + public HBaseRowDataLookupFunction( Configuration configuration, String hTableName, - HBaseTableSchema hbaseTableSchema) { + HBaseTableSchema hbaseTableSchema, + String nullStringLiteral) { this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration); this.hTableName = hTableName; this.hbaseTableSchema = hbaseTableSchema; + this.nullStringLiteral = nullStringLiteral; } /** @@ -75,19 +79,17 @@ public class HBaseLookupFunction extends TableFunction<Row> { */ public void eval(Object rowKey) throws IOException { // fetch result - Result result = table.get(readHelper.createGet(rowKey)); - if (!result.isEmpty()) { - // parse and collect - collect(readHelper.parseToRow(result, rowKey)); + Get get = serde.createGet(rowKey); + if (get != null) { + Result result = table.get(get); + if (!result.isEmpty()) { + // parse and collect + collect(serde.convertToRow(result)); + } } } - @Override - public TypeInformation<Row> getResultType() { - return hbaseTableSchema.convertsToTableSchema().toRowType(); - } - - private org.apache.hadoop.conf.Configuration prepareRuntimeConfiguration() { + private Configuration prepareRuntimeConfiguration() { // create default configuration from current runtime env (`hbase-site.xml` in classpath) 
first, // and overwrite configuration using serialized configuration from client-side env (`hbase-site.xml` in classpath). // user params from client-side have the highest priority @@ -107,7 +109,7 @@ public class HBaseLookupFunction extends TableFunction<Row> { @Override public void open(FunctionContext context) { LOG.info("start open ..."); - org.apache.hadoop.conf.Configuration config = prepareRuntimeConfiguration(); + Configuration config = prepareRuntimeConfiguration(); try { hConnection = ConnectionFactory.createConnection(config); table = (HTable) hConnection.getTable(TableName.valueOf(hTableName)); @@ -118,7 +120,7 @@ public class HBaseLookupFunction extends TableFunction<Row> { LOG.error("Exception while creating connection to HBase.", ioe); throw new RuntimeException("Cannot create connection to HBase.", ioe); } - this.readHelper = new HBaseReadWriteHelper(hbaseTableSchema); + this.serde = new HBaseSerde(hbaseTableSchema, nullStringLiteral); LOG.info("end open."); } diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java index 693cb62..f888914 100644 --- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java +++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java @@ -29,6 +29,7 @@ import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalTypeFamily; import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -76,6 +77,7 @@ public class HBaseSerde { private final @Nullable FieldDecoder keyDecoder; private final FieldEncoder[][] qualifierEncoders; private final 
FieldDecoder[][] qualifierDecoders; + private final GenericRowData rowWithRowKey; public HBaseSerde(HBaseTableSchema hbaseSchema, final String nullStringLiteral) { this.families = hbaseSchema.getFamilyKeys(); @@ -115,6 +117,7 @@ public class HBaseSerde { .toArray(FieldDecoder[]::new); this.reusedFamilyRows[f] = new GenericRowData(dataTypes.length); } + this.rowWithRowKey = new GenericRowData(1); } /** @@ -196,6 +199,29 @@ public class HBaseSerde { } /** + * Returns an instance of Get that retrieves the matching records from the HBase table. + * + * @return The appropriate instance of Get for this use case. + */ + public Get createGet(Object rowKey) { + checkArgument(keyEncoder != null, "row key is not set."); + rowWithRowKey.setField(0, rowKey); + byte[] rowkey = keyEncoder.encode(rowWithRowKey, 0); + if (rowkey.length == 0) { + // drop dirty records, rowkey shouldn't be zero length + return null; + } + Get get = new Get(rowkey); + for (int f = 0; f < families.length; f++) { + byte[] family = families[f]; + for (byte[] qualifier : qualifiers[f]) { + get.addColumn(family, qualifier); + } + } + return get; + } + + /** * Converts HBase {@link Result} into {@link RowData}. 
*/ public RowData convertToRow(Result result) { diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java index 3777dec..9ab36cc 100644 --- a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java +++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java @@ -27,7 +27,6 @@ import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.connector.hbase.source.HBaseInputFormat; import org.apache.flink.connector.hbase.source.HBaseTableSource; -import org.apache.flink.connector.hbase.util.HBaseTableSchema; import org.apache.flink.connector.hbase.util.HBaseTestBase; import org.apache.flink.connector.hbase.util.PlannerType; import org.apache.flink.streaming.api.datastream.DataStream; @@ -40,20 +39,17 @@ import org.apache.flink.table.api.bridge.java.BatchTableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.internal.TableEnvironmentInternal; import org.apache.flink.table.api.internal.TableImpl; -import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.table.factories.TableFactoryService; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.planner.runtime.utils.BatchTableEnvUtil; import org.apache.flink.table.planner.runtime.utils.TableEnvUtil; import org.apache.flink.table.planner.sinks.CollectRowTableSink; import org.apache.flink.table.planner.sinks.CollectTableSink; import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; -import org.apache.flink.table.runtime.utils.StreamITCase; -import org.apache.flink.table.sinks.TableSink; -import 
org.apache.flink.table.sources.TableSource; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.types.Row; +import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; + import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; @@ -61,26 +57,15 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import java.sql.Date; -import java.sql.Time; -import java.sql.Timestamp; import java.util.ArrayList; -import java.util.HashMap; +import java.util.Iterator; import java.util.List; -import java.util.Map; +import java.util.stream.Collectors; import scala.Option; import static org.apache.flink.connector.hbase.util.PlannerType.OLD_PLANNER; import static org.apache.flink.table.api.Expressions.$; -import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION; -import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; -import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION; -import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TABLE_NAME; -import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TYPE_VALUE_HBASE; -import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_VERSION_VALUE_143; -import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_ZK_QUORUM; -import static org.apache.flink.table.descriptors.Schema.SCHEMA; import static org.junit.Assert.assertEquals; /** @@ -124,7 +109,7 @@ public class HBaseConnectorITCase extends HBaseTestBase { hbaseTable.addColumn(FAMILY3, F3COL1, Double.class); hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class); hbaseTable.addColumn(FAMILY3, F3COL3, String.class); - hbaseTable.setRowKey("rowkey", Integer.class); + hbaseTable.setRowKey(ROW_KEY, Integer.class); ((TableEnvironmentInternal) 
tEnv).registerTableSourceInternal("hTable", hbaseTable); } else { tEnv.executeSql( @@ -176,7 +161,7 @@ public class HBaseConnectorITCase extends HBaseTestBase { hbaseTable.addColumn(FAMILY3, F3COL1, Double.class); hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class); hbaseTable.addColumn(FAMILY3, F3COL3, String.class); - hbaseTable.setRowKey("rowkey", Integer.class); + hbaseTable.setRowKey(ROW_KEY, Integer.class); ((TableEnvironmentInternal) tEnv).registerTableSourceInternal("hTable", hbaseTable); } else { tEnv.executeSql( @@ -221,7 +206,7 @@ public class HBaseConnectorITCase extends HBaseTestBase { if (isLegacyConnector) { HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE_1); // shuffle order of column registration - hbaseTable.setRowKey("rowkey", Integer.class); + hbaseTable.setRowKey(ROW_KEY, Integer.class); hbaseTable.addColumn(FAMILY2, F2COL1, String.class); hbaseTable.addColumn(FAMILY3, F3COL1, Double.class); hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class); @@ -268,7 +253,7 @@ public class HBaseConnectorITCase extends HBaseTestBase { HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE_1); hbaseTable.addColumn(FAMILY2, F2COL1, byte[].class); hbaseTable.addColumn(FAMILY2, F2COL2, byte[].class); - hbaseTable.setRowKey("rowkey", Integer.class); + hbaseTable.setRowKey(ROW_KEY, Integer.class); ((TableEnvironmentInternal) tEnv).registerTableSourceInternal("hTable", hbaseTable); } else { tEnv.executeSql( @@ -319,115 +304,33 @@ public class HBaseConnectorITCase extends HBaseTestBase { assertEquals(360, (int) resultSet.get(0).f0); } - // ------------------------------------------------------------------------------------- - // HBaseTableSink tests - // ------------------------------------------------------------------------------------- - - // prepare a source collection. 
- private static final List<Row> testData1 = new ArrayList<>(); - private static final RowTypeInfo testTypeInfo1 = new RowTypeInfo( - new TypeInformation[]{Types.INT, Types.INT, Types.STRING, Types.LONG, Types.DOUBLE, - Types.BOOLEAN, Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_DATE, Types.SQL_TIME}, - new String[]{"rowkey", "f1c1", "f2c1", "f2c2", "f3c1", "f3c2", "f3c3", "f4c1", "f4c2", "f4c3"}); - - static { - testData1.add(Row.of(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1", - Timestamp.valueOf("2019-08-18 19:00:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:00:00"))); - testData1.add(Row.of(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2", - Timestamp.valueOf("2019-08-18 19:01:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:01:00"))); - testData1.add(Row.of(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3", - Timestamp.valueOf("2019-08-18 19:02:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:02:00"))); - testData1.add(Row.of(4, 40, null, 400L, 4.04, true, "Welt-4", - Timestamp.valueOf("2019-08-18 19:03:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:03:00"))); - testData1.add(Row.of(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5", - Timestamp.valueOf("2019-08-19 19:10:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:10:00"))); - testData1.add(Row.of(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6", - Timestamp.valueOf("2019-08-19 19:20:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:20:00"))); - testData1.add(Row.of(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7", - Timestamp.valueOf("2019-08-19 19:30:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:30:00"))); - testData1.add(Row.of(8, 80, null, 800L, 8.08, true, "Welt-8", - Timestamp.valueOf("2019-08-19 19:40:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:40:00"))); - } - @Test public void testTableSink() throws Exception { StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings); - 
if (isLegacyConnector) { - HBaseTableSchema schema = new HBaseTableSchema(); - schema.addColumn(FAMILY1, F1COL1, Integer.class); - schema.addColumn(FAMILY2, F2COL1, String.class); - schema.addColumn(FAMILY2, F2COL2, Long.class); - schema.setRowKey("rk", Integer.class); - schema.addColumn(FAMILY3, F3COL1, Double.class); - schema.addColumn(FAMILY3, F3COL2, Boolean.class); - schema.addColumn(FAMILY3, F3COL3, String.class); - - Map<String, String> tableProperties = new HashMap<>(); - tableProperties.put("connector.type", "hbase"); - tableProperties.put("connector.version", "1.4.3"); - tableProperties.put("connector.property-version", "1"); - tableProperties.put("connector.table-name", TEST_TABLE_2); - tableProperties.put("connector.zookeeper.quorum", getZookeeperQuorum()); - tableProperties.put("connector.zookeeper.znode.parent", "/hbase"); - DescriptorProperties descriptorProperties = new DescriptorProperties(true); - descriptorProperties.putTableSchema(SCHEMA, schema.convertsToTableSchema()); - descriptorProperties.putProperties(tableProperties); - TableSink tableSink = TableFactoryService - .find(HBaseTableFactory.class, descriptorProperties.asMap()) - .createTableSink(descriptorProperties.asMap()); - ((TableEnvironmentInternal) tEnv).registerTableSinkInternal("hbase", tableSink); - } else { - tEnv.executeSql( - "CREATE TABLE hbase (" + - " family1 ROW<col1 INT>," + - " family2 ROW<col1 STRING, col2 BIGINT>," + - " rk INT," + - " family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING>" + - ") WITH (" + - " 'connector' = 'hbase-1.4'," + - " 'table-name' = '" + TEST_TABLE_1 + "'," + - " 'zookeeper.quorum' = '" + getZookeeperQuorum() + "'," + - " 'zookeeper.znode.parent' = '/hbase'" + - ")"); - } + // register HBase table testTable1 which contains test data + String table1DDL = createHBaseTableDDL(TEST_TABLE_1, false); + tEnv.executeSql(table1DDL); + + String table2DDL = createHBaseTableDDL(TEST_TABLE_2, false); + tEnv.executeSql(table2DDL); - DataStream<Row> ds = 
execEnv.fromCollection(testData1).returns(testTypeInfo1); - tEnv.createTemporaryView("src", ds); + String query = "INSERT INTO " + TEST_TABLE_2 + " SELECT" + + " rowkey," + + " family1," + + " family2," + + " family3" + + " FROM " + TEST_TABLE_1; - String query = "INSERT INTO hbase SELECT ROW(f1c1), ROW(f2c1, f2c2), rowkey, ROW(f3c1, f3c2, f3c3) FROM src"; + // wait to finish TableEnvUtil.execInsertSqlAndWaitResult(tEnv, query); // start a batch scan job to verify contents in HBase table - // start a batch scan job to verify contents in HBase table - TableEnvironment batchTableEnv = createBatchTableEnv(); - - if (isLegacyConnector) { - HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE_2); - hbaseTable.setRowKey("rowkey", Integer.class); - hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class); - hbaseTable.addColumn(FAMILY2, F2COL1, String.class); - hbaseTable.addColumn(FAMILY2, F2COL2, Long.class); - hbaseTable.addColumn(FAMILY3, F3COL1, Double.class); - hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class); - hbaseTable.addColumn(FAMILY3, F3COL3, String.class); - ((TableEnvironmentInternal) batchTableEnv).registerTableSourceInternal("hTable", hbaseTable); - } else { - batchTableEnv.executeSql( - "CREATE TABLE hTable (" + - " rowkey INT," + - " family1 ROW<col1 INT>," + - " family2 ROW<col1 STRING, col2 BIGINT>," + - " family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING>" + - ") WITH (" + - " 'connector' = 'hbase-1.4'," + - " 'table-name' = '" + TEST_TABLE_1 + "'," + - " 'zookeeper.quorum' = '" + getZookeeperQuorum() + "'" + - ")"); - } + TableEnvironment batchEnv = createBatchTableEnv(); + batchEnv.executeSql(table2DDL); - Table table = batchTableEnv.sqlQuery( + Table table = batchEnv.sqlQuery( "SELECT " + " h.rowkey, " + " h.family1.col1, " + @@ -436,9 +339,8 @@ public class HBaseConnectorITCase extends HBaseTestBase { " h.family3.col1, " + " h.family3.col2, " + " h.family3.col3 " + - "FROM hTable AS h" + "FROM " + TEST_TABLE_2 + " AS h" ); - 
List<Row> results = collectBatchResult(table); String expected = "1,10,Hello-1,100,1.01,false,Welt-1\n" + @@ -455,56 +357,37 @@ public class HBaseConnectorITCase extends HBaseTestBase { @Test public void testTableSourceSinkWithDDL() throws Exception { + if (OLD_PLANNER.equals(planner) || isLegacyConnector) { + // only test for blink planner and new connector, because types TIMESTAMP/DATE/TIME/DECIMAL works well in + // new connector(using blink-planner), but exits some precision problem in old planner or legacy connector. + return; + } + StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings); - DataStream<Row> ds = execEnv.fromCollection(testData1).returns(testTypeInfo1); - tEnv.createTemporaryView("src", ds); + // regiter HBase table testTable1 which contains test data + String table1DDL = createHBaseTableDDL(TEST_TABLE_1, true); + tEnv.executeSql(table1DDL); - // register hbase table - String quorum = getZookeeperQuorum(); - String ddl; - if (isLegacyConnector) { - ddl = "CREATE TABLE hbase (\n" + - " rowkey INT," + - " family1 ROW<col1 INT>,\n" + - " family2 ROW<col1 VARCHAR, col2 BIGINT>,\n" + - " family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 VARCHAR>,\n" + - " family4 ROW<col1 TIMESTAMP(3), col2 DATE, col3 TIME(3)>\n" + - ") WITH (\n" + - " 'connector.type' = 'hbase',\n" + - " 'connector.version' = '1.4.3',\n" + - " 'connector.table-name' = 'testTable3',\n" + - " 'connector.zookeeper.quorum' = '" + quorum + "',\n" + - " 'connector.zookeeper.znode.parent' = '/hbase' " + - ")"; - } else { - ddl = "CREATE TABLE hbase (\n" + - " rowkey INT," + - " family1 ROW<col1 INT>,\n" + - " family2 ROW<col1 VARCHAR, col2 BIGINT>,\n" + - " family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 VARCHAR>,\n" + - " family4 ROW<col1 TIMESTAMP(3), col2 DATE, col3 TIME(3)>\n" + - ") WITH (\n" + - " 'connector' = 'hbase-1.4',\n" + - " 'table-name' = 'testTable3',\n" + - " 
'zookeeper.quorum' = '" + quorum + "',\n" + - " 'zookeeper.znode.parent' = '/hbase' " + - ")"; - } - tEnv.executeSql(ddl); + // register HBase table which is empty + String table3DDL = createHBaseTableDDL(TEST_TABLE_3, true); + tEnv.executeSql(table3DDL); - String query = "INSERT INTO hbase " + - "SELECT rowkey, ROW(f1c1), ROW(f2c1, f2c2), ROW(f3c1, f3c2, f3c3), ROW(f4c1, f4c2, f4c3) " + - "FROM src"; - TableEnvUtil.execInsertSqlAndWaitResult(tEnv, query); + String insertStatement = "INSERT INTO " + TEST_TABLE_3 + + " SELECT rowkey," + + " family1," + + " family2," + + " family3," + + " family4" + + " from " + TEST_TABLE_1; + // wait to finish + TableEnvUtil.execInsertSqlAndWaitResult(tEnv, insertStatement); // start a batch scan job to verify contents in HBase table - TableEnvironment batchTableEnv = createBatchTableEnv(); - batchTableEnv.executeSql(ddl); - - Table table = batchTableEnv.sqlQuery( - "SELECT " + + TableEnvironment batchEnv = createBatchTableEnv(); + batchEnv.executeSql(table3DDL); + String query = "SELECT " + " h.rowkey, " + " h.family1.col1, " + " h.family2.col1, " + @@ -514,115 +397,103 @@ public class HBaseConnectorITCase extends HBaseTestBase { " h.family3.col3, " + " h.family4.col1, " + " h.family4.col2, " + - " h.family4.col3 " + - "FROM hbase AS h" - ); - - List<Row> results = collectBatchResult(table); - String expected = - "1,10,Hello-1,100,1.01,false,Welt-1,2019-08-18 19:00:00.0,2019-08-18,19:00:00\n" + - "2,20,Hello-2,200,2.02,true,Welt-2,2019-08-18 19:01:00.0,2019-08-18,19:01:00\n" + - "3,30,Hello-3,300,3.03,false,Welt-3,2019-08-18 19:02:00.0,2019-08-18,19:02:00\n" + - "4,40,null,400,4.04,true,Welt-4,2019-08-18 19:03:00.0,2019-08-18,19:03:00\n" + - "5,50,Hello-5,500,5.05,false,Welt-5,2019-08-19 19:10:00.0,2019-08-19,19:10:00\n" + - "6,60,Hello-6,600,6.06,true,Welt-6,2019-08-19 19:20:00.0,2019-08-19,19:20:00\n" + - "7,70,Hello-7,700,7.07,false,Welt-7,2019-08-19 19:30:00.0,2019-08-19,19:30:00\n" + - 
"8,80,null,800,8.08,true,Welt-8,2019-08-19 19:40:00.0,2019-08-19,19:40:00\n"; - - TestBaseUtils.compareResultAsText(results, expected); - } - + " h.family4.col3, " + + " h.family4.col4 " + + " FROM " + TEST_TABLE_3 + " AS h"; + Iterator<Row> collected = tEnv.executeSql(query).collect(); + List<String> result = Lists.newArrayList(collected).stream() + .map(Row::toString) + .sorted() + .collect(Collectors.toList()); - // ------------------------------------------------------------------------------------- - // HBase lookup source tests - // ------------------------------------------------------------------------------------- - - // prepare a source collection. - private static final List<Row> testData2 = new ArrayList<>(); - private static final RowTypeInfo testTypeInfo2 = new RowTypeInfo( - new TypeInformation[]{Types.INT, Types.LONG, Types.STRING}, - new String[]{"a", "b", "c"}); - - static { - testData2.add(Row.of(1, 1L, "Hi")); - testData2.add(Row.of(2, 2L, "Hello")); - testData2.add(Row.of(3, 2L, "Hello world")); - testData2.add(Row.of(3, 3L, "Hello world!")); + List<String> expected = new ArrayList<>(); + expected.add("1,10,Hello-1,100,1.01,false,Welt-1,2019-08-18T19:00,2019-08-18,19:00,12345678.0001"); + expected.add("2,20,Hello-2,200,2.02,true,Welt-2,2019-08-18T19:01,2019-08-18,19:01,12345678.0002"); + expected.add("3,30,Hello-3,300,3.03,false,Welt-3,2019-08-18T19:02,2019-08-18,19:02,12345678.0003"); + expected.add("4,40,null,400,4.04,true,Welt-4,2019-08-18T19:03,2019-08-18,19:03,12345678.0004"); + expected.add("5,50,Hello-5,500,5.05,false,Welt-5,2019-08-19T19:10,2019-08-19,19:10,12345678.0005"); + expected.add("6,60,Hello-6,600,6.06,true,Welt-6,2019-08-19T19:20,2019-08-19,19:20,12345678.0006"); + expected.add("7,70,Hello-7,700,7.07,false,Welt-7,2019-08-19T19:30,2019-08-19,19:30,12345678.0007"); + expected.add("8,80,null,800,8.08,true,Welt-8,2019-08-19T19:40,2019-08-19,19:40,12345678.0008"); + assertEquals(expected, result); } @Test public void 
testHBaseLookupTableSource() throws Exception { - if (OLD_PLANNER.equals(planner)) { + if (OLD_PLANNER.equals(planner) || isLegacyConnector) { // lookup table source is only supported in blink planner, skip for old planner + // types TIMESTAMP/DATE/TIME/DECIMAL works well in new connector, skip legacy connector return; } - StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); - StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(streamEnv, streamSettings); - StreamITCase.clear(); + + StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings); + + tEnv.executeSql( + "CREATE TABLE " + TEST_TABLE_1 + " (" + + " family1 ROW<col1 INT>," + + " family2 ROW<col1 STRING, col2 BIGINT>," + + " family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING>," + + " rowkey INT," + + " family4 ROW<col1 TIMESTAMP(3), col2 DATE, col3 TIME(3), col4 DECIMAL(12, 4)>," + + " PRIMARY KEY (rowkey) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'hbase-1.4'," + + " 'table-name' = '" + TEST_TABLE_1 + "'," + + " 'zookeeper.quorum' = '" + getZookeeperQuorum() + "'" + + ")"); // prepare a source table String srcTableName = "src"; - DataStream<Row> ds = streamEnv.fromCollection(testData2).returns(testTypeInfo2); - Table in = streamTableEnv.fromDataStream(ds, $("a"), $("b"), $("c"), $("proc").proctime()); - streamTableEnv.registerTable(srcTableName, in); + DataStream<Row> srcDs = execEnv.fromCollection(testData).returns(testTypeInfo); + Table in = tEnv.fromDataStream(srcDs, $("a"), $("b"), $("c"), $("proc").proctime()); + tEnv.registerTable(srcTableName, in); - if (isLegacyConnector) { - Map<String, String> tableProperties = hbaseTableProperties(); - TableSource<?> source = TableFactoryService - .find(HBaseTableFactory.class, tableProperties) - .createTableSource(tableProperties); - ((TableEnvironmentInternal) 
streamTableEnv).registerTableSourceInternal("hbaseLookup", source); - } else { - streamTableEnv.executeSql( - "CREATE TABLE hbaseLookup (" + - " family1 ROW<col1 INT>," + - " rk INT," + - " family2 ROW<col1 STRING, col2 BIGINT>," + - " family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING>" + - ") WITH (" + - " 'connector' = 'hbase-1.4'," + - " 'table-name' = '" + TEST_TABLE_1 + "'," + - " 'zookeeper.quorum' = '" + getZookeeperQuorum() + "'" + - ")"); - } // perform a temporal table join query - String query = "SELECT a,family1.col1, family3.col3 FROM src " + - "JOIN hbaseLookup FOR SYSTEM_TIME AS OF src.proc as h ON src.a = h.rk"; - Table result = streamTableEnv.sqlQuery(query); - - DataStream<Row> resultSet = streamTableEnv.toAppendStream(result, Row.class); - resultSet.addSink(new StreamITCase.StringSink<>()); - - streamEnv.execute(); + String dimJoinQuery = "SELECT" + + " a," + + " b," + + " h.family1.col1," + + " h.family2.col1," + + " h.family2.col2," + + " h.family3.col1," + + " h.family3.col2," + + " h.family3.col3," + + " h.family4.col1," + + " h.family4.col2," + + " h.family4.col3," + + " h.family4.col4 " + + " FROM src JOIN " + TEST_TABLE_1 + " FOR SYSTEM_TIME AS OF src.proc as h ON src.a = h.rowkey"; + Iterator<Row> collected = tEnv.executeSql(dimJoinQuery).collect(); + List<String> result = Lists.newArrayList(collected).stream() + .map(Row::toString) + .sorted() + .collect(Collectors.toList()); List<String> expected = new ArrayList<>(); - expected.add("1,10,Welt-1"); - expected.add("2,20,Welt-2"); - expected.add("3,30,Welt-3"); - expected.add("3,30,Welt-3"); + expected.add("1,1,10,Hello-1,100,1.01,false,Welt-1,2019-08-18T19:00,2019-08-18,19:00,12345678.0001"); + expected.add("2,2,20,Hello-2,200,2.02,true,Welt-2,2019-08-18T19:01,2019-08-18,19:01,12345678.0002"); + expected.add("3,2,30,Hello-3,300,3.03,false,Welt-3,2019-08-18T19:02,2019-08-18,19:02,12345678.0003"); + 
expected.add("3,3,30,Hello-3,300,3.03,false,Welt-3,2019-08-18T19:02,2019-08-18,19:02,12345678.0003"); - StreamITCase.compareWithList(expected); + assertEquals(expected, result); } - private static Map<String, String> hbaseTableProperties() { - Map<String, String> properties = new HashMap<>(); - properties.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_HBASE); - properties.put(CONNECTOR_VERSION, CONNECTOR_VERSION_VALUE_143); - properties.put(CONNECTOR_PROPERTY_VERSION, "1"); - properties.put(CONNECTOR_TABLE_NAME, TEST_TABLE_1); - properties.put(CONNECTOR_ZK_QUORUM, getZookeeperQuorum()); - // schema - String[] columnNames = {FAMILY1, ROWKEY, FAMILY2, FAMILY3}; - TypeInformation<Row> f1 = Types.ROW_NAMED(new String[]{F1COL1}, Types.INT); - TypeInformation<Row> f2 = Types.ROW_NAMED(new String[]{F2COL1, F2COL2}, Types.STRING, Types.LONG); - TypeInformation<Row> f3 = Types.ROW_NAMED(new String[]{F3COL1, F3COL2, F3COL3}, Types.DOUBLE, Types.BOOLEAN, Types.STRING); - TypeInformation[] columnTypes = new TypeInformation[]{f1, Types.INT, f2, f3}; - - DescriptorProperties descriptorProperties = new DescriptorProperties(true); - TableSchema tableSchema = new TableSchema(columnNames, columnTypes); - descriptorProperties.putTableSchema(SCHEMA, tableSchema); - descriptorProperties.putProperties(properties); - return descriptorProperties.asMap(); + // ------------------------------------------------------------------------------------- + // HBase lookup source tests + // ------------------------------------------------------------------------------------- + + // prepare a source collection. 
+ private static final List<Row> testData = new ArrayList<>(); + private static final RowTypeInfo testTypeInfo = new RowTypeInfo( + new TypeInformation[]{Types.INT, Types.LONG, Types.STRING}, + new String[]{"a", "b", "c"}); + + static { + testData.add(Row.of(1, 1L, "Hi")); + testData.add(Row.of(2, 2L, "Hello")); + testData.add(Row.of(3, 2L, "Hello world")); + testData.add(Row.of(3, 3L, "Hello world!")); } // ------------------------------- Utilities ------------------------------------------------- @@ -721,4 +592,42 @@ public class HBaseConnectorITCase extends HBaseTestBase { } } + private String createHBaseTableDDL(String tableName, boolean testTimeAndDecimalTypes) { + StringBuilder family4Statement = new StringBuilder(); + if (testTimeAndDecimalTypes) { + family4Statement.append(", family4 ROW<col1 TIMESTAMP(3)"); + family4Statement.append(", col2 DATE"); + family4Statement.append(", col3 TIME(3)"); + family4Statement.append(", col4 DECIMAL(12, 4)"); + family4Statement.append("> \n"); + } + if (isLegacyConnector) { + return "CREATE TABLE " + tableName + "(\n" + + " rowkey INT,\n" + + " family1 ROW<col1 INT>,\n" + + " family2 ROW<col1 VARCHAR, col2 BIGINT>,\n" + + " family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 VARCHAR>" + + family4Statement.toString() + + ") WITH (\n" + + " 'connector.type' = 'hbase',\n" + + " 'connector.version' = '1.4.3',\n" + + " 'connector.table-name' = '" + tableName + "',\n" + + " 'connector.zookeeper.quorum' = '" + getZookeeperQuorum() + "',\n" + + " 'connector.zookeeper.znode.parent' = '/hbase' " + + ")"; + } else { + return "CREATE TABLE " + tableName + "(\n" + + " rowkey INT," + + " family1 ROW<col1 INT>,\n" + + " family2 ROW<col1 VARCHAR, col2 BIGINT>,\n" + + " family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 VARCHAR>" + + family4Statement.toString() + + ") WITH (\n" + + " 'connector' = 'hbase-1.4',\n" + + " 'table-name' = '" + tableName + "',\n" + + " 'zookeeper.quorum' = '" + getZookeeperQuorum() + "',\n" + + " 'zookeeper.znode.parent' = 
'/hbase' " + + ")"; + } + } } diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java index 061114e..47775ac 100644 --- a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java +++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java @@ -24,7 +24,7 @@ import org.apache.flink.connector.hbase.options.HBaseOptions; import org.apache.flink.connector.hbase.options.HBaseWriteOptions; import org.apache.flink.connector.hbase.sink.HBaseDynamicTableSink; import org.apache.flink.connector.hbase.source.HBaseDynamicTableSource; -import org.apache.flink.connector.hbase.source.HBaseLookupFunction; +import org.apache.flink.connector.hbase.source.HBaseRowDataLookupFunction; import org.apache.flink.connector.hbase.util.HBaseTableSchema; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.CatalogTableImpl; @@ -111,8 +111,8 @@ public class HBaseDynamicTableFactoryTest { assertTrue(lookupProvider instanceof TableFunctionProvider); TableFunction tableFunction = ((TableFunctionProvider) lookupProvider).createTableFunction(); - assertTrue(tableFunction instanceof HBaseLookupFunction); - assertEquals("testHBastTable", ((HBaseLookupFunction) tableFunction).getHTableName()); + assertTrue(tableFunction instanceof HBaseRowDataLookupFunction); + assertEquals("testHBastTable", ((HBaseRowDataLookupFunction) tableFunction).getHTableName()); HBaseTableSchema hbaseSchema = hbaseSource.getHBaseTableSchema(); assertEquals(2, hbaseSchema.getRowKeyIndex()); diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestBase.java 
b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestBase.java index e00b986..c376b95 100644 --- a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestBase.java +++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestBase.java @@ -28,9 +28,17 @@ import org.junit.Before; import org.junit.BeforeClass; import java.io.IOException; +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.List; +import static org.apache.flink.table.runtime.functions.SqlDateTimeUtils.dateToInternal; +import static org.apache.flink.table.runtime.functions.SqlDateTimeUtils.timeToInternal; +import static org.apache.flink.table.runtime.functions.SqlDateTimeUtils.timestampToInternal; + /** * Abstract IT case class for HBase. */ @@ -40,7 +48,8 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { protected static final String TEST_TABLE_2 = "testTable2"; protected static final String TEST_TABLE_3 = "testTable3"; - protected static final String ROWKEY = "rk"; + protected static final String ROW_KEY = "rowkey"; + protected static final String FAMILY1 = "family1"; protected static final String F1COL1 = "col1"; @@ -54,11 +63,16 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { protected static final String F3COL3 = "col3"; protected static final String FAMILY4 = "family4"; + protected static final String F4COL1 = "col1"; + protected static final String F4COL2 = "col2"; + protected static final String F4COL3 = "col3"; + protected static final String F4COL4 = "col4"; private static final byte[][] FAMILIES = new byte[][]{ Bytes.toBytes(FAMILY1), Bytes.toBytes(FAMILY2), - Bytes.toBytes(FAMILY3) + Bytes.toBytes(FAMILY3), + Bytes.toBytes(FAMILY4) }; private static final byte[][] SPLIT_KEYS = new 
byte[][]{Bytes.toBytes(4)};
@@ -107,14 +121,30 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
 		HTable table = openTable(tableName);
 		List<Put> puts = new ArrayList<>();
 		// add some data
-		puts.add(putRow(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
-		puts.add(putRow(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
-		puts.add(putRow(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
-		puts.add(putRow(4, 40, null, 400L, 4.04, true, "Welt-4"));
-		puts.add(putRow(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
-		puts.add(putRow(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
-		puts.add(putRow(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
-		puts.add(putRow(8, 80, null, 800L, 8.08, true, "Welt-8"));
+		puts.add(putRow(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1",
+			Timestamp.valueOf("2019-08-18 19:00:00"), Date.valueOf("2019-08-18"),
+			Time.valueOf("19:00:00"), new BigDecimal("12345678.0001"))); // String constructor: BigDecimal(double) would store the double's binary expansion
+		puts.add(putRow(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2",
+			Timestamp.valueOf("2019-08-18 19:01:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:01:00"),
+			new BigDecimal("12345678.0002")));
+		puts.add(putRow(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3",
+			Timestamp.valueOf("2019-08-18 19:02:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:02:00"),
+			new BigDecimal("12345678.0003")));
+		puts.add(putRow(4, 40, null, 400L, 4.04, true, "Welt-4",
+			Timestamp.valueOf("2019-08-18 19:03:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:03:00"),
+			new BigDecimal("12345678.0004")));
+		puts.add(putRow(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5",
+			Timestamp.valueOf("2019-08-19 19:10:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:10:00"),
+			new BigDecimal("12345678.0005")));
+		puts.add(putRow(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6",
+			Timestamp.valueOf("2019-08-19 19:20:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:20:00"),
+			new BigDecimal("12345678.0006")));
+		puts.add(putRow(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7",
+			Timestamp.valueOf("2019-08-19 19:30:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:30:00"),
+			new BigDecimal("12345678.0007")));
+		puts.add(putRow(8, 80, null, 800L, 8.08, true, "Welt-8",
+			Timestamp.valueOf("2019-08-19 19:40:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:40:00"),
+			new BigDecimal("12345678.0008")));
 
 		// append rows to table
 		table.put(puts);
@@ -139,7 +169,18 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
 		createTable(tableName, families, SPLIT_KEYS);
 	}
 
-	private static Put putRow(int rowKey, int f1c1, String f2c1, long f2c2, double f3c1, boolean f3c2, String f3c3) {
+	private static Put putRow(
+			int rowKey,
+			int f1c1,
+			String f2c1,
+			long f2c2,
+			double f3c1,
+			boolean f3c2,
+			String f3c3,
+			Timestamp f4c1,
+			Date f4c2,
+			Time f4c3,
+			BigDecimal f4c4) {
 		Put put = new Put(Bytes.toBytes(rowKey));
 		// family 1
 		put.addColumn(Bytes.toBytes(FAMILY1), Bytes.toBytes(F1COL1), Bytes.toBytes(f1c1));
@@ -153,6 +194,11 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
 		put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL2), Bytes.toBytes(f3c2));
 		put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL3), Bytes.toBytes(f3c3));
 
+		// family 4: timestamp/date/time pass through the SqlDateTimeUtils *ToInternal helpers before encoding; the decimal is encoded as-is
+		put.addColumn(Bytes.toBytes(FAMILY4), Bytes.toBytes(F4COL1), Bytes.toBytes(timestampToInternal(f4c1)));
+		put.addColumn(Bytes.toBytes(FAMILY4), Bytes.toBytes(F4COL2), Bytes.toBytes(dateToInternal(f4c2)));
+		put.addColumn(Bytes.toBytes(FAMILY4), Bytes.toBytes(F4COL3), Bytes.toBytes(timeToInternal(f4c3)));
+		put.addColumn(Bytes.toBytes(FAMILY4), Bytes.toBytes(F4COL4), Bytes.toBytes(f4c4));
 		return put;
 	}
 }