This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 9e20929  [FLINK-18072][hbase] Fix HBaseLookupFunction can not work 
with new internal data structure RowData
9e20929 is described below

commit 9e20929dbda43959950509e2f4534c285bf48f3b
Author: Leonard Xu <xbjt...@163.com>
AuthorDate: Wed Jun 17 21:14:25 2020 +0800

    [FLINK-18072][hbase] Fix HBaseLookupFunction can not work with new internal 
data structure RowData
    
    This closes #12691
---
 .../hbase/source/HBaseDynamicTableSource.java      |   2 +-
 .../hbase/source/HBaseLookupFunction.java          |   2 +-
 ...nction.java => HBaseRowDataLookupFunction.java} |  46 +--
 .../flink/connector/hbase/util/HBaseSerde.java     |  26 ++
 .../connector/hbase/HBaseConnectorITCase.java      | 423 ++++++++-------------
 .../hbase/HBaseDynamicTableFactoryTest.java        |   6 +-
 .../flink/connector/hbase/util/HBaseTestBase.java  |  68 +++-
 7 files changed, 278 insertions(+), 295 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java
 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java
index 1dac67ae..537d4b2 100644
--- 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java
+++ 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java
@@ -77,7 +77,7 @@ public class HBaseDynamicTableSource implements 
ScanTableSource, LookupTableSour
                                .isPresent(),
                        "Currently, HBase table only supports lookup by rowkey 
field.");
 
-               return TableFunctionProvider.of(new HBaseLookupFunction(conf, 
tableName, hbaseSchema));
+               return TableFunctionProvider.of(new 
HBaseRowDataLookupFunction(conf, tableName, hbaseSchema, nullStringLiteral));
        }
 
        @Override
diff --git 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java
 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java
index 1d608e9..c84cbe1 100644
--- 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java
+++ 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java
@@ -45,7 +45,7 @@ import java.io.IOException;
 
 /**
  * The HBaseLookupFunction is a standard user-defined table function, it can 
be used in tableAPI
- * and also useful for temporal table join plan in SQL.
+ * and also useful for temporal table join plan in SQL. It looks up the result 
as {@link Row}.
  */
 @Internal
 public class HBaseLookupFunction extends TableFunction<Row> {
diff --git 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java
 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
similarity index 81%
copy from 
flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java
copy to 
flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
index 1d608e9..7facf58 100644
--- 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java
+++ 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
@@ -20,13 +20,12 @@ package org.apache.flink.connector.hbase.source;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
-import org.apache.flink.connector.hbase.util.HBaseReadWriteHelper;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.functions.FunctionContext;
 import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.types.Row;
 import org.apache.flink.util.StringUtils;
 
 import org.apache.hadoop.conf.Configuration;
@@ -36,6 +35,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.slf4j.Logger;
@@ -44,29 +44,33 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 
 /**
- * The HBaseLookupFunction is a standard user-defined table function, it can 
be used in tableAPI
- * and also useful for temporal table join plan in SQL.
+ * The HBaseRowDataLookupFunction is a standard user-defined table function, 
it can be used in tableAPI
+ * and also useful for temporal table join plan in SQL. It looks up the result 
as {@link RowData}.
  */
 @Internal
-public class HBaseLookupFunction extends TableFunction<Row> {
-       private static final Logger LOG = 
LoggerFactory.getLogger(HBaseLookupFunction.class);
+public class HBaseRowDataLookupFunction extends TableFunction<RowData> {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(HBaseRowDataLookupFunction.class);
        private static final long serialVersionUID = 1L;
 
        private final String hTableName;
        private final byte[] serializedConfig;
        private final HBaseTableSchema hbaseTableSchema;
+       private final String nullStringLiteral;
 
-       private transient HBaseReadWriteHelper readHelper;
        private transient Connection hConnection;
        private transient HTable table;
+       private transient HBaseSerde serde;
 
-       public HBaseLookupFunction(
+       public HBaseRowDataLookupFunction(
                        Configuration configuration,
                        String hTableName,
-                       HBaseTableSchema hbaseTableSchema) {
+                       HBaseTableSchema hbaseTableSchema,
+                       String nullStringLiteral) {
                this.serializedConfig = 
HBaseConfigurationUtil.serializeConfiguration(configuration);
                this.hTableName = hTableName;
                this.hbaseTableSchema = hbaseTableSchema;
+               this.nullStringLiteral = nullStringLiteral;
        }
 
        /**
@@ -75,19 +79,17 @@ public class HBaseLookupFunction extends TableFunction<Row> 
{
         */
        public void eval(Object rowKey) throws IOException {
                // fetch result
-               Result result = table.get(readHelper.createGet(rowKey));
-               if (!result.isEmpty()) {
-                       // parse and collect
-                       collect(readHelper.parseToRow(result, rowKey));
+               Get get = serde.createGet(rowKey);
+               if (get != null) {
+                       Result result = table.get(get);
+                       if (!result.isEmpty()) {
+                               // parse and collect
+                               collect(serde.convertToRow(result));
+                       }
                }
        }
 
-       @Override
-       public TypeInformation<Row> getResultType() {
-               return hbaseTableSchema.convertsToTableSchema().toRowType();
-       }
-
-       private org.apache.hadoop.conf.Configuration 
prepareRuntimeConfiguration() {
+       private Configuration prepareRuntimeConfiguration() {
                // create default configuration from current runtime env 
(`hbase-site.xml` in classpath) first,
                // and overwrite configuration using serialized configuration 
from client-side env (`hbase-site.xml` in classpath).
                // user params from client-side have the highest priority
@@ -107,7 +109,7 @@ public class HBaseLookupFunction extends TableFunction<Row> 
{
        @Override
        public void open(FunctionContext context) {
                LOG.info("start open ...");
-               org.apache.hadoop.conf.Configuration config = 
prepareRuntimeConfiguration();
+               Configuration config = prepareRuntimeConfiguration();
                try {
                        hConnection = 
ConnectionFactory.createConnection(config);
                        table = (HTable) 
hConnection.getTable(TableName.valueOf(hTableName));
@@ -118,7 +120,7 @@ public class HBaseLookupFunction extends TableFunction<Row> 
{
                        LOG.error("Exception while creating connection to 
HBase.", ioe);
                        throw new RuntimeException("Cannot create connection to 
HBase.", ioe);
                }
-               this.readHelper = new HBaseReadWriteHelper(hbaseTableSchema);
+               this.serde = new HBaseSerde(hbaseTableSchema, 
nullStringLiteral);
                LOG.info("end open.");
        }
 
diff --git 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
index 693cb62..f888914 100644
--- 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
+++ 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -76,6 +77,7 @@ public class HBaseSerde {
        private final @Nullable FieldDecoder keyDecoder;
        private final FieldEncoder[][] qualifierEncoders;
        private final FieldDecoder[][] qualifierDecoders;
+       private final GenericRowData rowWithRowKey;
 
        public HBaseSerde(HBaseTableSchema hbaseSchema, final String 
nullStringLiteral) {
                this.families = hbaseSchema.getFamilyKeys();
@@ -115,6 +117,7 @@ public class HBaseSerde {
                                .toArray(FieldDecoder[]::new);
                        this.reusedFamilyRows[f] = new 
GenericRowData(dataTypes.length);
                }
+               this.rowWithRowKey = new GenericRowData(1);
        }
 
        /**
@@ -196,6 +199,29 @@ public class HBaseSerde {
        }
 
        /**
+        * Returns an instance of Get that retrieves the matching records from 
the HBase table.
+        *
+        * @return The appropriate instance of Get for this use case.
+        */
+       public Get createGet(Object rowKey) {
+               checkArgument(keyEncoder != null, "row key is not set.");
+               rowWithRowKey.setField(0, rowKey);
+               byte[] rowkey = keyEncoder.encode(rowWithRowKey, 0);
+               if (rowkey.length == 0) {
+                       // drop dirty records, rowkey shouldn't be zero length
+                       return null;
+               }
+               Get get = new Get(rowkey);
+               for (int f = 0; f < families.length; f++) {
+                       byte[] family = families[f];
+                       for (byte[] qualifier : qualifiers[f]) {
+                               get.addColumn(family, qualifier);
+                       }
+               }
+               return get;
+       }
+
+       /**
         * Converts HBase {@link Result} into {@link RowData}.
         */
        public RowData convertToRow(Result result) {
diff --git 
a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
 
b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
index 3777dec..9ab36cc 100644
--- 
a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
+++ 
b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.connector.hbase.source.HBaseInputFormat;
 import org.apache.flink.connector.hbase.source.HBaseTableSource;
-import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.connector.hbase.util.HBaseTestBase;
 import org.apache.flink.connector.hbase.util.PlannerType;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -40,20 +39,17 @@ import 
org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.api.internal.TableImpl;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.planner.runtime.utils.BatchTableEnvUtil;
 import org.apache.flink.table.planner.runtime.utils.TableEnvUtil;
 import org.apache.flink.table.planner.sinks.CollectRowTableSink;
 import org.apache.flink.table.planner.sinks.CollectTableSink;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
-import org.apache.flink.table.runtime.utils.StreamITCase;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
 
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -61,26 +57,15 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
+import java.util.stream.Collectors;
 
 import scala.Option;
 
 import static org.apache.flink.connector.hbase.util.PlannerType.OLD_PLANNER;
 import static org.apache.flink.table.api.Expressions.$;
-import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
-import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
-import static 
org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TABLE_NAME;
-import static 
org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TYPE_VALUE_HBASE;
-import static 
org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_VERSION_VALUE_143;
-import static 
org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_ZK_QUORUM;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -124,7 +109,7 @@ public class HBaseConnectorITCase extends HBaseTestBase {
                        hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
                        hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
                        hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
-                       hbaseTable.setRowKey("rowkey", Integer.class);
+                       hbaseTable.setRowKey(ROW_KEY, Integer.class);
                        ((TableEnvironmentInternal) 
tEnv).registerTableSourceInternal("hTable", hbaseTable);
                } else {
                        tEnv.executeSql(
@@ -176,7 +161,7 @@ public class HBaseConnectorITCase extends HBaseTestBase {
                        hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
                        hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
                        hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
-                       hbaseTable.setRowKey("rowkey", Integer.class);
+                       hbaseTable.setRowKey(ROW_KEY, Integer.class);
                        ((TableEnvironmentInternal) 
tEnv).registerTableSourceInternal("hTable", hbaseTable);
                } else {
                        tEnv.executeSql(
@@ -221,7 +206,7 @@ public class HBaseConnectorITCase extends HBaseTestBase {
                if (isLegacyConnector) {
                        HBaseTableSource hbaseTable = new 
HBaseTableSource(getConf(), TEST_TABLE_1);
                        // shuffle order of column registration
-                       hbaseTable.setRowKey("rowkey", Integer.class);
+                       hbaseTable.setRowKey(ROW_KEY, Integer.class);
                        hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
                        hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
                        hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class);
@@ -268,7 +253,7 @@ public class HBaseConnectorITCase extends HBaseTestBase {
                        HBaseTableSource hbaseTable = new 
HBaseTableSource(getConf(), TEST_TABLE_1);
                        hbaseTable.addColumn(FAMILY2, F2COL1, byte[].class);
                        hbaseTable.addColumn(FAMILY2, F2COL2, byte[].class);
-                       hbaseTable.setRowKey("rowkey", Integer.class);
+                       hbaseTable.setRowKey(ROW_KEY, Integer.class);
                        ((TableEnvironmentInternal) 
tEnv).registerTableSourceInternal("hTable", hbaseTable);
                } else {
                        tEnv.executeSql(
@@ -319,115 +304,33 @@ public class HBaseConnectorITCase extends HBaseTestBase {
                assertEquals(360, (int) resultSet.get(0).f0);
        }
 
-       // 
-------------------------------------------------------------------------------------
-       // HBaseTableSink tests
-       // 
-------------------------------------------------------------------------------------
-
-       // prepare a source collection.
-       private static final List<Row> testData1 = new ArrayList<>();
-       private static final RowTypeInfo testTypeInfo1 = new RowTypeInfo(
-               new TypeInformation[]{Types.INT, Types.INT, Types.STRING, 
Types.LONG, Types.DOUBLE,
-                       Types.BOOLEAN, Types.STRING, Types.SQL_TIMESTAMP, 
Types.SQL_DATE, Types.SQL_TIME},
-               new String[]{"rowkey", "f1c1", "f2c1", "f2c2", "f3c1", "f3c2", 
"f3c3", "f4c1", "f4c2", "f4c3"});
-
-       static {
-               testData1.add(Row.of(1, 10, "Hello-1", 100L, 1.01, false, 
"Welt-1",
-                       Timestamp.valueOf("2019-08-18 19:00:00"), 
Date.valueOf("2019-08-18"), Time.valueOf("19:00:00")));
-               testData1.add(Row.of(2, 20, "Hello-2", 200L, 2.02, true, 
"Welt-2",
-                       Timestamp.valueOf("2019-08-18 19:01:00"), 
Date.valueOf("2019-08-18"), Time.valueOf("19:01:00")));
-               testData1.add(Row.of(3, 30, "Hello-3", 300L, 3.03, false, 
"Welt-3",
-                       Timestamp.valueOf("2019-08-18 19:02:00"), 
Date.valueOf("2019-08-18"), Time.valueOf("19:02:00")));
-               testData1.add(Row.of(4, 40, null, 400L, 4.04, true, "Welt-4",
-                       Timestamp.valueOf("2019-08-18 19:03:00"), 
Date.valueOf("2019-08-18"), Time.valueOf("19:03:00")));
-               testData1.add(Row.of(5, 50, "Hello-5", 500L, 5.05, false, 
"Welt-5",
-                       Timestamp.valueOf("2019-08-19 19:10:00"), 
Date.valueOf("2019-08-19"), Time.valueOf("19:10:00")));
-               testData1.add(Row.of(6, 60, "Hello-6", 600L, 6.06, true, 
"Welt-6",
-                       Timestamp.valueOf("2019-08-19 19:20:00"), 
Date.valueOf("2019-08-19"), Time.valueOf("19:20:00")));
-               testData1.add(Row.of(7, 70, "Hello-7", 700L, 7.07, false, 
"Welt-7",
-                       Timestamp.valueOf("2019-08-19 19:30:00"), 
Date.valueOf("2019-08-19"), Time.valueOf("19:30:00")));
-               testData1.add(Row.of(8, 80, null, 800L, 8.08, true, "Welt-8",
-                       Timestamp.valueOf("2019-08-19 19:40:00"), 
Date.valueOf("2019-08-19"), Time.valueOf("19:40:00")));
-       }
-
        @Test
        public void testTableSink() throws Exception {
                StreamExecutionEnvironment execEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
                StreamTableEnvironment tEnv = 
StreamTableEnvironment.create(execEnv, streamSettings);
 
-               if (isLegacyConnector) {
-                       HBaseTableSchema schema = new HBaseTableSchema();
-                       schema.addColumn(FAMILY1, F1COL1, Integer.class);
-                       schema.addColumn(FAMILY2, F2COL1, String.class);
-                       schema.addColumn(FAMILY2, F2COL2, Long.class);
-                       schema.setRowKey("rk", Integer.class);
-                       schema.addColumn(FAMILY3, F3COL1, Double.class);
-                       schema.addColumn(FAMILY3, F3COL2, Boolean.class);
-                       schema.addColumn(FAMILY3, F3COL3, String.class);
-
-                       Map<String, String> tableProperties = new HashMap<>();
-                       tableProperties.put("connector.type", "hbase");
-                       tableProperties.put("connector.version", "1.4.3");
-                       tableProperties.put("connector.property-version", "1");
-                       tableProperties.put("connector.table-name", 
TEST_TABLE_2);
-                       tableProperties.put("connector.zookeeper.quorum", 
getZookeeperQuorum());
-                       tableProperties.put("connector.zookeeper.znode.parent", 
"/hbase");
-                       DescriptorProperties descriptorProperties = new 
DescriptorProperties(true);
-                       descriptorProperties.putTableSchema(SCHEMA, 
schema.convertsToTableSchema());
-                       descriptorProperties.putProperties(tableProperties);
-                       TableSink tableSink = TableFactoryService
-                               .find(HBaseTableFactory.class, 
descriptorProperties.asMap())
-                               .createTableSink(descriptorProperties.asMap());
-                       ((TableEnvironmentInternal) 
tEnv).registerTableSinkInternal("hbase", tableSink);
-               } else {
-                       tEnv.executeSql(
-                                       "CREATE TABLE hbase (" +
-                                       " family1 ROW<col1 INT>," +
-                                       " family2 ROW<col1 STRING, col2 
BIGINT>," +
-                                       " rk INT," +
-                                       " family3 ROW<col1 DOUBLE, col2 
BOOLEAN, col3 STRING>" +
-                                       ") WITH (" +
-                                       " 'connector' = 'hbase-1.4'," +
-                                       " 'table-name' = '" + TEST_TABLE_1 + 
"'," +
-                                       " 'zookeeper.quorum' = '" + 
getZookeeperQuorum() + "'," +
-                                       " 'zookeeper.znode.parent' = '/hbase'" +
-                                       ")");
-               }
+               // register HBase table testTable1 which contains test data
+               String table1DDL = createHBaseTableDDL(TEST_TABLE_1, false);
+               tEnv.executeSql(table1DDL);
+
+               String table2DDL = createHBaseTableDDL(TEST_TABLE_2, false);
+               tEnv.executeSql(table2DDL);
 
-               DataStream<Row> ds = 
execEnv.fromCollection(testData1).returns(testTypeInfo1);
-               tEnv.createTemporaryView("src", ds);
+               String query = "INSERT INTO " + TEST_TABLE_2 + " SELECT" +
+                       " rowkey," +
+                       " family1," +
+                       " family2," +
+                       " family3" +
+                       " FROM " + TEST_TABLE_1;
 
-               String query = "INSERT INTO hbase SELECT ROW(f1c1), ROW(f2c1, 
f2c2), rowkey, ROW(f3c1, f3c2, f3c3) FROM src";
+               // wait to finish
                TableEnvUtil.execInsertSqlAndWaitResult(tEnv, query);
 
                // start a batch scan job to verify contents in HBase table
-               // start a batch scan job to verify contents in HBase table
-               TableEnvironment batchTableEnv = createBatchTableEnv();
-
-               if (isLegacyConnector) {
-                       HBaseTableSource hbaseTable = new 
HBaseTableSource(getConf(), TEST_TABLE_2);
-                       hbaseTable.setRowKey("rowkey", Integer.class);
-                       hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class);
-                       hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
-                       hbaseTable.addColumn(FAMILY2, F2COL2, Long.class);
-                       hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
-                       hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
-                       hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
-                       ((TableEnvironmentInternal) 
batchTableEnv).registerTableSourceInternal("hTable", hbaseTable);
-               } else {
-                       batchTableEnv.executeSql(
-                                       "CREATE TABLE hTable (" +
-                                       " rowkey INT," +
-                                       " family1 ROW<col1 INT>," +
-                                       " family2 ROW<col1 STRING, col2 
BIGINT>," +
-                                       " family3 ROW<col1 DOUBLE, col2 
BOOLEAN, col3 STRING>" +
-                                       ") WITH (" +
-                                       " 'connector' = 'hbase-1.4'," +
-                                       " 'table-name' = '" + TEST_TABLE_1 + 
"'," +
-                                       " 'zookeeper.quorum' = '" + 
getZookeeperQuorum() + "'" +
-                                       ")");
-               }
+               TableEnvironment batchEnv = createBatchTableEnv();
+               batchEnv.executeSql(table2DDL);
 
-               Table table = batchTableEnv.sqlQuery(
+               Table table = batchEnv.sqlQuery(
                        "SELECT " +
                                "  h.rowkey, " +
                                "  h.family1.col1, " +
@@ -436,9 +339,8 @@ public class HBaseConnectorITCase extends HBaseTestBase {
                                "  h.family3.col1, " +
                                "  h.family3.col2, " +
                                "  h.family3.col3 " +
-                               "FROM hTable AS h"
+                               "FROM " + TEST_TABLE_2 + " AS h"
                );
-
                List<Row> results = collectBatchResult(table);
                String expected =
                                "1,10,Hello-1,100,1.01,false,Welt-1\n" +
@@ -455,56 +357,37 @@ public class HBaseConnectorITCase extends HBaseTestBase {
 
        @Test
        public void testTableSourceSinkWithDDL() throws Exception {
+               if (OLD_PLANNER.equals(planner) || isLegacyConnector) {
+                       // only test for blink planner and new connector, 
because types TIMESTAMP/DATE/TIME/DECIMAL work well in
+                       // new connector (using blink-planner), but there are some 
precision problems in the old planner or legacy connector.
+                       return;
+               }
+
                StreamExecutionEnvironment execEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
                StreamTableEnvironment tEnv = 
StreamTableEnvironment.create(execEnv, streamSettings);
 
-               DataStream<Row> ds = 
execEnv.fromCollection(testData1).returns(testTypeInfo1);
-               tEnv.createTemporaryView("src", ds);
+               // register HBase table testTable1 which contains test data
+               String table1DDL = createHBaseTableDDL(TEST_TABLE_1, true);
+               tEnv.executeSql(table1DDL);
 
-               // register hbase table
-               String quorum = getZookeeperQuorum();
-               String ddl;
-               if (isLegacyConnector) {
-                       ddl = "CREATE TABLE hbase (\n" +
-                               "    rowkey INT," +
-                               "    family1 ROW<col1 INT>,\n" +
-                               "    family2 ROW<col1 VARCHAR, col2 BIGINT>,\n" 
+
-                               "    family3 ROW<col1 DOUBLE, col2 BOOLEAN, 
col3 VARCHAR>,\n" +
-                               "    family4 ROW<col1 TIMESTAMP(3), col2 DATE, 
col3 TIME(3)>\n" +
-                               ") WITH (\n" +
-                               "    'connector.type' = 'hbase',\n" +
-                               "    'connector.version' = '1.4.3',\n" +
-                               "    'connector.table-name' = 'testTable3',\n" +
-                               "    'connector.zookeeper.quorum' = '" + quorum 
+ "',\n" +
-                               "    'connector.zookeeper.znode.parent' = 
'/hbase' " +
-                               ")";
-               } else {
-                       ddl = "CREATE TABLE hbase (\n" +
-                               "    rowkey INT," +
-                               "    family1 ROW<col1 INT>,\n" +
-                               "    family2 ROW<col1 VARCHAR, col2 BIGINT>,\n" 
+
-                               "    family3 ROW<col1 DOUBLE, col2 BOOLEAN, 
col3 VARCHAR>,\n" +
-                               "    family4 ROW<col1 TIMESTAMP(3), col2 DATE, 
col3 TIME(3)>\n" +
-                               ") WITH (\n" +
-                               "    'connector' = 'hbase-1.4',\n" +
-                               "    'table-name' = 'testTable3',\n" +
-                               "    'zookeeper.quorum' = '" + quorum + "',\n" +
-                               "    'zookeeper.znode.parent' = '/hbase' " +
-                               ")";
-               }
-               tEnv.executeSql(ddl);
+               // register HBase table which is empty
+               String table3DDL = createHBaseTableDDL(TEST_TABLE_3, true);
+               tEnv.executeSql(table3DDL);
 
-               String query = "INSERT INTO hbase " +
-                       "SELECT rowkey, ROW(f1c1), ROW(f2c1, f2c2), ROW(f3c1, 
f3c2, f3c3), ROW(f4c1, f4c2, f4c3) " +
-                       "FROM src";
-               TableEnvUtil.execInsertSqlAndWaitResult(tEnv, query);
+               String insertStatement = "INSERT INTO " + TEST_TABLE_3 +
+                       " SELECT rowkey," +
+                       " family1," +
+                       " family2," +
+                       " family3," +
+                       " family4" +
+                       " from " + TEST_TABLE_1;
+               // wait to finish
+               TableEnvUtil.execInsertSqlAndWaitResult(tEnv, insertStatement);
 
                // start a batch scan job to verify contents in HBase table
-               TableEnvironment batchTableEnv = createBatchTableEnv();
-               batchTableEnv.executeSql(ddl);
-
-               Table table = batchTableEnv.sqlQuery(
-                       "SELECT " +
+               TableEnvironment batchEnv = createBatchTableEnv();
+               batchEnv.executeSql(table3DDL);
+               String query = "SELECT " +
                                "  h.rowkey, " +
                                "  h.family1.col1, " +
                                "  h.family2.col1, " +
@@ -514,115 +397,103 @@ public class HBaseConnectorITCase extends HBaseTestBase 
{
                                "  h.family3.col3, " +
                                "  h.family4.col1, " +
                                "  h.family4.col2, " +
-                               "  h.family4.col3 " +
-                               "FROM hbase AS h"
-               );
-
-               List<Row> results = collectBatchResult(table);
-               String expected =
-                               "1,10,Hello-1,100,1.01,false,Welt-1,2019-08-18 
19:00:00.0,2019-08-18,19:00:00\n" +
-                               "2,20,Hello-2,200,2.02,true,Welt-2,2019-08-18 
19:01:00.0,2019-08-18,19:01:00\n" +
-                               "3,30,Hello-3,300,3.03,false,Welt-3,2019-08-18 
19:02:00.0,2019-08-18,19:02:00\n" +
-                               "4,40,null,400,4.04,true,Welt-4,2019-08-18 
19:03:00.0,2019-08-18,19:03:00\n" +
-                               "5,50,Hello-5,500,5.05,false,Welt-5,2019-08-19 
19:10:00.0,2019-08-19,19:10:00\n" +
-                               "6,60,Hello-6,600,6.06,true,Welt-6,2019-08-19 
19:20:00.0,2019-08-19,19:20:00\n" +
-                               "7,70,Hello-7,700,7.07,false,Welt-7,2019-08-19 
19:30:00.0,2019-08-19,19:30:00\n" +
-                               "8,80,null,800,8.08,true,Welt-8,2019-08-19 
19:40:00.0,2019-08-19,19:40:00\n";
-
-               TestBaseUtils.compareResultAsText(results, expected);
-       }
-
+                               "  h.family4.col3, " +
+                               "  h.family4.col4 " +
+                               " FROM " + TEST_TABLE_3 + " AS h";
+               Iterator<Row> collected = tEnv.executeSql(query).collect();
+               List<String> result = Lists.newArrayList(collected).stream()
+                       .map(Row::toString)
+                       .sorted()
+                       .collect(Collectors.toList());
 
-       // 
-------------------------------------------------------------------------------------
-       // HBase lookup source tests
-       // 
-------------------------------------------------------------------------------------
-
-       // prepare a source collection.
-       private static final List<Row> testData2 = new ArrayList<>();
-       private static final RowTypeInfo testTypeInfo2 = new RowTypeInfo(
-               new TypeInformation[]{Types.INT, Types.LONG, Types.STRING},
-               new String[]{"a", "b", "c"});
-
-       static {
-               testData2.add(Row.of(1, 1L, "Hi"));
-               testData2.add(Row.of(2, 2L, "Hello"));
-               testData2.add(Row.of(3, 2L, "Hello world"));
-               testData2.add(Row.of(3, 3L, "Hello world!"));
+               List<String> expected = new ArrayList<>();
+               
expected.add("1,10,Hello-1,100,1.01,false,Welt-1,2019-08-18T19:00,2019-08-18,19:00,12345678.0001");
+               
expected.add("2,20,Hello-2,200,2.02,true,Welt-2,2019-08-18T19:01,2019-08-18,19:01,12345678.0002");
+               
expected.add("3,30,Hello-3,300,3.03,false,Welt-3,2019-08-18T19:02,2019-08-18,19:02,12345678.0003");
+               
expected.add("4,40,null,400,4.04,true,Welt-4,2019-08-18T19:03,2019-08-18,19:03,12345678.0004");
+               
expected.add("5,50,Hello-5,500,5.05,false,Welt-5,2019-08-19T19:10,2019-08-19,19:10,12345678.0005");
+               
expected.add("6,60,Hello-6,600,6.06,true,Welt-6,2019-08-19T19:20,2019-08-19,19:20,12345678.0006");
+               
expected.add("7,70,Hello-7,700,7.07,false,Welt-7,2019-08-19T19:30,2019-08-19,19:30,12345678.0007");
+               
expected.add("8,80,null,800,8.08,true,Welt-8,2019-08-19T19:40,2019-08-19,19:40,12345678.0008");
+               assertEquals(expected, result);
        }
 
        @Test
        public void testHBaseLookupTableSource() throws Exception {
-               if (OLD_PLANNER.equals(planner)) {
+               if (OLD_PLANNER.equals(planner) || isLegacyConnector) {
                        // lookup table source is only supported in blink 
planner, skip for old planner
+                       // types TIMESTAMP/DATE/TIME/DECIMAL work well in the new 
connector, so skip the legacy connector
                        return;
                }
-               StreamExecutionEnvironment streamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               StreamTableEnvironment streamTableEnv = 
StreamTableEnvironment.create(streamEnv, streamSettings);
-               StreamITCase.clear();
+
+               StreamExecutionEnvironment execEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               StreamTableEnvironment tEnv = 
StreamTableEnvironment.create(execEnv, streamSettings);
+
+               tEnv.executeSql(
+                       "CREATE TABLE " + TEST_TABLE_1 + " (" +
+                               " family1 ROW<col1 INT>," +
+                               " family2 ROW<col1 STRING, col2 BIGINT>," +
+                               " family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 
STRING>," +
+                               " rowkey INT," +
+                               " family4 ROW<col1 TIMESTAMP(3), col2 DATE, 
col3 TIME(3), col4 DECIMAL(12, 4)>," +
+                               " PRIMARY KEY (rowkey) NOT ENFORCED" +
+                               ") WITH (" +
+                               " 'connector' = 'hbase-1.4'," +
+                               " 'table-name' = '" + TEST_TABLE_1 + "'," +
+                               " 'zookeeper.quorum' = '" + 
getZookeeperQuorum() + "'" +
+                               ")");
 
                // prepare a source table
                String srcTableName = "src";
-               DataStream<Row> ds = 
streamEnv.fromCollection(testData2).returns(testTypeInfo2);
-               Table in = streamTableEnv.fromDataStream(ds, $("a"), $("b"), 
$("c"), $("proc").proctime());
-               streamTableEnv.registerTable(srcTableName, in);
+               DataStream<Row> srcDs = 
execEnv.fromCollection(testData).returns(testTypeInfo);
+               Table in = tEnv.fromDataStream(srcDs, $("a"), $("b"), $("c"), 
$("proc").proctime());
+               tEnv.registerTable(srcTableName, in);
 
-               if (isLegacyConnector) {
-                       Map<String, String> tableProperties = 
hbaseTableProperties();
-                       TableSource<?> source = TableFactoryService
-                               .find(HBaseTableFactory.class, tableProperties)
-                               .createTableSource(tableProperties);
-                       ((TableEnvironmentInternal) 
streamTableEnv).registerTableSourceInternal("hbaseLookup", source);
-               } else {
-                       streamTableEnv.executeSql(
-                                       "CREATE TABLE hbaseLookup (" +
-                                       " family1 ROW<col1 INT>," +
-                                       " rk INT," +
-                                       " family2 ROW<col1 STRING, col2 
BIGINT>," +
-                                       " family3 ROW<col1 DOUBLE, col2 
BOOLEAN, col3 STRING>" +
-                                       ") WITH (" +
-                                       " 'connector' = 'hbase-1.4'," +
-                                       " 'table-name' = '" + TEST_TABLE_1 + 
"'," +
-                                       " 'zookeeper.quorum' = '" + 
getZookeeperQuorum() + "'" +
-                                       ")");
-               }
                // perform a temporal table join query
-               String query = "SELECT a,family1.col1, family3.col3 FROM src " +
-                       "JOIN hbaseLookup FOR SYSTEM_TIME AS OF src.proc as h 
ON src.a = h.rk";
-               Table result = streamTableEnv.sqlQuery(query);
-
-               DataStream<Row> resultSet = 
streamTableEnv.toAppendStream(result, Row.class);
-               resultSet.addSink(new StreamITCase.StringSink<>());
-
-               streamEnv.execute();
+               String dimJoinQuery = "SELECT" +
+                       " a," +
+                       " b," +
+                       " h.family1.col1," +
+                       " h.family2.col1," +
+                       " h.family2.col2," +
+                       " h.family3.col1," +
+                       " h.family3.col2," +
+                       " h.family3.col3," +
+                       " h.family4.col1," +
+                       " h.family4.col2," +
+                       " h.family4.col3," +
+                       " h.family4.col4 " +
+                       " FROM src JOIN " + TEST_TABLE_1 + " FOR SYSTEM_TIME AS 
OF src.proc as h ON src.a = h.rowkey";
+               Iterator<Row> collected = 
tEnv.executeSql(dimJoinQuery).collect();
+               List<String> result = Lists.newArrayList(collected).stream()
+                       .map(Row::toString)
+                       .sorted()
+                       .collect(Collectors.toList());
 
                List<String> expected = new ArrayList<>();
-               expected.add("1,10,Welt-1");
-               expected.add("2,20,Welt-2");
-               expected.add("3,30,Welt-3");
-               expected.add("3,30,Welt-3");
+               
expected.add("1,1,10,Hello-1,100,1.01,false,Welt-1,2019-08-18T19:00,2019-08-18,19:00,12345678.0001");
+               
expected.add("2,2,20,Hello-2,200,2.02,true,Welt-2,2019-08-18T19:01,2019-08-18,19:01,12345678.0002");
+               
expected.add("3,2,30,Hello-3,300,3.03,false,Welt-3,2019-08-18T19:02,2019-08-18,19:02,12345678.0003");
+               
expected.add("3,3,30,Hello-3,300,3.03,false,Welt-3,2019-08-18T19:02,2019-08-18,19:02,12345678.0003");
 
-               StreamITCase.compareWithList(expected);
+               assertEquals(expected, result);
        }
 
-       private static Map<String, String> hbaseTableProperties() {
-               Map<String, String> properties = new HashMap<>();
-               properties.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_HBASE);
-               properties.put(CONNECTOR_VERSION, CONNECTOR_VERSION_VALUE_143);
-               properties.put(CONNECTOR_PROPERTY_VERSION, "1");
-               properties.put(CONNECTOR_TABLE_NAME, TEST_TABLE_1);
-               properties.put(CONNECTOR_ZK_QUORUM, getZookeeperQuorum());
-               // schema
-               String[] columnNames = {FAMILY1, ROWKEY, FAMILY2, FAMILY3};
-               TypeInformation<Row> f1 = Types.ROW_NAMED(new String[]{F1COL1}, 
Types.INT);
-               TypeInformation<Row> f2 = Types.ROW_NAMED(new String[]{F2COL1, 
F2COL2}, Types.STRING, Types.LONG);
-               TypeInformation<Row> f3 = Types.ROW_NAMED(new String[]{F3COL1, 
F3COL2, F3COL3}, Types.DOUBLE, Types.BOOLEAN, Types.STRING);
-               TypeInformation[] columnTypes = new TypeInformation[]{f1, 
Types.INT, f2, f3};
-
-               DescriptorProperties descriptorProperties = new 
DescriptorProperties(true);
-               TableSchema tableSchema = new TableSchema(columnNames, 
columnTypes);
-               descriptorProperties.putTableSchema(SCHEMA, tableSchema);
-               descriptorProperties.putProperties(properties);
-               return descriptorProperties.asMap();
+       // 
-------------------------------------------------------------------------------------
+       // HBase lookup source tests
+       // 
-------------------------------------------------------------------------------------
+
+       // prepare a source collection.
+       private static final List<Row> testData = new ArrayList<>();
+       private static final RowTypeInfo testTypeInfo = new RowTypeInfo(
+               new TypeInformation[]{Types.INT, Types.LONG, Types.STRING},
+               new String[]{"a", "b", "c"});
+
+       static {
+               testData.add(Row.of(1, 1L, "Hi"));
+               testData.add(Row.of(2, 2L, "Hello"));
+               testData.add(Row.of(3, 2L, "Hello world"));
+               testData.add(Row.of(3, 3L, "Hello world!"));
        }
 
        // ------------------------------- Utilities 
-------------------------------------------------
@@ -721,4 +592,42 @@ public class HBaseConnectorITCase extends HBaseTestBase {
                }
        }
 
+       private String createHBaseTableDDL(String tableName, boolean 
testTimeAndDecimalTypes) {
+               StringBuilder family4Statement = new StringBuilder();
+               if (testTimeAndDecimalTypes) {
+                       family4Statement.append(", family4 ROW<col1 
TIMESTAMP(3)");
+                       family4Statement.append(", col2 DATE");
+                       family4Statement.append(", col3 TIME(3)");
+                       family4Statement.append(", col4 DECIMAL(12, 4)");
+                       family4Statement.append("> \n");
+               }
+               if (isLegacyConnector) {
+                       return "CREATE TABLE " + tableName + "(\n" +
+                               "       rowkey INT,\n" +
+                               "   family1 ROW<col1 INT>,\n" +
+                               "   family2 ROW<col1 VARCHAR, col2 BIGINT>,\n" +
+                               "   family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 
VARCHAR>" +
+                               family4Statement.toString() +
+                               ") WITH (\n" +
+                               "   'connector.type' = 'hbase',\n" +
+                               "   'connector.version' = '1.4.3',\n" +
+                               "   'connector.table-name' = '" + tableName + 
"',\n" +
+                               "   'connector.zookeeper.quorum' = '" + 
getZookeeperQuorum() + "',\n" +
+                               "   'connector.zookeeper.znode.parent' = 
'/hbase' " +
+                               ")";
+               } else {
+                       return "CREATE TABLE " + tableName + "(\n" +
+                               "   rowkey INT," +
+                               "   family1 ROW<col1 INT>,\n" +
+                               "   family2 ROW<col1 VARCHAR, col2 BIGINT>,\n" +
+                               "   family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 
VARCHAR>" +
+                               family4Statement.toString() +
+                               ") WITH (\n" +
+                               "   'connector' = 'hbase-1.4',\n" +
+                               "   'table-name' = '" + tableName + "',\n" +
+                               "   'zookeeper.quorum' = '" + 
getZookeeperQuorum() + "',\n" +
+                               "   'zookeeper.znode.parent' = '/hbase' " +
+                               ")";
+               }
+       }
 }
diff --git 
a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java
 
b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java
index 061114e..47775ac 100644
--- 
a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java
+++ 
b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.connector.hbase.options.HBaseOptions;
 import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
 import org.apache.flink.connector.hbase.sink.HBaseDynamicTableSink;
 import org.apache.flink.connector.hbase.source.HBaseDynamicTableSource;
-import org.apache.flink.connector.hbase.source.HBaseLookupFunction;
+import org.apache.flink.connector.hbase.source.HBaseRowDataLookupFunction;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTableImpl;
@@ -111,8 +111,8 @@ public class HBaseDynamicTableFactoryTest {
                assertTrue(lookupProvider instanceof TableFunctionProvider);
 
                TableFunction tableFunction = ((TableFunctionProvider) 
lookupProvider).createTableFunction();
-               assertTrue(tableFunction instanceof HBaseLookupFunction);
-               assertEquals("testHBastTable", ((HBaseLookupFunction) 
tableFunction).getHTableName());
+               assertTrue(tableFunction instanceof HBaseRowDataLookupFunction);
+               assertEquals("testHBastTable", ((HBaseRowDataLookupFunction) 
tableFunction).getHTableName());
 
                HBaseTableSchema hbaseSchema = 
hbaseSource.getHBaseTableSchema();
                assertEquals(2, hbaseSchema.getRowKeyIndex());
diff --git 
a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestBase.java
 
b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestBase.java
index e00b986..c376b95 100644
--- 
a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestBase.java
+++ 
b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestBase.java
@@ -28,9 +28,17 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.List;
 
+import static 
org.apache.flink.table.runtime.functions.SqlDateTimeUtils.dateToInternal;
+import static 
org.apache.flink.table.runtime.functions.SqlDateTimeUtils.timeToInternal;
+import static 
org.apache.flink.table.runtime.functions.SqlDateTimeUtils.timestampToInternal;
+
 /**
  * Abstract IT case class for HBase.
  */
@@ -40,7 +48,8 @@ public abstract class HBaseTestBase extends 
HBaseTestingClusterAutoStarter {
        protected static final String TEST_TABLE_2 = "testTable2";
        protected static final String TEST_TABLE_3 = "testTable3";
 
-       protected static final String ROWKEY = "rk";
+       protected static final String ROW_KEY = "rowkey";
+
        protected static final String FAMILY1 = "family1";
        protected static final String F1COL1 = "col1";
 
@@ -54,11 +63,16 @@ public abstract class HBaseTestBase extends 
HBaseTestingClusterAutoStarter {
        protected static final String F3COL3 = "col3";
 
        protected static final String FAMILY4 = "family4";
+       protected static final String F4COL1 = "col1";
+       protected static final String F4COL2 = "col2";
+       protected static final String F4COL3 = "col3";
+       protected static final String F4COL4 = "col4";
 
        private static final byte[][] FAMILIES = new byte[][]{
                Bytes.toBytes(FAMILY1),
                Bytes.toBytes(FAMILY2),
-               Bytes.toBytes(FAMILY3)
+               Bytes.toBytes(FAMILY3),
+               Bytes.toBytes(FAMILY4)
        };
 
        private static final byte[][] SPLIT_KEYS = new 
byte[][]{Bytes.toBytes(4)};
@@ -107,14 +121,30 @@ public abstract class HBaseTestBase extends 
HBaseTestingClusterAutoStarter {
                HTable table = openTable(tableName);
                List<Put> puts = new ArrayList<>();
                // add some data
-               puts.add(putRow(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
-               puts.add(putRow(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
-               puts.add(putRow(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
-               puts.add(putRow(4, 40, null, 400L, 4.04, true, "Welt-4"));
-               puts.add(putRow(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
-               puts.add(putRow(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
-               puts.add(putRow(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
-               puts.add(putRow(8, 80, null, 800L, 8.08, true, "Welt-8"));
+               puts.add(putRow(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1",
+                       Timestamp.valueOf("2019-08-18 19:00:00"), 
Date.valueOf("2019-08-18"),
+                       Time.valueOf("19:00:00"), new 
BigDecimal(12345678.0001)));
+               puts.add(putRow(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2",
+                       Timestamp.valueOf("2019-08-18 19:01:00"), 
Date.valueOf("2019-08-18"), Time.valueOf("19:01:00"),
+                       new BigDecimal(12345678.0002)));
+               puts.add(putRow(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3",
+                       Timestamp.valueOf("2019-08-18 19:02:00"), 
Date.valueOf("2019-08-18"), Time.valueOf("19:02:00"),
+                       new BigDecimal(12345678.0003)));
+               puts.add(putRow(4, 40, null, 400L, 4.04, true, "Welt-4",
+                       Timestamp.valueOf("2019-08-18 19:03:00"), 
Date.valueOf("2019-08-18"), Time.valueOf("19:03:00"),
+                       new BigDecimal(12345678.0004)));
+               puts.add(putRow(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5",
+                       Timestamp.valueOf("2019-08-19 19:10:00"), 
Date.valueOf("2019-08-19"), Time.valueOf("19:10:00"),
+                       new BigDecimal(12345678.0005)));
+               puts.add(putRow(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6",
+                       Timestamp.valueOf("2019-08-19 19:20:00"), 
Date.valueOf("2019-08-19"), Time.valueOf("19:20:00"),
+                       new BigDecimal(12345678.0006)));
+               puts.add(putRow(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7",
+                       Timestamp.valueOf("2019-08-19 19:30:00"), 
Date.valueOf("2019-08-19"), Time.valueOf("19:30:00"),
+                       new BigDecimal(12345678.0007)));
+               puts.add(putRow(8, 80, null, 800L, 8.08, true, "Welt-8",
+                       Timestamp.valueOf("2019-08-19 19:40:00"), 
Date.valueOf("2019-08-19"), Time.valueOf("19:40:00"),
+                       new BigDecimal(12345678.0008)));
 
                // append rows to table
                table.put(puts);
@@ -139,7 +169,18 @@ public abstract class HBaseTestBase extends 
HBaseTestingClusterAutoStarter {
                createTable(tableName, families, SPLIT_KEYS);
        }
 
-       private static Put putRow(int rowKey, int f1c1, String f2c1, long f2c2, 
double f3c1, boolean f3c2, String f3c3) {
+       private static Put putRow(
+                       int rowKey,
+                       int f1c1,
+                       String f2c1,
+                       long f2c2,
+                       double f3c1,
+                       boolean f3c2,
+                       String f3c3,
+                       Timestamp f4c1,
+                       Date f4c2,
+                       Time f4c3,
+                       BigDecimal f4c4) {
                Put put = new Put(Bytes.toBytes(rowKey));
                // family 1
                put.addColumn(Bytes.toBytes(FAMILY1), Bytes.toBytes(F1COL1), 
Bytes.toBytes(f1c1));
@@ -153,6 +194,11 @@ public abstract class HBaseTestBase extends 
HBaseTestingClusterAutoStarter {
                put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL2), 
Bytes.toBytes(f3c2));
                put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL3), 
Bytes.toBytes(f3c3));
 
+               // family 4
+               put.addColumn(Bytes.toBytes(FAMILY4), Bytes.toBytes(F4COL1), 
Bytes.toBytes(timestampToInternal(f4c1)));
+               put.addColumn(Bytes.toBytes(FAMILY4), Bytes.toBytes(F4COL2), 
Bytes.toBytes(dateToInternal(f4c2)));
+               put.addColumn(Bytes.toBytes(FAMILY4), Bytes.toBytes(F4COL3), 
Bytes.toBytes(timeToInternal(f4c3)));
+               put.addColumn(Bytes.toBytes(FAMILY4), Bytes.toBytes(F4COL4), 
Bytes.toBytes(f4c4));
                return put;
        }
 }

Reply via email to