flink checkpoint 延迟的性能问题讨论
各位大佬, 背景: 实际测试flink读Kafka 数据写入hudi, checkpoint的间隔时间是1min, state.backend分别为filesystem,测试结果如下: 写hudi的checkpoint 的延迟 写iceberg得延迟: 疑问: hudi的checkpoint的文件数据比iceberg要大很多,如何降低flink写hudi的checkpoint的延迟? | | 博星 | | 15868861...@163.com |
回复: 使用hbase连接器插入数据,一个列族下有多列时如何只更新其中一列
参考这个案例试试: CREATE TEMPORARY TABLE datagen_source ( a INT, b BIGINT, c STRING, `proc_time` AS PROCTIME() ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE hbase_dim ( rowkey INT, family1 ROW, family2 ROW, family3 ROW ) WITH ( 'connector'='cloudhbase', 'table-name'='', 'zookeeper.quorum'='' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, f1c1 INT, f3c3 STRING ) WITH ( 'connector'='blackhole' ); INSERTINTO blackhole_sink SELECT a, family1.col1 as f1c1, family3.col3 as f3c3 FROM datagen_source JOIN hbase_dim FORSYSTEM_TIMEASOF datagen_source.`proc_time` as h ON datagen_source.a = h.rowkey; | | 博星 | | 15868861...@163.com | 回复的原邮件 | 发件人 | xiaohui zhang | | 发送日期 | 2024年06月20日 10:03 | | 收件人 | | | 主题 | Re: 使用hbase连接器插入数据,一个列族下有多列时如何只更新其中一列 | flink在写入时需要所有DDL中定义的字段都必须被同时写入,不支持sql中只使用部分字段。 如果你确定只需写入部分数据,在DDL中只定义你用到的部分 zboyu0104 于2024年6月14日周五 15:43写道: 怎么退订 from 阿里邮箱 iPhone-- 发件人:谢县东 日 期:2024年06月06日 16:07:05 收件人: 主 题:使用hbase连接器插入数据,一个列族下有多列时如何只更新其中一列 各位好: flink版本: 1.13.6 我在使用 flink-connector-hbase 连接器,通过flinkSQL 将数据写入hbase,hbase 建表如下: CREATE TABLE hbase_test_db_test_table_xxd ( rowkey STRING, cf1 ROW, PRIMARY KEY (rowkey) NOT ENFORCED ) WITH ( 'connector' = 'hbase-2.2', 'table-name' = 'test_db:test_table_t1', 'zookeeper.quorum' = 'xxx:2181', 'zookeeper.znode.parent' = '/hbase', 'null-string-literal' = '', 'sink.parallelism' = '2' ); hbase cf1列族下有三列,看官网示例插入数据时需要构建一个row类型插入(row类型需包含列族下的所有列) INSERT INTO hbase_test_db_test_table_xxd select '002' as rowkey, row('xxd_2', 'boy', '10') as cf1; 如果只想更新其中某一列如何实现?在flink中新建一个hbase表吗?
回复:cdc读取oracle数据如何解析
检查一下oracle中的DEBEZIUM.PRODUCTS这个表的数据,看你解析中有字符串 | | 博星 | | 15868861...@163.com | 回复的原邮件 | 发件人 | ha.fen...@aisino.com | | 发送日期 | 2024年06月25日 15:54 | | 收件人 | user-zh | | 主题 | cdc读取oracle数据如何解析 | 根据文档的代码 JdbcIncrementalSource oracleChangeEventSource = new OracleSourceBuilder() .hostname("host") .port(1521) .databaseList("ORCLCDB") .schemaList("DEBEZIUM") .tableList("DEBEZIUM.PRODUCTS") .username("username") .password("password") .deserializer(new JsonDebeziumDeserializationSchema()) .includeSchemaChanges(true) // output the schema changes as well .startupOptions(StartupOptions.initial()) .debeziumProperties(debeziumProperties) .splitSize(2) .build(); 返回的结果: {"before":null,"after":{"ID":{"scale":0,"value":"AQ=="},"NAME":"aa","ADDTIME":1719325293000,"PRICE":"BMY="},"source":{"version":"1.9.8.Final","connector":"oracle","name":"oracle_logminer","ts_ms":0,"snapshot":"false","db":"ORCL","sequence":null,"schema":"CDC","table":"ORDER_CDC","txId":null,"scn":"0","commit_scn":null,"lcr_position":null,"rs_id":null,"ssn":0,"redo_thread":null},"op":"r","ts_ms":1719301769186,"transaction":null} 如何解析?PRICE应该是数字,ID也是数字,这里显示的都不同
回复: cdc读取oracle数据如何解析
你好,可以把ID和PRICE的类型改为NUMBER试一下,我这边flink-sql试过number类型对应到iceberg的decimal数据是正常的 | | 博星 | | 15868861...@163.com | 回复的原邮件 | 发件人 | Yanquan Lv | | 发送日期 | 2024年06月26日 14:46 | | 收件人 | | | 主题 | Re: 回复:cdc读取oracle数据如何解析 | 你好,你的 ID 和 PRINCE 字段类型是 decimal 吗,decimal 默认的展示方式是使用 BASE64 处理 可以通过添加下面代码来让展示信息更直观。 Map customConverterConfigs = new HashMap<>(); customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric"); JsonDebeziumDeserializationSchema schema = new JsonDebeziumDeserializationSchema(includeSchema, customConverterConfigs); ha.fen...@aisino.com 于2024年6月25日周二 17:26写道: 数据没问题 "ID" "NAME" "ADDTIME""PRICE" 1 "aa" 2024-6-25 14:21:33 12.22 发件人: 15868861416 发送时间: 2024-06-25 17:19 收件人: user-zh@flink.apache.org 主题: 回复:cdc读取oracle数据如何解析 检查一下oracle中的DEBEZIUM.PRODUCTS这个表的数据,看你解析中有字符串 | | 博星 | | 15868861...@163.com | 回复的原邮件 | 发件人 | ha.fen...@aisino.com | | 发送日期 | 2024年06月25日 15:54 | | 收件人 | user-zh | | 主题 | cdc读取oracle数据如何解析 | 根据文档的代码 JdbcIncrementalSource oracleChangeEventSource = new OracleSourceBuilder() .hostname("host") .port(1521) .databaseList("ORCLCDB") .schemaList("DEBEZIUM") .tableList("DEBEZIUM.PRODUCTS") .username("username") .password("password") .deserializer(new JsonDebeziumDeserializationSchema()) .includeSchemaChanges(true) // output the schema changes as well .startupOptions(StartupOptions.initial()) .debeziumProperties(debeziumProperties) .splitSize(2) .build(); 返回的结果: {"before":null,"after":{"ID":{"scale":0,"value":"AQ=="},"NAME":"aa","ADDTIME":1719325293000,"PRICE":"BMY="},"source":{"version":"1.9.8.Final","connector":"oracle","name":"oracle_logminer","ts_ms":0,"snapshot":"false","db":"ORCL","sequence":null,"schema":"CDC","table":"ORDER_CDC","txId":null,"scn":"0","commit_scn":null,"lcr_position":null,"rs_id":null,"ssn":0,"redo_thread":null},"op":"r","ts_ms":1719301769186,"transaction":null} 如何解析?PRICE应该是数字,ID也是数字,这里显示的都不同