回复: cdc读取oracle数据如何解析

2024-06-26 Thread 15868861416
你好,可以把ID和PRICE的类型改为NUMBER试一下,我这边flink-sql试过number类型对应到iceberg的decimal数据是正常的


| |
博星
|
|
15868861...@163.com
|


 回复的原邮件 
| 发件人 | Yanquan Lv |
| 发送日期 | 2024年06月26日 14:46 |
| 收件人 |  |
| 主题 | Re: 回复:cdc读取oracle数据如何解析 |
你好,你的 ID 和 PRINCE 字段类型是 decimal 吗,decimal 默认的展示方式是使用 BASE64 处理

可以通过添加下面代码来让展示信息更直观。

Map customConverterConfigs = new HashMap<>();
customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
"numeric");
JsonDebeziumDeserializationSchema schema = new
JsonDebeziumDeserializationSchema(includeSchema,
customConverterConfigs);




ha.fen...@aisino.com  于2024年6月25日周二 17:26写道:

数据没问题
"ID" "NAME"   "ADDTIME""PRICE"
1 "aa" 2024-6-25 14:21:33  12.22

发件人: 15868861416
发送时间: 2024-06-25 17:19
收件人: user-zh@flink.apache.org
主题: 回复:cdc读取oracle数据如何解析
检查一下oracle中的DEBEZIUM.PRODUCTS这个表的数据,看你解析中有字符串




| |
博星
|
|
15868861...@163.com
|


 回复的原邮件 
| 发件人 | ha.fen...@aisino.com |
| 发送日期 | 2024年06月25日 15:54 |
| 收件人 | user-zh |
| 主题 | cdc读取oracle数据如何解析 |
根据文档的代码
JdbcIncrementalSource oracleChangeEventSource =
new OracleSourceBuilder()
.hostname("host")
.port(1521)
.databaseList("ORCLCDB")
.schemaList("DEBEZIUM")
.tableList("DEBEZIUM.PRODUCTS")
.username("username")
.password("password")
.deserializer(new JsonDebeziumDeserializationSchema())
.includeSchemaChanges(true) // output the schema changes as well
.startupOptions(StartupOptions.initial())
.debeziumProperties(debeziumProperties)
.splitSize(2)
.build();
返回的结果:

{"before":null,"after":{"ID":{"scale":0,"value":"AQ=="},"NAME":"aa","ADDTIME":1719325293000,"PRICE":"BMY="},"source":{"version":"1.9.8.Final","connector":"oracle","name":"oracle_logminer","ts_ms":0,"snapshot":"false","db":"ORCL","sequence":null,"schema":"CDC","table":"ORDER_CDC","txId":null,"scn":"0","commit_scn":null,"lcr_position":null,"rs_id":null,"ssn":0,"redo_thread":null},"op":"r","ts_ms":1719301769186,"transaction":null}

如何解析?PRICE应该是数字,ID也是数字,这里显示的都不同



回复:cdc读取oracle数据如何解析

2024-06-25 Thread 15868861416
检查一下oracle中的DEBEZIUM.PRODUCTS这个表的数据,看你解析中有字符串




| |
博星
|
|
15868861...@163.com
|


 回复的原邮件 
| 发件人 | ha.fen...@aisino.com |
| 发送日期 | 2024年06月25日 15:54 |
| 收件人 | user-zh |
| 主题 | cdc读取oracle数据如何解析 |
根据文档的代码
JdbcIncrementalSource oracleChangeEventSource =
new OracleSourceBuilder()
.hostname("host")
.port(1521)
.databaseList("ORCLCDB")
.schemaList("DEBEZIUM")
.tableList("DEBEZIUM.PRODUCTS")
.username("username")
.password("password")
.deserializer(new JsonDebeziumDeserializationSchema())
.includeSchemaChanges(true) // output the schema changes as well
.startupOptions(StartupOptions.initial())
.debeziumProperties(debeziumProperties)
.splitSize(2)
.build();
返回的结果:
{"before":null,"after":{"ID":{"scale":0,"value":"AQ=="},"NAME":"aa","ADDTIME":1719325293000,"PRICE":"BMY="},"source":{"version":"1.9.8.Final","connector":"oracle","name":"oracle_logminer","ts_ms":0,"snapshot":"false","db":"ORCL","sequence":null,"schema":"CDC","table":"ORDER_CDC","txId":null,"scn":"0","commit_scn":null,"lcr_position":null,"rs_id":null,"ssn":0,"redo_thread":null},"op":"r","ts_ms":1719301769186,"transaction":null}

如何解析?PRICE应该是数字,ID也是数字,这里显示的都不同


回复: 使用hbase连接器插入数据,一个列族下有多列时如何只更新其中一列

2024-06-20 Thread 15868861416
参考这个案例试试:
CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  `proc_time` AS PROCTIME()
) WITH (
  'connector'='datagen'
);

CREATE TEMPORARY TABLE hbase_dim (
  rowkey INT,
  family1 ROW,
  family2 ROW,
  family3 ROW
) WITH (
  'connector'='cloudhbase',
  'table-name'='',
  'zookeeper.quorum'=''
);

CREATE TEMPORARY TABLE blackhole_sink(
  a INT,
  f1c1 INT,
  f3c3 STRING
) WITH (
  'connector'='blackhole'
);

INSERTINTO blackhole_sink
 SELECT a, family1.col1 as f1c1,  family3.col3 as f3c3 FROM datagen_source
JOIN hbase_dim FORSYSTEM_TIMEASOF datagen_source.`proc_time` as h ON 
datagen_source.a = h.rowkey;


| |
博星
|
|
15868861...@163.com
|


 回复的原邮件 
| 发件人 | xiaohui zhang |
| 发送日期 | 2024年06月20日 10:03 |
| 收件人 |  |
| 主题 | Re: 使用hbase连接器插入数据,一个列族下有多列时如何只更新其中一列 |
flink在写入时需要所有DDL中定义的字段都必须被同时写入,不支持sql中只使用部分字段。
如果你确定只需写入部分数据,在DDL中只定义你用到的部分


zboyu0104  于2024年6月14日周五 15:43写道:

怎么退订
from 阿里邮箱
iPhone--
发件人:谢县东
日 期:2024年06月06日 16:07:05
收件人:
主 题:使用hbase连接器插入数据,一个列族下有多列时如何只更新其中一列

各位好:


flink版本: 1.13.6
我在使用 flink-connector-hbase 连接器,通过flinkSQL 将数据写入hbase,hbase 建表如下:


CREATE TABLE hbase_test_db_test_table_xxd (
rowkey STRING,
cf1 ROW,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-2.2',
'table-name' = 'test_db:test_table_t1',
'zookeeper.quorum' = 'xxx:2181',
'zookeeper.znode.parent' = '/hbase',
'null-string-literal' = '',
'sink.parallelism' = '2'
);


hbase cf1列族下有三列,看官网示例插入数据时需要构建一个row类型插入(row类型需包含列族下的所有列)
INSERT INTO hbase_test_db_test_table_xxd  select '002' as rowkey,
row('xxd_2', 'boy', '10') as cf1;




如果只想更新其中某一列如何实现?在flink中新建一个hbase表吗?











flink checkpoint 延迟的性能问题讨论

2024-06-16 Thread 15868861416
各位大佬,
背景:
实际测试flink读Kafka 数据写入hudi, checkpoint的间隔时间是1min, 
state.backend分别为filesystem,测试结果如下:



写hudi的checkpoint 的延迟





写iceberg得延迟:



疑问: hudi的checkpoint的文件数据比iceberg要大很多,如何降低flink写hudi的checkpoint的延迟?


| |
博星
|
|
15868861...@163.com
|