flink sql canal json格式侧输出parse error记录问题

2023-05-06 文章 casel.chen
线上使用flink sql消费kafka topic canal json格式数据,发现有一些数据中有的时间字段值为-00-00 
00:00:00无法被解析,于是加了'canal-json.ignore-parse-errors = true' 
参数,作业是能够正常运行了,但同时我们也希望知道哪些数据解析失败以便发给上游业务系统去自查。想问一下除了ignore外,有办法将这些parse 
error数据输出到另外一个kafka topic吗?谢谢!

Re:Re: CheckpointedFunction 与 KeyedState

2023-05-06 文章 sjf0115
谢了
在 2023-05-06 10:36:02,"Hangxiang Yu"  写道:
>Hi, 在 initializeState 里初始化 State 是 OK 的,但是尽量不要在 initializeState 和
>snapshotState 里访问 KeyedState,最好是在实际的 Function 比如这里的 FlatMap 里;
>原因是 KeyedState 的访问是和 Current Key 绑定的,实际的 Function 在 Process 之前会隐式的 set
>Current key ,因此是会保证每次 KeyedState 的操作是对确定的 KV 进行的;
>而 initializeState 和 snapshotState 里是没有框架隐性 set 的,相当于会对某一个不确定的 key 去update
>value了,如果一定要在这里做就需要拿到 KeyContext 自己去 set,不过不建议这么做;
>
>On Fri, May 5, 2023 at 10:58 PM sjf0115  wrote:
>
>> CheckpointedFunction 接口的 initializeState 方法提供了访问
>> FunctionInitializationContext 的能力,而 FunctionInitializationContext 不仅提供了访问
>> OperatorStateStore 的能力,也提供了 KeyedStateStore 的能力。一般常见的是通过
>> CheckpointedFunction 来实现操作 OperatorState,但是也可以通过如下代码来获取 KeyedState:
>> ```java
>> context.getKeyedStateStore().getState(stateDescriptor);
>> ```
>> 想问一下通过 CheckpointedFunction 来实现操作 KeyedState,如下代码所示。建议这样实现吗?会有什么问题吗?
>> ```java
>> public static class TemperatureAlertFlatMapFunction extends
>> RichFlatMapFunction, Tuple3>
>> implements CheckpointedFunction  {
>> // 温度差报警阈值
>> private double threshold;
>> // 上一次温度
>> private ValueState lastTemperatureState;
>> private Double lastTemperature;
>> public TemperatureAlertFlatMapFunction(double threshold) {
>> this.threshold = threshold;
>> }
>>
>>
>> @Override
>> public void flatMap(Tuple2 sensor,
>> Collector> out) throws Exception {
>> String sensorId = sensor.f0;
>> // 当前温度
>> double temperature = sensor.f1;
>> // 保存当前温度
>> lastTemperature = temperature;
>> // 是否是第一次上报的温度
>> if (Objects.equals(lastTemperature, null)) {
>> return;
>> }
>> double diff = Math.abs(temperature - lastTemperature);
>> if (diff > threshold) {
>> // 温度变化超过阈值则输出
>> out.collect(Tuple3.of(sensorId, temperature, diff));
>> }
>> }
>>
>>
>> @Override
>> public void snapshotState(FunctionSnapshotContext context) throws
>> Exception {
>> // 获取最新的温度之后更新保存上一次温度的状态
>> if (!Objects.equals(lastTemperature, null)) {
>> lastTemperatureState.update(lastTemperature);
>> }
>> }
>>
>>
>> @Override
>> public void initializeState(FunctionInitializationContext context)
>> throws Exception {
>> ValueStateDescriptor stateDescriptor = new
>> ValueStateDescriptor<>("lastTemperature", Double.class);
>> lastTemperatureState =
>> context.getKeyedStateStore().getState(stateDescriptor);
>> if (context.isRestored()) {
>> lastTemperature = lastTemperatureState.value();
>> }
>> }
>> }
>> ```
>>
>>
>
>-- 
>Best,
>Hangxiang.


Re: 退订

2023-05-06 文章 Hongshun Wang
Please send email to  user-zh-unsubscr...@flink.apache.org
 if you want to unsubscribe the mail from
user-zh-unsubscr...@flink.apache.org , and you can
refer[1][2] for more details.
请发送任意内容的邮件到  user-zh-unsubscr...@flink.apache.org 
地址来取消订阅来自 user-zh@flink.apache.org 邮件组的邮件,你可以参考[1][2] 管理邮件订阅。

Best Hongshun,

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2]https://flink.apache.org/community.html#mailing-lists

On Fri, Apr 21, 2023 at 10:50 AM 杨光跃  wrote:

>
>
> 退订
> | |
> 杨光跃
> |
> |
> yangguangyuem...@163.com
> |
>
>


Re: streaming.api.operators和streaming.runtime.operators的区别是啥?

2023-05-06 文章 Hongshun Wang
我来谈一下我个人的看法,streaming.api.operators是提供给用户使用的stream api,
用户可以使用和扩展该接口。而streaming.runtime.operators是用户侧不感知,在执行时由flink自动调用的。比如:
Sink用户可以自己设置,如kafkaSink。但是输出时的state处理和事务commit(CommitterOperator)是Flink根据不同类型的Sink自动生成的统一逻辑,用户无需自己设置和实现。

Best
Hongshun

On Sat, May 6, 2023 at 11:57 AM yidan zhao  wrote:

> 如题,想知道这个分类的标准是啥呢?
>