大家好!
最近在开发一个项目时,在使用CDC表和维表表做Temporal Table
JOIN时,发现2个表Join时join字段的类型必须一致,否则提交时提示如下的错误
The main method caused an error: Temporal table join requires an equality
condition on fields of table.
为了解决上述问题,我们做了如下尝试:
1:在join时,对维表要关联的字段使用cast转换,如: JOIN ON CAST(tableA.filedA AS
Flink
SQL任务提交后,从JobManager监控指标中发现kafka的offset有2个指标信息,currentOffsets和committedOffsets,当Kafka无新增数据,程序运行一段时间后,发现指标仪表盘上显示
currentOffsets:2897
committedOffsets:2898
这2个值没有变化(应该是数据已经消费完毕了),现在的疑惑是:怎么这2个offset的值还不一致?committedOffsets表示已经提交和保存state中的offset吗?currentOffsets表示啥含义?烦请指教下,多谢!
感谢您的答复,刚才看到您的答复后,紧急远程连接跑了下,stdout还真的有数据出来了,周一上班时间再好好测试下,万分感谢!
在 2021-02-27 19:08:25,"xg...@126.com" 写道:
>1503,61,15811,1614405166858
>1504,61,15813,1614405333871
>1505,61,15814,1614405544862
>1506,61,15814,1614405673863
>就这几条数据,并行度设置为1
>
>
>
>发件人: yinghua...@163.com
我增加调试日志后,发现执行DDL语句创建hive表时,设置了dialect 为hive,现在报错根据堆栈信息是在执行DML语句insert
into时创建Hive表时提示没有连接器的配置
Table options are: 'is_generic'='false'
'partition.time-extractor.timestamp-pattern'='$dt $hr'
'sink.partition-commit.delay'='0S'
'sink.partition-commit.policy.kind'='metastore,success-file'
1:先前测试过使用stopWithSavepoint时会将之前成功保存的checkpoint数据给删除掉,后来我们查看了下源码,里面描述如下,就是调用该方法时Flink会将程序设置成Finished态的,可能和实际使用场景有出入。
/**
* Stops a program on Flink cluster whose job-manager is configured in this
client's configuration.
* Stopping works only for streaming programs. Be aware, that the program
刚才又操作了一次,我重新截图了放在附件里了,
开始在18:29:29时没有看到chk-8生成,就是在18:29:29时checkpoint没有生成,
然后18:29:34查看时,checkpoint生成了
然后18:29:51查看时,checkpoint还在,此时我停止了那个任务
18:30:11去查看时,checkpoint的chk-8不见了
在 2021-01-14 18:04:27,"tison" 写道:
>没明白你说的最近一次 checkpoint 被删除啥意思,你可以列一下 checkpoint 目录的内容,你觉得应该是啥,结果是啥。
>
Flink代码里Json反序列化里有2个参数应该对你有帮助,你到官网上查询下怎么使用
上述2个配置项的参数名字分别是:
format.fail-on-missing-field
format.ignore-parse-errors
在 2020-12-20 10:48:04,"陈帅" 写道:
>业务上游数据源发出来的数据有可能会有脏数据导致数据无法解析成源表的结构,如kafka json topic映射成源表。
>请问这种情况下flink sql要如何处理? 期望的是将脏数据发到一个专门的topic,是不是要自己写个connector? 标准kafka
感谢您的答复!!
在 2020-12-08 15:57:32,"Leonard Xu" 写道:
>Hi,
>Flink 的元数据存放在catalog中的,也支持多种catalog(embedded,
>HIve,JDBC,自定义catalog),默认Flink使用内置的GenericInMemoryCatalog,这个是in
>memory的catalog,元数据都在这里,生产环境上可以使用HiveCatalog
>
>
>祝好
>Leonard
>[1]
12 matches
Mail list logo