Flink consumes messages from Kafka and sinks them to both HBase and Hive.
When I send 42 records to Kafka and then stop the producer, Hive returns exactly 42 records, but HBase only returns 30.
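(For context: the kafka_table that the queries below read from is not shown; it is assumed to be registered roughly as follows. Only the field names are taken from the queries — the types, topic, and connector options here are guesses.)

streamTableEnv.executeSql(
      """
        |CREATE TABLE kafka_table (
        |    uid BIGINT,
        |    sex VARCHAR,
        |    age INT,
        |    created_time TIMESTAMP(3)
        |) WITH (
        |    'connector' = 'kafka',
        |    'topic' = 'user_topic',                        -- assumed topic name
        |    'properties.bootstrap.servers' = 'cdh1:9092',  -- assumed broker list
        |    'scan.startup.mode' = 'earliest-offset',
        |    'format' = 'json'
        |)
        |""".stripMargin)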


query:
streamTableEnv.executeSql(
      """
        |
        |CREATE TABLE hbase_table (
        |    rowkey VARCHAR,
        |    cf ROW(sex VARCHAR, age INT, created_time VARCHAR)
        |) WITH (
        |    'connector.type' = 'hbase',
        |    'connector.version' = '2.1.0',
        |    'connector.table-name' = 'ods:user_hbase6',
        |    'connector.zookeeper.quorum' = 'cdh1:2181,cdh2:2181,cdh3:2181',
        |    'connector.zookeeper.znode.parent' = '/hbase',
        |    'connector.write.buffer-flush.max-size' = '1mb',
        |    'connector.write.buffer-flush.max-rows' = '1',
        |    'connector.write.buffer-flush.interval' = '0s'
        |)
        |""".stripMargin)

    val statementSet = streamTableEnv.createStatementSet()
    val insertHbase =
      """
        |insert into hbase_table
        |SELECT
        |   CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid,
        |   ROW(sex, age, created_time) as cf
        |FROM (select uid, sex, age, cast(created_time as VARCHAR) as created_time from kafka_table)
        |
        |""".stripMargin

    statementSet.addInsertSql(insertHbase)
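(One thing worth ruling out independently of any flush settings: HBase upserts by rowkey, so records that map to the same rowkey collapse into a single row, which would also show up as "fewer rows in HBase than in Hive". A quick sanity check — a sketch reusing the key expression above — of how many distinct rowkeys the 42 records actually produce:)

    val checkDistinctKeys =
      """
        |SELECT COUNT(DISTINCT CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), cast(CEILING(UNIX_TIMESTAMP(cast(created_time as VARCHAR))/60) as string), sex)) AS distinct_keys
        |FROM kafka_table
        |""".stripMargin
    // if this settles at 30 instead of 42, the rows were overwritten in HBase, not lost
    streamTableEnv.executeSql(checkDistinctKeys).print()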

    val insertHive =
      """
        |
        |INSERT INTO odsCatalog.ods.hive_table
        |SELECT uid, age, DATE_FORMAT(created_time, 'yyyy-MM-dd'), DATE_FORMAT(created_time, 'HH')
        |FROM kafka_table
        |
        |""".stripMargin
    statementSet.addInsertSql(insertHive)
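(odsCatalog is likewise not shown; presumably a HiveCatalog registered earlier in the job along these lines — the catalog name matches the insert above, but the default database and conf dir are guesses:)

    import org.apache.flink.table.catalog.hive.HiveCatalog

    // assumed registration of the Hive catalog referenced by the insert above
    val odsCatalog = new HiveCatalog("odsCatalog", "ods", "/etc/hive/conf")
    streamTableEnv.registerCatalog("odsCatalog", odsCatalog)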


    statementSet.execute()


Is this caused by the setting 'connector.write.buffer-flush.max-size' = '1mb'? I tried setting it to '0', '10b', and '1kb', and every attempt failed with errors like:
Property 'connector.write.buffer-flush.max-size' must be a memory size (in bytes) value but was: 1kb
Property 'connector.write.buffer-flush.max-size' must be a memory size (in bytes) value but was: 10b
Property 'connector.write.buffer-flush.max-size' must be a memory size (in bytes) value but was: 1
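(For what it's worth, all of those spellings are valid for Flink's MemorySize parser itself — a quick check, a sketch assuming flink-core on the classpath — so the rejection evidently comes from the legacy descriptor validation layered on top, which seems to accept only whole megabytes such as the '1mb' that worked above:)

    import org.apache.flink.configuration.MemorySize

    // these all parse, so the size syntax itself is not what gets rejected
    println(MemorySize.parse("10b").getBytes)  // 10
    println(MemorySize.parse("1kb").getBytes)  // 1024
    println(MemorySize.parse("1mb").getBytes)  // 1048576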


Also, setting the options according to the official documentation,
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/hbase.html

does not work either; they are not recognized and it fails with:
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'hbase-2.1.0' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.
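(The only connector identifier that 1.11 page documents is 'hbase-1.4', so 'hbase-2.1.0' has no factory to match. A sketch of the documented new-style DDL, reusing the table and options from above — whether the bundled HBase 1.4 client is compatible with an HBase 2.1 server is a separate question:)

streamTableEnv.executeSql(
      """
        |CREATE TABLE hbase_table (
        |    rowkey VARCHAR,
        |    cf ROW(sex VARCHAR, age INT, created_time VARCHAR),
        |    PRIMARY KEY (rowkey) NOT ENFORCED
        |) WITH (
        |    'connector' = 'hbase-1.4',
        |    'table-name' = 'ods:user_hbase6',
        |    'zookeeper.quorum' = 'cdh1:2181,cdh2:2181,cdh3:2181',
        |    'zookeeper.znode.parent' = '/hbase',
        |    'sink.buffer-flush.max-rows' = '1',
        |    'sink.buffer-flush.interval' = '1s'
        |)
        |""".stripMargin)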


I took a look at the source code of org.apache.flink.table.descriptors.HBaseValidator:

public static final String CONNECTOR_TYPE_VALUE_HBASE = "hbase";
public static final String CONNECTOR_VERSION_VALUE_143 = "2.1.0";
public static final String CONNECTOR_TABLE_NAME = "connector.table-name";
public static final String CONNECTOR_ZK_QUORUM = "connector.zookeeper.quorum";
public static final String CONNECTOR_ZK_NODE_PARENT = "connector.zookeeper.znode.parent";
public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE = "connector.write.buffer-flush.max-size";
public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS = "connector.write.buffer-flush.max-rows";
public static final String CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL = "connector.write.buffer-flush.interval";

The option keys there are still the legacy ones.
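(That class belongs to the legacy connector.type-based stack. The new-style identifiers such as 'hbase-1.4' are discovered via SPI instead; a quick way to see which ones are actually on the classpath — a sketch:)

    import java.util.ServiceLoader
    import org.apache.flink.table.factories.Factory
    import scala.collection.JavaConverters._

    // prints every new-style factory identifier visible via SPI; if no hbase
    // entry shows up, only the legacy HBase connector jar is on the classpath
    ServiceLoader.load(classOf[Factory]).asScala
      .foreach(f => println(f.factoryIdentifier()))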
