Hi, Zhou

>       'connector.write.buffer-flush.max-size' = '1mb',
>       'connector.write.buffer-flush.interval' = '0s'

(1) The 'connector.write.buffer-flush.max-size' option only supports 'mb' as its unit; other units are rejected, which is why you see that error. It tunes the buffering done by HBase's BufferedMutator and sets how much data the buffer may hold before a write is triggered. The 'connector.write.buffer-flush.interval' option flushes the buffer on a timer instead; the two options are meant to be combined as needed. When 'connector.write.buffer-flush.interval' is set to 0s, there is no periodic flush, so a write only happens once the buffer reaches 'connector.write.buffer-flush.max-size'. If you set 'connector.write.buffer-flush.interval' to 1s, you should see the data.
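
For example, keeping the rest of your DDL unchanged, the relevant options could look like this (an untested sketch; note that per (2) below, 'connector.version' must also be '1.4.3' rather than '2.1.0'):

      'connector.version' = '1.4.3',
      'connector.write.buffer-flush.max-size' = '1mb',
      'connector.write.buffer-flush.max-rows' = '1',
      'connector.write.buffer-flush.interval' = '1s'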

(2) HBase connector versions before 1.11.0 only support HBase 1.4.3, so specifying 2.1.0 fails. Starting with 1.11.0 the supported version is 1.4.x, and the new connector takes 'connector' = 'hbase-1.4', since the HBase 1.4.x APIs are compatible with one another. The community is also discussing support for HBase 2.x [1].
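
With the new connector in 1.11.0 the DDL would look roughly like the following (a sketch based on the 1.11 documentation; the sink.* keys replace the old connector.write.* ones, and the rowkey is declared as the primary key):

CREATE TABLE hbase_table (
  rowkey STRING,
  cf ROW<sex STRING, age INT, created_time STRING>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'ods:user_hbase6',
  'zookeeper.quorum' = 'cdh1:2181,cdh2:2181,cdh3:2181',
  'zookeeper.znode.parent' = '/hbase',
  'sink.buffer-flush.max-size' = '1mb',
  'sink.buffer-flush.interval' = '1s'
)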


Best,
Leonard Xu
[1] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Upgrade-HBase-connector-to-2-2-x-tc42657.html#a42674


> On Jul 13, 2020, at 21:09, Zhou Zach <wander...@163.com> wrote:
> 
> 
> 
> Flink subscribes to Kafka and sinks the messages to both HBase and Hive.
> After sending 42 records to Kafka and then stopping the producer, Hive returns exactly 42 rows, but HBase only shows 30.
> 
> 
> query:
> streamTableEnv.executeSql(
>      """
>        |
>        |CREATE TABLE hbase_table (
>        |    rowkey VARCHAR,
>        |    cf ROW(sex VARCHAR, age INT, created_time VARCHAR)
>        |) WITH (
>        |    'connector.type' = 'hbase',
>        |    'connector.version' = '2.1.0',
>        |    'connector.table-name' = 'ods:user_hbase6',
>        |    'connector.zookeeper.quorum' = 'cdh1:2181,cdh2:2181,cdh3:2181',
>        |    'connector.zookeeper.znode.parent' = '/hbase',
>        |    'connector.write.buffer-flush.max-size' = '1mb',
>        |    'connector.write.buffer-flush.max-rows' = '1',
>        |    'connector.write.buffer-flush.interval' = '0s'
>        |)
>        |""".stripMargin)
> 
>    val statementSet = streamTableEnv.createStatementSet()
>    val insertHbase =
>      """
>        |insert into hbase_table
>        |SELECT
>        |   CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid,
>        |   ROW(sex, age, created_time ) as cf
>        |FROM  (select uid,sex,age, cast(created_time as VARCHAR) as created_time from kafka_table)
>        |
>        |""".stripMargin
> 
>    statementSet.addInsertSql(insertHbase)
> 
>    val insertHive =
>      """
>        |
>        |INSERT INTO odsCatalog.ods.hive_table
>        |SELECT uid, age, DATE_FORMAT(created_time, 'yyyy-MM-dd'), DATE_FORMAT(created_time, 'HH')
>        |FROM kafka_table
>        |
>        |""".stripMargin
>    statementSet.addInsertSql(insertHive)
> 
> 
>    statementSet.execute()
> 
> 
> Is it because of the parameter 'connector.write.buffer-flush.max-size' = '1mb'? I tried setting '0', '10b', and '1kb', and they all failed with errors like:
> Property 'connector.write.buffer-flush.max-size' must be a memory size (in bytes) value but was: 1kb
> Property 'connector.write.buffer-flush.max-size' must be a memory size (in bytes) value but was: 10b
> Property 'connector.write.buffer-flush.max-size' must be a memory size (in bytes) value but was: 1
> 
> 
> Also, following the official documentation
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/hbase.html
> 
> 
> setting the parameters as documented there is not recognized either; it fails with:
> Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'hbase-2.1.0' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.
> 
> 
> I took a look at the source code of
> org.apache.flink.table.descriptors.HBaseValidator:
> public static final String CONNECTOR_TYPE_VALUE_HBASE = "hbase";
> public static final String CONNECTOR_VERSION_VALUE_143 = "2.1.0";
> public static final String CONNECTOR_TABLE_NAME = "connector.table-name";
> public static final String CONNECTOR_ZK_QUORUM = "connector.zookeeper.quorum";
> public static final String CONNECTOR_ZK_NODE_PARENT = "connector.zookeeper.znode.parent";
> public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE = "connector.write.buffer-flush.max-size";
> public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS = "connector.write.buffer-flush.max-rows";
> public static final String CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL = "connector.write.buffer-flush.interval";
> The parameters are still the old ones.
