退订
退订
退订
streaming.api.operators和streaming.runtime.operators的区别是啥?
如题,想知道这个分类的标准是啥呢?
Re: CheckpointedFunction 与 KeyedState
Hi, 在 initializeState 里初始化 State 是 OK 的,但是尽量不要在 initializeState 和 snapshotState 里访问 KeyedState,最好是在实际的 Function 比如这里的 FlatMap 里; 原因是 KeyedState 的访问是和 Current Key 绑定的,实际的 Function 在 Process 之前会隐式的 set Current key ,因此是会保证每次 KeyedState 的操作是对确定的 KV 进行的; 而 initializeState 和 snapshotState 里是没有框架隐性 set 的,相当于会对某一个不确定的 key 去update value了,如果一定要在这里做就需要拿到 KeyContext 自己去 set,不过不建议这么做; On Fri, May 5, 2023 at 10:58 PM sjf0115 wrote: > CheckpointedFunction 接口的 initializeState 方法提供了访问 > FunctionInitializationContext 的能力,而 FunctionInitializationContext 不仅提供了访问 > OperatorStateStore 的能力,也提供了 KeyedStateStore 的能力。一般常见的是通过 > CheckpointedFunction 来实现操作 OperatorState,但是也可以通过如下代码来获取 KeyedState: > ```java > context.getKeyedStateStore().getState(stateDescriptor); > ``` > 想问一下通过 CheckpointedFunction 来实现操作 KeyedState,如下代码所示。建议这样实现吗?会有什么问题吗? > ```java > public static class TemperatureAlertFlatMapFunction extends > RichFlatMapFunction, Tuple3> > implements CheckpointedFunction { > // 温度差报警阈值 > private double threshold; > // 上一次温度 > private ValueState lastTemperatureState; > private Double lastTemperature; > public TemperatureAlertFlatMapFunction(double threshold) { > this.threshold = threshold; > } > > > @Override > public void flatMap(Tuple2 sensor, > Collector> out) throws Exception { > String sensorId = sensor.f0; > // 当前温度 > double temperature = sensor.f1; > // 保存当前温度 > lastTemperature = temperature; > // 是否是第一次上报的温度 > if (Objects.equals(lastTemperature, null)) { > return; > } > double diff = Math.abs(temperature - lastTemperature); > if (diff > threshold) { > // 温度变化超过阈值则输出 > out.collect(Tuple3.of(sensorId, temperature, diff)); > } > } > > > @Override > public void snapshotState(FunctionSnapshotContext context) throws > Exception { > // 获取最新的温度之后更新保存上一次温度的状态 > if (!Objects.equals(lastTemperature, null)) { > lastTemperatureState.update(lastTemperature); > } > } > > > @Override > public void initializeState(FunctionInitializationContext context) > throws Exception { > ValueStateDescriptor stateDescriptor = new > ValueStateDescriptor<>("lastTemperature", Double.class); > lastTemperatureState = > context.getKeyedStateStore().getState(stateDescriptor); > if (context.isRestored()) { > lastTemperature = lastTemperatureState.value(); > } > } > } > ``` > > -- Best, Hangxiang.
CheckpointedFunction 与 KeyedState
CheckpointedFunction 接口的 initializeState 方法提供了访问 FunctionInitializationContext 的能力,而 FunctionInitializationContext 不仅提供了访问 OperatorStateStore 的能力,也提供了 KeyedStateStore 的能力。一般常见的是通过 CheckpointedFunction 来实现操作 OperatorState,但是也可以通过如下代码来获取 KeyedState: ```java context.getKeyedStateStore().getState(stateDescriptor); ``` 想问一下通过 CheckpointedFunction 来实现操作 KeyedState,如下代码所示。建议这样实现吗?会有什么问题吗? ```java public static class TemperatureAlertFlatMapFunction extends RichFlatMapFunction, Tuple3> implements CheckpointedFunction { // 温度差报警阈值 private double threshold; // 上一次温度 private ValueState lastTemperatureState; private Double lastTemperature; public TemperatureAlertFlatMapFunction(double threshold) { this.threshold = threshold; } @Override public void flatMap(Tuple2 sensor, Collector> out) throws Exception { String sensorId = sensor.f0; // 当前温度 double temperature = sensor.f1; // 保存当前温度 lastTemperature = temperature; // 是否是第一次上报的温度 if (Objects.equals(lastTemperature, null)) { return; } double diff = Math.abs(temperature - lastTemperature); if (diff > threshold) { // 温度变化超过阈值则输出 out.collect(Tuple3.of(sensorId, temperature, diff)); } } @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { // 获取最新的温度之后更新保存上一次温度的状态 if (!Objects.equals(lastTemperature, null)) { lastTemperatureState.update(lastTemperature); } } @Override public void initializeState(FunctionInitializationContext context) throws Exception { ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<>("lastTemperature", Double.class); lastTemperatureState = context.getKeyedStateStore().getState(stateDescriptor); if (context.isRestored()) { lastTemperature = lastTemperatureState.value(); } } } ```
退订
退订
退订
刘晶 John Liu DBA 藤蔓技术部 Tenvine Technical Department 深圳市南山区高新科技园科技中三路科兴科学园B4-712 邮编 518052 Mobile: 18820970747
Re: flink issue可以登录,但是flink中文邮箱账号密码错误,是出现什么原因了嘛
> > flink issue可以登录 这个是jira账号吗? flink中文邮箱账号密码 什么是flink中文邮箱账号 ?有无登陆页面链接 On Wed, Apr 19, 2023 at 11:36 AM kcz <573693...@qq.com.invalid> wrote: > 请帮忙看看是我哪里出问题了嘛?我的账号是kcz。我想咨询大佬flink avro的问题 > > > > > kcz > 573693...@qq.com > > > >
Re: 退订
Please send email to user-zh-unsubscr...@flink.apache.org if you want to unsubscribe the mail from user-zh-unsubscr...@flink.apache.org , and you can refer[1][2] for more details. 请发送任意内容的邮件到 user-zh-unsubscr...@flink.apache.org 地址来取消订阅来自 user-zh@flink.apache.org 邮件组的邮件,你可以参考[1][2] 管理邮件订阅。 Best Hongshun, [1] https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8 [2]https://flink.apache.org/community.html#mailing-lists On Sun, Apr 23, 2023 at 10:30 PM 朱静 wrote: > 退订
Re: 退订
Please send email to user-zh-unsubscr...@flink.apache.org if you want to unsubscribe the mail from user-zh-unsubscr...@flink.apache.org , and you can refer[1][2] for more details. 请发送任意内容的邮件到 user-zh-unsubscr...@flink.apache.org 地址来取消订阅来自 user-zh@flink.apache.org 邮件组的邮件,你可以参考[1][2] 管理邮件订阅。 Best Hongshun, [1] https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8 [2]https://flink.apache.org/community.html#mailing-lists On Tue, May 2, 2023 at 9:45 PM 胡家发 <15802974...@163.com> wrote: > 退订
Re: 退订
Please send email to user-zh-unsubscr...@flink.apache.org if you want to unsubscribe the mail from user-zh-unsubscr...@flink.apache.org , and you can refer[1][2] for more details. 请发送任意内容的邮件到 user-zh-unsubscr...@flink.apache.org 地址来取消订阅来自 user-zh@flink.apache.org 邮件组的邮件,你可以参考[1][2] 管理邮件订阅。 On Fri, May 5, 2023 at 2:59 PM 李浩 wrote: > >
Re: 退订
如果需要取消订阅 u...@flink.apache.org 和 d...@flink.apache.org 邮件组,请发送任意内容的邮件到 user-unsubscr...@flink.apache.org 和 dev-unsubscr...@flink.apache.org ,参考[1] [1] https://flink.apache.org/zh/community/ On Fri, May 5, 2023 at 3:24 PM wuzhongxiu wrote: > 退订 > > > > | | > go574...@163.com > | > | > 邮箱:go574...@163.com > | > > > > > 回复的原邮件 > | 发件人 | willluzheng | > | 日期 | 2023年05月05日 15:22 | > | 收件人 | user-zh@flink.apache.org | > | 抄送至 | | > | 主题 | 退订 | > 退订
退订
退订 | | go574...@163.com | | 邮箱:go574...@163.com | 回复的原邮件 | 发件人 | willluzheng | | 日期 | 2023年05月05日 15:22 | | 收件人 | user-zh@flink.apache.org | | 抄送至 | | | 主题 | 退订 | 退订
退订
退订