Re: 如何手动读取Savepoints中的内容?

2021-04-06 文章 xingoo
已解决,阅读完善相关源码后,了解到。需要通过BatchEnvironment,把Savepoint当做输入,然后构造序列化解析器和类型等信息,通过DataSet.collect()进行解析,就可以读取到目标数据了。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

如何手动读取Savepoints中的内容?

2021-04-06 文章 xingoo
目前有个需求想要实现Flink SQL的保存点,但是由于在SQL中operator
uid是随机生成的,一旦修改SQL会导致无法读取到Savepoints中的状态信息。

想到一种方式是正常执行Savepoint操作,然后启动的时候手动读取Savepoint中的内容,获取Kafka每个分区的消费offset,再替换到SQL代码中。

目前通过在源码中打印相关日志可以发现,保存点触发时,消费的记录正常读取到9,但是手动读取的时候,却找不到这个而信息。

下面是我手动读取的代码,以下内容是参考Savepoint单元测试中的读取案例:

String savepointPath =
"hdfs://namenode:8020/flink/savepoints/test040603/savepoint-a2cfcd-2ee1c4afcf9f";
CompletedCheckpointStorageLocation location =
AbstractFsCheckpointStorageAccess
.resolveCheckpointPointer(savepointPath);

try (DataInputStream stream = new
DataInputStream(location.getMetadataHandle().openInputStream())) {
CheckpointMetadata metadata = Checkpoints
.loadCheckpointMetadata(stream,
Thread.currentThread().getContextClassLoader(), savepointPath);
System.out.println(metadata);
} catch (IOException e) {
e.printStackTrace();
}


 

通过debug发现,有三个operator,但是记录的offset是1038,而不是9,不知道正确的读取方法是什么?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink SQL UDF 如何自定义Metrics

2021-02-24 文章 xingoo
HI,
 如题,想要在Flink
SQL中通过自定义UDF增加指标,从而实现自定义告警。那么如何在UDF中获取到RuntimeContext从而修改Metrics呢?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink standalone模式如何区分各个任务的日志?

2021-02-21 文章 xingoo
Hi, 

这样体验上还是不太友好,如果能做成spark那种每个Job独立记录日志就好了



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Flink standalone模式如何区分各个任务的日志?

2021-02-20 文章 xingoo
Dear All:
目前Flink部署主要采用standalone,想了解下如何在同一个taskmanager区分各个job的日志。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 获取flinksql返回的查询结果

2021-01-18 文章 xingoo
最近刚好做了这个功能,基本的原理如Jeff所说,提交到集群后sink到内存。
然后在本地创建socket server,sink到内存的数据直接通过socket传回到本地。

具体做法即可以参考zeppelin,也可以参考sql-client模块的ChangelogCollectStreamResult



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Flink 1.11.3 PackagedProgram启动报错

2021-01-18 文章 xingoo
til.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at 
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
at 
org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:426)
... 14 more
|




查阅资料说是类加载的问题,已经修改为parent-first模式。
目前猜测是TaskManager端的执行有问题,无法加载到用户jar。
不知道是不是我PackagedProgram使用有问题,望解答。


Best,
xingoo