Hi, Zhou

> 'connector.write.buffer-flush.max-size' = '1mb',
> 'connector.write.buffer-flush.interval' = '0s'

(1) The option connector.write.buffer-flush.max-size only accepts values with the unit mb; other units are not supported, which is why you get those errors. It is a buffering parameter for the BufferedMutator and sets how large the buffer may grow before a write is triggered. The flush.interval option sets how often the buffer is flushed on a timer. Use the two together as needed. When connector.write.buffer-flush.interval is 0s, no periodic flush takes place, so data is written only once the buffer reaches connector.write.buffer-flush.max-size. If you set connector.write.buffer-flush.interval to 1s, you should see the data.

(2) Before 1.11.0, the HBase connector only supported HBase 1.4.3, so specifying 2.1.0 produces an error. Starting with 1.11.0, support is for 1.4.x, so the new connector in 1.11.0 takes 'connector' = 'hbase-1.4', since the HBase 1.4.x APIs are compatible. The community is also discussing support for HBase 2.x [1].

Best,
Leonard Xu

[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Upgrade-HBase-connector-to-2-2-x-tc42657.html#a42674

> On Jul 13, 2020, at 21:09, Zhou Zach <wander...@163.com> wrote:
>
> Flink subscribes to Kafka messages and sinks them to both HBase and Hive.
> When I send 42 records to Kafka and then stop the producer, querying Hive
> returns exactly 42 records, but HBase only contains 30.
>
> query:
> streamTableEnv.executeSql(
>   """
>     |
>     |CREATE TABLE hbase_table (
>     |    rowkey VARCHAR,
>     |    cf ROW(sex VARCHAR, age INT, created_time VARCHAR)
>     |) WITH (
>     |    'connector.type' = 'hbase',
>     |    'connector.version' = '2.1.0',
>     |    'connector.table-name' = 'ods:user_hbase6',
>     |    'connector.zookeeper.quorum' = 'cdh1:2181,cdh2:2181,cdh3:2181',
>     |    'connector.zookeeper.znode.parent' = '/hbase',
>     |    'connector.write.buffer-flush.max-size' = '1mb',
>     |    'connector.write.buffer-flush.max-rows' = '1',
>     |    'connector.write.buffer-flush.interval' = '0s'
>     |)
>     |""".stripMargin)
>
> val statementSet = streamTableEnv.createStatementSet()
> val insertHbase =
>   """
>     |insert into hbase_table
>     |SELECT
>     |   CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid,
>     |   ROW(sex, age, created_time ) as cf
>     |FROM (select uid,sex,age, cast(created_time as VARCHAR) as created_time from kafka_table)
>     |
>     |""".stripMargin
>
> statementSet.addInsertSql(insertHbase)
>
> val insertHive =
>   """
>     |
>     |INSERT INTO odsCatalog.ods.hive_table
>     |SELECT uid, age, DATE_FORMAT(created_time, 'yyyy-MM-dd'), DATE_FORMAT(created_time, 'HH')
>     |FROM kafka_table
>     |
>     |""".stripMargin
> statementSet.addInsertSql(insertHive)
>
> statementSet.execute()
>
> Is this because of the parameter 'connector.write.buffer-flush.max-size' = '1mb'?
> I tried setting it to '0', '10b', and '1kb', and all of them failed with the
> following errors:
> Property 'connector.write.buffer-flush.max-size' must be a memory size (in bytes) value but was: 1kb
> Property 'connector.write.buffer-flush.max-size' must be a memory size (in bytes) value but was: 10b
> Property 'connector.write.buffer-flush.max-size' must be a memory size (in bytes) value but was: 1
>
> Also, options set according to the official documentation
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/hbase.html
> are not recognized either. The error is:
> Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'hbase-2.1.0' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.
>
> I looked at the source code:
> org.apache.flink.table.descriptors.HBaseValidator
> public static final String CONNECTOR_TYPE_VALUE_HBASE = "hbase";
> public static final String CONNECTOR_VERSION_VALUE_143 = "2.1.0";
> public static final String CONNECTOR_TABLE_NAME = "connector.table-name";
> public static final String CONNECTOR_ZK_QUORUM = "connector.zookeeper.quorum";
> public static final String CONNECTOR_ZK_NODE_PARENT = "connector.zookeeper.znode.parent";
> public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE = "connector.write.buffer-flush.max-size";
> public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS = "connector.write.buffer-flush.max-rows";
> public static final String CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL = "connector.write.buffer-flush.interval";
> The parameters are still the old ones.
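
P.S. To make the two suggestions in the reply concrete, here is a rough sketch of how the table definition from the quoted message might look after the changes. Variant A keeps the legacy 'connector.*' options and changes only the version (to 1.4.3) and the flush interval (to 1s), as suggested in the reply. Variant B uses the new-style 1.11 options. Only 'connector' = 'hbase-1.4' comes from the reply; the remaining option names ('table-name', 'zookeeper.quorum', 'zookeeper.znode.parent', 'sink.buffer-flush.*'), the ROW<...> syntax and the PRIMARY KEY clause are my understanding of the 1.11 HBase connector documentation linked in the quoted message, so please check them against that page. The object and val names are made up for illustration, and streamTableEnv refers to the table environment in the quoted program.

```scala
// Sketch only: corrected DDL strings for the table in the quoted message.
// The object and val names are hypothetical.
object HBaseDdlSketch {

  // Variant A: legacy 'connector.*' options.
  // Changes relative to the quoted DDL, both taken from the reply:
  //   connector.version               2.1.0 -> 1.4.3
  //   connector.write.buffer-flush.interval  0s -> 1s
  val legacyHbaseDdl: String =
    """
      |CREATE TABLE hbase_table (
      |    rowkey VARCHAR,
      |    cf ROW(sex VARCHAR, age INT, created_time VARCHAR)
      |) WITH (
      |    'connector.type' = 'hbase',
      |    'connector.version' = '1.4.3',
      |    'connector.table-name' = 'ods:user_hbase6',
      |    'connector.zookeeper.quorum' = 'cdh1:2181,cdh2:2181,cdh3:2181',
      |    'connector.zookeeper.znode.parent' = '/hbase',
      |    'connector.write.buffer-flush.max-size' = '1mb',
      |    'connector.write.buffer-flush.max-rows' = '1',
      |    'connector.write.buffer-flush.interval' = '1s'
      |)
      |""".stripMargin

  // Variant B: new-style options for the 1.11 connector.
  // Assumption: option names other than 'connector' follow the 1.11 docs;
  // verify them there before relying on this.
  val newHbaseDdl: String =
    """
      |CREATE TABLE hbase_table (
      |    rowkey STRING,
      |    cf ROW<sex STRING, age INT, created_time STRING>,
      |    PRIMARY KEY (rowkey) NOT ENFORCED
      |) WITH (
      |    'connector' = 'hbase-1.4',
      |    'table-name' = 'ods:user_hbase6',
      |    'zookeeper.quorum' = 'cdh1:2181,cdh2:2181,cdh3:2181',
      |    'zookeeper.znode.parent' = '/hbase',
      |    'sink.buffer-flush.max-size' = '1mb',
      |    'sink.buffer-flush.max-rows' = '1',
      |    'sink.buffer-flush.interval' = '1s'
      |)
      |""".stripMargin

  // Usage, with streamTableEnv from the quoted program (pick one variant):
  //   streamTableEnv.executeSql(HBaseDdlSketch.legacyHbaseDdl)
}
```

Use one variant or the other, not a mix: the 'connector.type' style and the 'connector' style are handled by different factories, which is why 'hbase-2.1.0' was reported as an unknown identifier. Whether the 1.4.x client works against the HBase version on your cluster is a separate question that this sketch does not address.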