Re: flink 1.15

2022-04-02 Thread Zhanghao Chen
No, it isn't. MVP stands for Minimum Viable Product: a release that implements only the core functionality and is then refined based on early user feedback.

Best,
Zhanghao Chen

From: guanyq 
Sent: Saturday, April 2, 2022 14:56
To: user-zh@flink.apache.org 
Subject: flink 1.15

I watched the FFA (Flink Forward Asia) talk on unified stream-batch processing. Flink 1.15 introduces an MVP version of dynamic table storage for unified stream and batch processing.


Is the MVP version a paid edition?


Re: What is the address printed after creating a k8s session cluster (Flink 1.13.6) for?

2022-04-07 Thread Zhanghao Chen
What did you set the kubernetes.rest-service.exposed.type option to?

Best,
Zhanghao Chen

From: yidan zhao 
Sent: Thursday, April 7, 2022 11:41
To: user-zh 
Subject: What is the address printed after creating a k8s session cluster (Flink 1.13.6) for?

See
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes

I created a k8s Flink session cluster (Flink 1.13.6) with: ./bin/kubernetes-session.sh
-Dkubernetes.cluster-id=my-first-flink-cluster. Creation succeeded, and the last log line was "Create
flink session cluster my-first-flink-cluster successfully, JobManager
Web Interface: http://10.227.137.154:8081". But that address is unreachable.

Submitting a job with the following command behaves the same way: it resolves the same address, and the submission fails.
./bin/flink run \
--target kubernetes-session \
-Dkubernetes.cluster-id=my-first-flink-cluster \
./examples/streaming/TopSpeedWindowing.jar

--- The following approach works, though; I'm not sure what the problem is.
1. Via kubectl get svc, get the clusterIP:port of my-first-flink-cluster-rest: 192.168.127.57:8081.
2. List jobs:
flink list  -m 192.168.127.57:8081
3. Submit a job:
flink run  -m 192.168.127.57:8081
/home/work/flink/examples/streaming/TopSpeedWindowing.jar

--- The difference: the 192.x address is the virtual clusterIP; the 10.x one is my machine's IP.


Re: What is the address printed after creating a k8s session cluster (Flink 1.13.6) for?

2022-04-08 Thread Zhanghao Chen
The main difference between the Standalone K8s and Native K8s deployment modes is that in Native K8s mode, Flink can talk to the K8s API server directly to request the resources it needs and observe cluster state, whereas Standalone K8s has no direct awareness of the underlying K8s cluster. This leads to two main differences:


  1.  For deployment, Standalone K8s requires you to create the deployments, configmaps, and services the cluster needs by hand, while with Native K8s a single Flink CLI call is enough (sketched below).
  2.  For resource allocation, Standalone K8s uses passive resource management: you, or some external system, allocate the resources, and Flink passively accepts them. Native K8s uses active resource management: once started, the Flink cluster itself requests the resources it needs from K8s based on the properties of the submitted jobs.
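A rough command-level sketch of difference 1 (the yaml file names follow the standalone-K8s examples in the Flink docs, and the cluster id is a placeholder, not taken from this thread):

# Standalone K8s: you create every resource yourself
kubectl create -f flink-configuration-configmap.yaml
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-session-deployment.yaml
kubectl create -f taskmanager-session-deployment.yaml

# Native K8s: one CLI call; Flink creates the K8s resources for you
./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-session-cluster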

Best,
Zhanghao Chen

From: yidan zhao 
Sent: Friday, April 8, 2022 10:52
To: user-zh 
Subject: Re: k8s session cluster flink1.13.6创建后提示的地址啥用。

The docs seem to have two entries for Flink on k8s:
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/#session-mode
and
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/,

corresponding to Resource Providers/Standalone/Kubernetes and Resource
Providers/Native Kubernetes. Does anyone know the difference? From the docs, the former gives concrete service/deployment yaml specs and you create the cluster yourself, while the latter creates it with a one-shot script. But if that were the only difference, why distinguish "standalone/kubernetes" from "native
kubernetes" at all?

>
> The cluster is built on three physical machines, not minikube.
> Not sure whether it's NIC-related; there were network issues at init time, since k8s decides the listen address based on the default-route NIC.
> But I don't think that should matter here: since it's a clusterIP, the message printed after creation should show the clusterIP. Why does it show my host NIC's IP instead?
>
> yidan zhao wrote on Fri, Apr 8, 2022 at 10:38:
> >
> > Here is the output of describe svc my-first-flink-cluster-rest:
> > Name: my-first-flink-cluster-rest
> > Namespace:default
> > Labels:   app=my-first-flink-cluster
> >   type=flink-native-kubernetes
> > Annotations:  
> > Selector:
> > app=my-first-flink-cluster,component=jobmanager,type=flink-native-kubernetes
> > Type: LoadBalancer
> > IP Family Policy: SingleStack
> > IP Families:  IPv4
> > IP:   192.168.127.57
> > IPs:  192.168.127.57
> > Port: rest  8081/TCP
> > TargetPort:   8081/TCP
> > NodePort: rest  31419/TCP
> > Endpoints:192.168.130.11:8081
> > Session Affinity: None
> > External Traffic Policy:  Cluster
> > Events:   
> >
> > As shown above, the IP is 192.168.127.57; that is the ClusterIP and it is reachable. What I don't understand is why the address printed after creation isn't this one, and why the address resolved via
> > -Dkubernetes.cluster-id=my-first-flink-cluster isn't the 192.x one either, which makes job submission fail.
> >
> > yu'an huang wrote on Fri, Apr 8, 2022 at 02:11:
> > >
> > > In theory a cluster IP cannot be reached from outside the cluster. How was your Kubernetes environment set up? Minikube?
> > >
> > > If convenient, could you share the output of this command?
> > > kubectl describe svc my-first-flink-cluster-rest
> > >
> > >
> > >
> > > > On 7 Apr 2022, at 4:44 PM, Zhanghao Chen  
> > > > wrote:
> > > >
> > > > What did you set the kubernetes.rest-service.exposed.type option to?
> > > >
> > > > Best,
> > > > Zhanghao Chen


Re: Flink command-line options not taking effect

2022-04-11 Thread Zhanghao Chen
Hi, using -m together with the -yxx options is the early Flink-on-YARN CLI style. The community later moved to a new unified CLI: -t specifies the deployment target, and the old per-deployment cli options became dynamic config options. For example, every old -yxx option has a replacement dynamic option listed at
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#yarn
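As a concrete example, an old-style invocation and its unified-CLI equivalent would look roughly like this (a sketch, not from the original mail; yarn.application.queue and yarn.application.name are the documented replacements for -yqu and -ynm, and the jar name is a placeholder):

# old-style CLI: -m plus -y options
flink run -m yarn-cluster -yqu root.root -ynm SocketWordCount job.jar

# unified CLI: -t plus dynamic options
flink run -t yarn-per-job \
  -Dyarn.application.queue=root.root \
  -Dyarn.application.name=SocketWordCount \
  job.jar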


Best,
Zhanghao Chen

From: gangzi <1139872...@qq.com.INVALID>
Sent: Monday, April 11, 2022 19:55
To: user-zh 
Subject: Flink command-line options not taking effect

I submitted a job with: flink run -t yarn-per-job -ynm SocketWordCount -yqu root.root -d
-n SocketWindowWordCount.jar --hostname 10.199.0.97 --port 9878. The job was submitted successfully, but
-ynm and -yqu did not take effect. Reading the source, the reason is that once -t is specified,
none of the -y options work: the -y options are parsed in FlinkYarnSessionCli, and this method in the source:

public CustomCommandLine validateAndGetActiveCommandLine(CommandLine commandLine) {
    LOG.debug("Custom commandlines: {}", customCommandLines);
    for (CustomCommandLine cli : customCommandLines) {
        LOG.debug("Checking custom commandline {}, isActive: {}", cli, cli.isActive(commandLine));
        if (cli.isActive(commandLine)) {
            return cli;
        }
    }
    throw new IllegalStateException("No valid command-line found.");
}

returns the GenericCLI, so the subsequent call:

final Configuration effectiveConfiguration =
        getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);

returns a command-line configuration that only contains the options defined in GenericCLI. What is the difference between setting options with -t and with -m? How can I make the ignored options take effect? Is this a bug?


Re: Custom UDF fails to load when executing Flink SQL

2022-04-11 Thread Zhanghao Chen
Hi, could you paste the exact submission command used on the client?

Best,
Zhanghao Chen

From: 799590...@qq.com.INVALID <799590...@qq.com.INVALID>
Sent: Tuesday, April 12, 2022 10:46
To: user-zh 
Subject: Custom UDF fails to load when executing Flink SQL

Environment

flink-1.13.6_scala_2.11
java 1.8

Running in standalonesession cluster mode; node01 is the JobManager, node02 and node03 are TaskManagers

UDF code
package com.example.udf;

import org.apache.flink.table.functions.ScalarFunction;

public class SubStr extends ScalarFunction {

    public String eval(String s, Integer start, Integer end) {
        return s.substring(start, end);
    }
}

The UDF jars are stored on HDFS. On every SQL submission, the client loads the UDF jar list from HDFS through a classloader and sets pipeline.jars to the list of HDFS UDF jar paths. Executing the SQL below then fails:

insert into output_2455_5070_model_1649729386269 select tablekeymd5(user_id) as 
mm ,proctime(),MD5(CONCAT_WS(CAST(user_id AS STRING))) from (select distinct id 
as id, user_id as user_id, status as status from (select id,user_id,status from 
data_2455_5068_model) where status < '4')

2022-04-12 10:26:36
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user 
class: com.example.udf.TableKeyMd5
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:336)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:656)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:629)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:569)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:186)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:551)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.example.udf.TableKeyMd5
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
at 
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)

Classloader code:
public static void loadJar(URL jarUrl) {
    Method method = null;
    try {
        method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
    } catch (NoSuchMethodException | SecurityException e1) {
        e1.printStackTrace();
    }
    // remember the method's current accessibility
    boolean accessible = method.isAccessible();
    try {
        // make the method accessible
        if (!accessible) {
            method.setAccessible(true);
        }
        // get the system classloader
        URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
        // add the jar path to the system URL path
        method.invoke(classLoader, jarUrl);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        method.setAccessible(accessible);
    }
}
/**
 * If a factory already exists, wrap both the original factory and the
 * HDFS-reading factory in a decorator, and use whichever is needed.
 *
 * @param fsUrlStreamHandlerFactory
 * @throws Exception
 */
public static void registerFactory(final FsUrlStreamHandlerFactory fsUrlStreamHandlerFactory)
        throws Exception {
    log.info("registerFactory : " + fsUrlStreamHandlerFactory.getClass().getName());
    final Field factoryField = URL.class.getDeclaredField("factory");
    factoryField.setAccessible(true);
    final Field lockField = URL.class.getDeclaredField("streamHandlerLock");
    lockField.setAccessible(true);
    synchronized (lockField.get(null)) {
        final URLStreamHandlerFactory originalUrlStreamHandlerFactory =
                (URLStreamHandlerFactory) factoryField.get(null);
        factoryField.set(null, null);
        URL.setURLStreamHandlerFactory(protocol -> {
            if ("hdfs".equals(protocol)) {
                return fsUrlStreamHandlerFactory.createURLStreamHandler(protocol);
            } else {

Re: Re: How to fix the log message "The state is cleared because of state ttl. This will result in incorrect result"?

2022-04-11 Thread Zhanghao Chen
You can do this in the SQL client with the SET 'table.exec.state.ttl' = 'xxx'; syntax (example below); for details see:


  1.  https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sqlclient/#running-sql-queries
  2.  https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/config/#table-exec-state-ttl
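For instance, in the SQL client session before submitting the query (a sketch; the one-day value is illustrative):

SET 'table.exec.state.ttl' = '1 d';
-- then run the INSERT/SELECT as usual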


Best,
Zhanghao Chen

From: 段晓雄 
Sent: Monday, April 11, 2022 20:23
To: user-zh@flink.apache.org 
Subject: RE: Re: How to fix the log message "The state is cleared because of state ttl. This will
result in incorrect result"?

I run SQL jobs created via sql-client.sh; how do I set the TTL duration?

On 2022/04/11 11:14:36 yidan zhao wrote:
> You can increase the state ttl to avoid this.
> The method is already stated there: increase the state TTL.
>
> 段晓雄 wrote on Mon, Apr 11, 2022 at 09:52:
> >
> > Hi all,
> >
> > We run a Flink 1.14.4 cluster, executing SQL for stream processing via pyflink.
> >
> > The taskmanager logs are full of "The state is cleared because of state ttl. This will result
> > in incorrect result. You can increase the state ttl to avoid this."
> > Why does this appear, and how do we fix it?
> >
> >
> > 2022-04-09 17:08:54,672 INFO  
> > org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterHandler
> >  [] - Committing the state for checkpoint 284
> > 2022-04-09 17:08:54,672 INFO  
> > org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterHandler
> >  [] - Committing the state for checkpoint 284
> > 2022-04-09 17:08:54,852 INFO  
> > org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer [] - 
> > The state is cleared because of state ttl. This will result in incorrect 
> > result. You can increase the state ttl to avoid this.
> > 2022-04-09 17:08:54,852 INFO  
> > org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer [] - 
> > The state is cleared because of state ttl. This will result in incorrect 
> > result. You can increase the state ttl to avoid this.
> > 2022-04-09 17:08:54,852 INFO  
> > org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer [] - 
> > The state is cleared because of state ttl. This will result in incorrect 
> > result. You can increase the state ttl to avoid this.
> > 2022-04-09 17:08:54,922 INFO  
> > org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterHandler
> >  [] - Committing the state for checkpoint 277
> > 2022-04-09 17:08:54,922 INFO  
> > org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterHandler
> >  [] - Committing the state for checkpoint 277
> > 2022-04-09 17:08:54,952 INFO  
> > org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer [] - 
> > The state is cleared because of state ttl. This will result in incorrect 
> > result. You can increase the state ttl to avoid this.
> > 2022-04-09 17:08:54,952 INFO  
> > org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer [] - 
> > The state is cleared because of state ttl. This will result in incorrect 
> > result. You can increase the state ttl to avoid this.
> > 2022-04-09 17:08:54,952 INFO  
> > org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer [] - 
> > The state is cleared because of state ttl. This will result in incorrect 
> > result. You can increase the state ttl to avoid this.
> > 2022-04-09 17:08:54,952 INFO  
> > org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer [] - 
> > The state is cleared because of state ttl. This will result in incorrect 
> > result. You can increase the state ttl to avoid this.
> > 2022-04-09 17:08:54,952 INFO  
> > org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer [] - 
> > The state

Re: Flink command-line options not taking effect

2022-04-11 Thread Zhanghao Chen
The most relevant FLIP and JIRA issues I know of are these two:

1. 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-73%3A+Introducing+Executors+for+job+submission

2. https://issues.apache.org/jira/browse/FLINK-15179 (the comment section of this JIRA is especially worth reading)


Best,
Zhanghao Chen

From: QQ <1139872...@qq.com.INVALID>
Sent: Tuesday, April 12, 2022 9:19
To: user-zh@flink.apache.org 
Subject: Re: Flink command-line options not taking effect

Thanks a lot for the explanation. Where can I find the documentation or FLIP for this new unified CLI?

> On Apr 11, 2022, at 11:49 PM, Zhanghao Chen wrote:
>



Re: Flink Thread Dump logs

2022-04-20 Thread Zhanghao Chen
Different thread-dump analysis tools support different file formats. Check the tool's website for how the thread dump needs to be generated, then log in to the physical machine hosting the TM and generate the dump file there.
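For example, with a JDK available on the TM host, the usual route is jstack (a sketch; the grep pattern assumes the default TaskManager entry class, and the pid and output path are placeholders):

# find the TaskManager JVM pid
jps -l | grep TaskManagerRunner
# write a standard-format thread dump that most analyzers accept
jstack <pid> > taskmanager-threaddump.txt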

Best,
Zhanghao Chen

From: 陈卓宇 <2572805...@qq.com.INVALID>
Sent: Wednesday, April 20, 2022 17:00
To: user-zh 
Subject: Flink Thread Dump logs

Hi,
We hit a problem in production where a Flink CDC MongoDB job does not release its connections. I suspect it may be thread-related, so I opened the Flink web UI, downloaded the Thread Dump from the Task Managers page, and loaded it into a dump analysis tool, which reported that the file could not be recognized. How can I solve this?


卓宇


Re: Flink: Job leader for job id xxxx lost leadership

2022-04-20 Thread Zhanghao Chen
Seeing "Job leader for job id  lost leadership" means the JM leader's session on ZK timed out. Possible causes:

  1.  Network jitter between the JM and ZK put the ZK connection into the suspended state, and either you have not configured tolerance for suspended ZK connections (on 1.14 and above, set the
high-availability.zookeeper.client.tolerate-suspended-connections option; example below), or you have but the session timeout is set too short, triggering the leadership loss
  2.  The JM really does keep crashing
  3.  Severe GC on the JM is breaking the ZK connection into the suspended state
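A flink-conf.yaml sketch of the settings mentioned in item 1 (the timeout value is illustrative; to my knowledge the defaults are tolerate-suspended-connections=false and session-timeout=60000 ms):

high-availability.zookeeper.client.tolerate-suspended-connections: true
high-availability.zookeeper.client.session-timeout: 120000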

Best,
Zhanghao Chen

From: magic 
Sent: Wednesday, April 20, 2022 17:49
To: user-zh 
Subject: Flink: Job leader for job id  lost leadership

Hi all,
When we use Flink to consume Kafka data and write it to Hudi, we often get the error: Job leader for job id
lost leadership. Other Flink jobs on the same cluster are fine, though. What could be the cause? It doesn't feel like a ZK problem.


Re: Re: Re: How to fix the log message "The state is cleared because of state ttl. This will result in incorrect result"?

2022-04-20 Thread Zhanghao Chen
Which Flink cluster deployment mode are you using to submit the SQL job?

Best,
Zhanghao Chen

From: 段晓雄 
Sent: Saturday, April 16, 2022 19:52
To: user-zh@flink.apache.org 
Subject: RE: Re: Re: How to fix the log message "The state is cleared because of state ttl. This will
result in incorrect result"?

Zhanghao,

Thanks for the help! I set table.exec.state.ttl = 12960 in sql-client.sh, but now I don't know how to confirm whether it took effect. I can't find the state TTL value in the job status or checkpoint status returned by the web UI or the REST API, and the job state keeps growing. How can I verify the job's state TTL?


On 2022/04/12 04:15:37 Zhanghao Chen wrote:

Re: Unaligned Checkpoint

2022-06-12 Thread Zhanghao Chen
Hi,

Unaligned checkpointing is a low-level feature; to use it, just set the Flink option
execution.checkpointing.unaligned = true. In the SQL client, you can set Flink options with the SET "key" =
"value" syntax (example below).

Compared with aligned checkpoints, the main changes of unaligned checkpoints are:

  *   An unaligned checkpoint triggers the snapshot as soon as the first checkpoint barrier arrives in the input buffers and forwards it straight downstream. The cost is that the snapshot has to record the buffered in-flight data to preserve consistency, producing more io and enlarging the checkpoint.
  *   An aligned checkpoint triggers the snapshot only after the operator has received the last checkpoint barrier and completed barrier alignment. During alignment, input channels whose barrier arrived earlier are blocked; under backpressure, the blocking time grows significantly and checkpoints slow down. The benefit is that, thanks to barrier alignment, the snapshot stays consistent without recording the data queued in the buffers.
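A minimal SQL-client sketch (the checkpoint interval is illustrative):

SET 'execution.checkpointing.interval' = '1 min';
SET 'execution.checkpointing.unaligned' = 'true';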

Best,
Zhanghao Chen

From: 小昌同学 
Sent: Saturday, June 11, 2022 17:18
To: user-zh@flink.apache.org 
Subject: Unaligned Checkpoint

Could someone explain how Unaligned Checkpoint is implemented? I've read quite a few docs but still don't fully get it. And if I want to use it in SQL, how should I set it up? Thanks!


小昌同学
ccc0606fight...@163.com


Re: unsubscribe

2022-06-12 Thread Zhanghao Chen
Hi, to unsubscribe, please send a message with any content to user-zh-unsubscr...@flink.apache.org.

Best,

Zhanghao Chen

From: chenshu...@foxmail.com 
Sent: Sunday, June 12, 2022 11:44
To: user-zh 
Subject: unsubscribe

unsubscribe



chenshu...@foxmail.com


Re: Flink k8s HA: anomalies after manually deleting the job deployment

2022-06-12 Thread Zhanghao Chen
1. For a Flink job with K8s-based HA to behave correctly, you must not delete the job deployment by hand; you must stop it with the cancel or stop commands. Based on this, I guess Flink k8s HA is built on top of ConfigMaps, whose lifecycle, from the K8s point of view, cannot carry an ownerreference the way the job's svc does.

Yes, Flink K8s HA is built on ConfigMaps, and the HA configmaps deliberately carry no ownerreference. So if you want to restart the cluster while keeping the HA data, deleting the deployment is exactly right; after the restart, the job recovers from the latest checkpoint.

2. With k8s-based HA, the Flink job id is always the same fixed value.

Any Application mode deployment with HA enabled uses a fixed Flink job id, regardless of whether K8s HA is used. The job id is the unique identifier of a job, and the HA services use it to name and address the HA resources belonging to a single job (such as the persisted jobgraph and checkpoints). In Application mode, the jobgraph is generated on the JM. Without HA, a random job id is generated along with the jobgraph each time; with HA, a fixed job id is required (that is where the all-zeros job id comes from), because otherwise a JM failover would generate a new, different job id that cannot be associated with the earlier checkpoints, and the job would recover from a fresh state.

3. How does Flink k8s HA work, and what information does it store? I'd like to study the implementation; if you have design docs or related material, please reply to this email. Thanks.

See the official blog post
https://flink.apache.org/2021/02/10/native-k8s-with-ha.html; for more detail, see the design doc of the FLIP: https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
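A quick way to check the HA ConfigMaps and confirm the absent ownerReference (a sketch; the namespace and cluster id follow this thread's example, and an empty result from the second command means no ownerReference is set):

kubectl get configmap -n bdra-dev-flink-standalone | grep flink-bdra-sql-application-job
kubectl get configmap flink-bdra-sql-application-job-cluster-config-map \
    -n bdra-dev-flink-standalone -o jsonpath='{.metadata.ownerReferences}'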


Best,
Zhanghao Chen

From: m18814122325 
Sent: Sunday, June 12, 2022 22:45
To: user-zh@flink.apache.org 
Subject: Flink k8s HA: anomalies after manually deleting the job deployment

Flink version: 1.15.0

deploy mode: Native k8s application




Symptoms:

I deployed a Flink job with K8s-based HA in Native k8s mode. After I manually deleted the job's deployment, I found the job's HA ConfigMaps still existed. When I then restarted the job without the -s option, the startup logs
showed it recovering from the information recorded in those ConfigMaps.




kubectl delete deployment flink-bdra-sql-application-job -n  
bdra-dev-flink-standalone




kubectl get configMap -n bdra-dev-flink-standalone




NAME                                                DATA   AGE
flink-bdra-sql-application-job--config-map          2      13m
flink-bdra-sql-application-job-cluster-config-map   1      13m







My questions:

1. For a Flink job with K8s-based HA to behave correctly, you must not delete the job deployment by hand; you must stop it with the cancel or stop commands. Based on this, I guess Flink k8s HA is built on top of ConfigMaps, whose lifecycle, from the K8s point of view, cannot carry an ownerreference the way the job's svc does.

2. With k8s-based HA, the Flink job id is always the same fixed value.

3. How does Flink k8s HA work, and what information does it store? I'd like to study the implementation; if you have design docs or related material, please reply to this email. Thanks.




Restart command (without the -s option, i.e. the command itself requests no recovery from a checkpoint or savepoint):

flink run-application --target kubernetes-application -c CalculateUv 
-Dkubernetes.cluster-id=flink-bdra-sql-application-job-s3p 
-Dkubernetes.container.image=acpimagehub.cgb.cn/bdra_dev/flink-sql-s3:v0.20 
-Dkubernetes.namespace=bdra-dev-flink-standalone 
-Dkubernetes.service-account=bdra-dev-flink-standalone-sa 
-Djobmanager.memory.process.size=1024m -Dkubernetes.jobmanager.cpu=2 
-Dkubernetes.taskmanager.cpu=2 -Dparallelism.default=8 
-Dtaskmanager.numberOfTaskSlots=2 -Dtaskmanager.memory.process.size=2144m 
-Dstate.backend=filesystem 
-Dstate.checkpoints.dir=s3p://bdra-user-lun01/flink-checkpoints/flink-bdra-sql-application-job-s3
 
-Dstate.savepoints.dir=s3a://bdra-user-lun01/flink-savepoints/flink-bdra-sql-application-job-s3
 
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
 -Dhigh-availability.storageDir=file:///opt/flink/log/recovery 
-Ds3.access-key=* -Ds3.secret-key=* 
-Dmetrics.reporter.influxdb.factory.class=org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
 -Dmetrics.reporter.influxdb.scheme=http 
-Dmetrics.reporter.influxdb.host=influxdb -Dmetrics.reporter.influxdb.port=8086 
-Dmetrics.reporter.influxdb.db=flink_metrics 
-Dmetrics.reporter.influxdb.consistency=ANY -Ds3.endpoint=http://*:80 
-Dkubernetes.rest-service.exposed.type=ClusterIP 
-Dkubernetes.config.file=kube_config 
-Dkubernetes.pod-template-file=pod-template.yaml 
local:///opt/flink/usrlib/flink-sql-1.0-SNAPSHOT.jar




After the restart, it automatically recovered from the ConfigMap:

2022-06-10 20:20:52,592 INFO  
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - 
Successfully recovered 1 persisted job graphs.

2022-06-10 20:20:52,654 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Starting RPC endpoint for 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher at 
akka://flink/user/rpc/dispatcher_1 .

2022-06-10 20:20:53,552 INFO  
org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Recovered 0 
pods from previous attempts, current attempt id is 1.

2022-06-10 20:20:53,552 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Recovered 0 workers from previous attempt.

2022-06-10 20:20:55,352 INFO  
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
Starting DefaultLeaderElectionService with 
org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriverAdapter@2a1814a5.

2022-06-10 20:20:55,370 INFO  org.apache.flink.client.ClientUtils   
   [] - Starting program (detached: false)

2022-06-10 20:20:55,394 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Starting RPC endpoint for 
org.apache.flink.runtime.jobmaster.JobMaster at 
akka://flink/user/rpc/jobmanager_2 .

2022-06-10 20:20:55,438 INFO

Re: Job hits TooLongFrameException: Adjusted frame length exceeds

2022-07-03 Thread Zhanghao Chen
Hi, could you provide the complete error logs from the JM/TM side?

Best,
Zhanghao Chen

From: 谭家良 
Sent: Sunday, July 3, 2022 16:18
To: user-zh@flink.apache.org 
Subject: Job hits TooLongFrameException: Adjusted frame length exceeds

Version: flink-1.14
Job: Flink SQL, consuming from kafka, with TVF windowed aggregation output
Problem: both the Flink JM and TM hit TooLongFrameException: Adjusted frame length exceeds
10485760: 369295622 - discarded

I've seen a number of jobs hit this frequently, and I'd like to ask a few questions:
1. What causes it: a JM-TM communication problem or a TM-TM communication problem?
2. Can it lead to data loss?
3. Both the JM and TM hit it; what is a good fix?
谭家良
tanjl_w...@126.com


Re: Can Flink SQL handle late data that would otherwise be dropped?

2022-07-14 Thread Zhanghao Chen
Hi, see the summary at https://www.mail-archive.com/issues@flink.apache.org/msg498605.html.
Note that this feature is still experimental; use it with caution.
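If the options summarized at that link are the experimental late-fire settings of the blink planner, usage in the SQL client would look roughly like this (a sketch: these keys are experimental and undocumented, and both the keys and the values should be verified against the linked summary before use):

SET 'table.exec.emit.allow-lateness' = '1 h';
SET 'table.exec.emit.late-fire.enabled' = 'true';
SET 'table.exec.emit.late-fire.delay' = '5 min';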

Best,
Zhanghao Chen

From: Zhizhao Shangguan 
Sent: Friday, July 15, 2022 10:44
To: user-zh@flink.apache.org 
Subject: Can Flink SQL handle late data that would otherwise be dropped?

Hi ALL:

   A question: for data that arrives after the watermark has passed, is it possible to still trigger the window computation (similar to the allowedLateness mechanism of the DataStream API)? Can Flink
SQL do this, and if so, how?

Thanks♪(・ω・)ノ



Re: Re: Re: Can Flink SQL handle late data that would otherwise be dropped?

2022-07-16 Thread Zhanghao Chen
Hi, to unsubscribe, please send a message with any content to user-zh-unsubscr...@flink.apache.org

Best,
Zhanghao Chen

From: 孙福 
Sent: Saturday, July 16, 2022 23:01
To: user-zh@flink.apache.org 
Subject: Re: Re: Can Flink SQL handle late data that would otherwise be dropped?

unsubscribe

On 2022-07-15 15:06:51, "Zhizhao Shangguan" wrote:
> Thanks, Zhanghao. I tested it this morning; the basic functionality works. I'll verify this experimental feature further.
>


Re: Flink exception

2022-07-24 Thread Zhanghao Chen
Hi, a few things to check (a heartbeat-config sketch for the transient cases follows the list):

  1.  whether the TM side hit an exception that caused the TM to exit;
  2.  whether heavy GC on the TM side kept it from processing heartbeats in time;
  3.  whether a network problem between the JM and TM prevented the heartbeat messages from getting through.
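If the cause turns out to be transient (GC pauses or network jitter), relaxing the heartbeat settings in flink-conf.yaml can serve as a stopgap (a sketch; the timeout value is illustrative, and the defaults are heartbeat.interval=10000 ms and heartbeat.timeout=50000 ms):

heartbeat.interval: 10000
heartbeat.timeout: 120000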

Best,
Zhanghao Chen

From: 陈卓宇 <2572805...@qq.com.INVALID>
Sent: Friday, July 22, 2022 11:30
To: user-zh 
Subject: flink异常

Hi community, I'd like to ask a question:
Flink version: 1.14.5
The exception is as follows:
2022-07-22 10:07:51
java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 
bdp-changlog-mid-relx-middle-promotion-dev-taskmanager-1-1 timed out.
 at 
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1299)
 at 
org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
 at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
 at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
 at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
 at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
 at akka.actor.ActorCell.invoke(ActorCell.scala:561)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
 at akka.dispatch.Mailbox.run(Mailbox.scala:225)
 at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
 at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

How should I fix this, and how can I tune for it?


Re: High reliability for Flink applications

2022-07-25 Thread Zhanghao Chen
For cold standby, an external job-management service can periodically take savepoints and copy them to the HDFS cluster of the other stack; on failure, restart the job on the other stack from the latest savepoint.
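A rough sketch of that flow (the job id, savepoint paths, namespaces, and jar name are all placeholders):

# on the primary stack: trigger a savepoint; the CLI prints the savepoint path
flink savepoint <jobId> hdfs://primary-ns/flink/savepoints
# copy it to the standby stack's HDFS
hadoop distcp hdfs://primary-ns/flink/savepoints/savepoint-xxxx \
    hdfs://standby-ns/flink/savepoints/
# on failover: restart the job on the standby stack from the copied savepoint
flink run -s hdfs://standby-ns/flink/savepoints/savepoint-xxxx job.jar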

Best,
Zhanghao Chen

From: andrew <15021959...@163.com>
Sent: Monday, July 25, 2022 10:05:39 PM
To: user-zh 
Subject: High reliability for Flink applications

Dear Flink:
  Hello! We have a requirement: the jobs on our real-time Flink computing platform matter a great deal to downstream users and must not fail. We plan to build a disaster-recovery real-time big-data cluster (kafka/yarn/hdfs) and deploy the same Flink jobs on it, as hot or cold standby. The downstream business systems are not deployed active-active. My questions:
   1. The primary cluster fails and we switch to the DR cluster:
  with many real-time applications carrying intermediate state, once the primary cluster breaks, how does the DR cluster synchronize to the latest state and continue computing?
   2. Once the primary cluster recovers, how do we migrate data back from the jobs that were switched to the DR cluster?


For this requirement, does the community have any cases we could use for testing and verification? Thanks!


Re: Unsubscribe

2023-11-25 Thread Zhanghao Chen
Hi,

Please send an email with any content to user-zh-unsubscr...@flink.apache.org to unsubscribe.

Best,
Zhanghao Chen

From: 唐凯 
Sent: Saturday, November 25, 2023 9:23
To: user-zh 
Subject: Unsubscribe

Unsubscribe


Re: Unsubscribe

2024-02-21 Thread Zhanghao Chen
Please send an email with any content to user-zh-unsubscr...@flink.apache.org to unsubscribe from the
user-zh@flink.apache.org
mailing list.

Best,
Zhanghao Chen

From: 曹明勤 
Sent: Thursday, February 22, 2024 9:42
To: user-zh@flink.apache.org 
Subject: Unsubscribe

Unsubscribe


Re: Can JobGraph information be obtained from a Flink job's web URL?

2024-03-03 Thread Zhanghao Chen
To add to Yanquan's answer: what /jobs/:jobid/plan returns is in fact the JobGraph in JSON form (generated by the
JsonPlanGenerator class and covering most of the commonly used jobgraph fields), which should meet your needs.
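For example, against a running job's REST endpoint (a sketch; the host, port, and job id are placeholders):

curl http://<jobmanager-host>:8081/jobs/<jobid>/plan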

From: casel.chen 
Sent: Saturday, March 2, 2024 14:17
To: user-zh@flink.apache.org 
Subject: Can JobGraph information be obtained from a Flink job's web URL?

Can the JobGraph information of a running Flink job be obtained through the web URL it exposes?


Re: Re: How can a Flink DataStream job obtain its lineage?

2024-03-08 Thread Zhanghao Chen
The JobGraph has a field that is exactly the job id.
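A one-line sketch (assuming you already hold a JobGraph instance named jobGraph):

// the job id travels with the JobGraph itself
org.apache.flink.api.common.JobID jobId = jobGraph.getJobID();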

Best,
Zhanghao Chen

From: 阿华田 
Sent: Friday, March 8, 2024 14:14
To: user-zh@flink.apache.org 
Subject: Re: How can a Flink DataStream job obtain its lineage?

After obtaining the Source or DorisSink information, how do we know which Flink job it belongs to? It seems we cannot get the flinkJobId.


阿华田
a15733178...@163.com


On Feb 26, 2024 at 20:04, Feng Jin wrote:
Through the JobGraph you can get the transformation information, and from that the concrete Source or Doris
Sink; then extract the properties inside via reflection.

You can refer to the OpenLineage implementation [1].


1.
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/shared/src/main/java/io/openlineage/flink/visitor/wrapper/FlinkKafkaConsumerWrapper.java


Best,
Feng


On Mon, Feb 26, 2024 at 6:20 PM casel.chen  wrote:

A Flink DataStream job consumes from mysql cdc, processes the data, and writes to apache
doris. Is there a way (from the JobGraph/StreamGraph) to obtain the source/sink
connector information, including the connection string, database name, table name, and so on?


Re: Re: How can a Flink DataStream job obtain its lineage?

2024-03-08 Thread Zhanghao Chen
Take a look at how OpenLineage integrates with Flink [1]: it registers a JobListener on the
StreamExecutionEnvironment (through which you get the JobClient and hence the job id), and then extracts the
transformation information from the execution environment [2]. A minimal registration sketch follows the links.

[1] https://openlineage.io/docs/integrations/flink/
[2] 
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/app/src/main/java/io/openlineage/flink/OpenLineageFlinkJobListener.java
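A sketch of the listener registration (the printing stands in for your own lineage-reporting hook; JobListener is Flink's public org.apache.flink.core.execution.JobListener interface):

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LineageListenerExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.registerJobListener(new JobListener() {
            @Override
            public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
                if (jobClient != null) {
                    // the job id that ties lineage records to this job
                    System.out.println("Submitted job: " + jobClient.getJobID());
                }
            }

            @Override
            public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
                // called when execution finishes (attached mode)
            }
        });
        // ... build the pipeline and call env.execute() as usual
    }
}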


Best,
Zhanghao Chen

From: 阿华田 
Sent: Friday, March 8, 2024 16:48
To: user-zh@flink.apache.org 
Subject: Re: How can a Flink DataStream job obtain its lineage?



"Through the JobGraph you can get the transformation information": can the JobGraph really expose the transformation information directly? We use reflection on
SourceTransformation and SinkTransformation to get the connection information, but there we cannot get the flinkJobid. Can the
JobGraph provide both the source/sink connection information and the flinkJobid?

阿华田
a15733178...@163.com


On Mar 8, 2024 at 16:18, Zhanghao Chen wrote:


Re: Re: How can a Flink DataStream job obtain its lineage?

2024-03-08 Thread Zhanghao Chen
It is in fact feasible. You can modify the StreamExecutionEnvironment source directly so that every job gets your customized listener registered by default, and then surface the information through some channel of your own. In FLIP-314 [1], we plan to provide a native interface in Flink for registering your own listener to obtain lineage information; it is not released yet, so you can roll your own for now.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-314:+Support+Customized+Job+Lineage+Listener

From: 阿华田 
Sent: Friday, March 8, 2024 18:47
To: user-zh@flink.apache.org 
Subject: Re: How can a Flink DataStream job obtain its lineage?

We want to modify the source code so that any job submitted to our real-time platform produces lineage information when the DAG is initialized. Registering on StreamExecutionEnvironment has to be written inside each job, which does not meet the requirement.




阿华田
a15733178...@163.com


On Mar 8, 2024 at 18:23, Zhanghao Chen wrote:


Re: Parallelism vs. partition count when Flink writes to Kafka

2024-03-13 Thread Zhanghao Chen
Hi,

Which Kafka partition gets written depends on the partitioner of the Kafka Sink in use [1]. The default is Kafka's Default
Partitioner, whose underlying strategy is known as sticky partitioning: records with a key go to the partition selected by hashing the key; records without a key
get a random partition, which the producer then "sticks" to for a while to improve batching, switching to another random partition once the batch is flushed. This balances batching efficiency against even distribution.
See [2] for details.

So under the default configuration, the traversal-induced batching degradation you describe does not exist. Until you hit Kafka's
per-partition write bottleneck, simply increasing the sink parallelism is an effective way to raise write throughput. In some special cases, however, for example very high parallelism with very low per-subtask write QPS,
so that a single batching window holds only one or two records, batching becomes ineffective and you hit the Kafka write bottleneck; lowering the parallelism may then actually raise throughput, by improving batching and, combined with write compression, shrinking the traffic written to
Kafka.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#sink-partitioning
[2] https://www.cnblogs.com/huxi2b/p/12540092.html
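For reference, pinning the strategy explicitly on the DataStream KafkaSink looks roughly like this fragment (a sketch; FlinkFixedPartitioner, which maps each sink subtask to one fixed partition, is just one selectable strategy, and the broker/topic names are placeholders):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092")
        .setRecordSerializer(
                KafkaRecordSerializationSchema.builder()
                        .setTopic("my-topic")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        // each Flink subtask writes to one fixed partition
                        .setPartitioner(new FlinkFixedPartitioner<>())
                        .build())
        .build();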



From: chenyu_opensource 
Sent: Wednesday, March 13, 2024 15:25
To: user-zh@flink.apache.org 
Subject: Parallelism vs. partition count when Flink writes to Kafka

Hello:
 Flink writes data into Kafka (Kafka as the sink). When the Kafka
topic's partition count (set to 60) is smaller than the configured parallelism (set to 300), do the tasks write to these partitions round-robin, and does that hurt write efficiency (is there traversal overhead)?
 In that case, if the topic's partition count is increased (to 200, or straight to 300), will write efficiency improve noticeably?

 Is there related source code to look at?
Looking forward to a reply. Thanks!





Re: Re: Custom data source in 1.19

2024-03-28 Thread Zhanghao Chen
If you are using the DataStream API, also check whether the new DataGen connector [1] directly covers your test-data generation needs. A minimal sketch follows the link.


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/datagen/
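A minimal rate-limited generator sketch (names follow the DataGen connector docs; the 10-records-per-second rate and the produced strings are illustrative):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataGenExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        GeneratorFunction<Long, String> generator = index -> "Number: " + index;
        DataGeneratorSource<String> source =
                new DataGeneratorSource<>(
                        generator,
                        Long.MAX_VALUE,                    // total number of records
                        RateLimiterStrategy.perSecond(10), // at most 10 records per second
                        Types.STRING);

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "datagen")
           .print();
        env.execute("DataGen demo");
    }
}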

Best,
Zhanghao Chen

From: ha.fen...@aisino.com 
Sent: Thursday, March 28, 2024 15:34
To: user-zh 
Subject: Re: Re: Custom data source in 1.19

What I'm asking is: if I need to implement the Source interface, how should I write it? Is there a concrete example that generates a custom class at a given rate?

From: gongzhongqiang
Sent: 2024-03-28 15:05
To: user-zh
Subject: Re: Custom data source in 1.19
Hi:

Flink 1.19 only marks SourceFunction as deprecated; it will be removed in a future release. So to keep your application working across future Flink
versions, you can reimplement those SourceFunctions with the Source interface.

ha.fen...@aisino.com wrote on Thu, Mar 28, 2024 at 14:18:

>
> We used to extend SourceFunction to implement simple data sources that generate records automatically. In 1.19 it is marked deprecated, apparently in favor of the Source interface, which is completely different from SourceFunction. How should we now build a custom data source for test data?
>


Re: Re: Re: Custom data source in 1.19

2024-03-31 Thread Zhanghao Chen
Please send an email with any content to user-zh-unsubscr...@flink.apache.org to unsubscribe. See [1] for how to manage your mailing-list subscriptions.

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8

Best,
Zhanghao Chen

From: 熊柱 <18428358...@163.com>
Sent: Monday, April 1, 2024 11:14
To: user-zh@flink.apache.org 
Subject: Re: Re: Re: Custom data source in 1.19

Unsubscribe

On 2024-03-28 19:56:06, "Zhanghao Chen" wrote:

Re: Unsubscribe

2024-03-31 Thread Zhanghao Chen
Please send an email with any content to user-zh-unsubscr...@flink.apache.org to unsubscribe. See [1] for how to manage your mailing-list subscriptions.

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8

Best,
Zhanghao Chen

From: zjw 
Sent: Monday, April 1, 2024 11:05
To: user-zh@flink.apache.org 
Subject: Unsubscribe




Re: Re: Unsubscribe

2024-03-31 Thread Zhanghao Chen
Please send an email with any content to user-zh-unsubscr...@flink.apache.org to unsubscribe. See [1] for how to manage your mailing-list subscriptions.

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8

Best,
Zhanghao Chen

From: 戴少 
Sent: Monday, April 1, 2024 11:10
To: user-zh 
Cc: user-zh-sc.1618840368.ibekedaekejgeemingfn-kurisu_li=163.com 
;
 user-zh-subscribe ; user-zh 

Subject: Re: Unsubscribe

Unsubscribe

--

Best Regards,




 Original message 
| From | 李一飞 |
| Date | Mar 14, 2024 00:09 |
| To |
user-zh-sc.1618840368.ibekedaekejgeemingfn-kurisu_li=163.com,
user-zh-subscribe ,
user-zh  |
| Subject | Unsubscribe |
Unsubscribe




Re: Re: Is there a way to rate-limit Flink SQL consumption of a Kafka topic?

2024-05-28 Thread Zhanghao Chen
That should be doable. Also, the old Kafka connector once implemented rate limiting [1], which you can use as a reference. I think this need is fairly common; feel free to open a JIRA for it.

[1] https://issues.apache.org/jira/browse/FLINK-11501
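Until the connector supports it natively, a common workaround is to throttle right after the source with a per-subtask rate limiter. A sketch using Guava's RateLimiter (explicitly not a connector feature; the 1000 records per second figure is illustrative and applies per parallel subtask):

import com.google.common.util.concurrent.RateLimiter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class ThrottleMap extends RichMapFunction<String, String> {
    private transient RateLimiter limiter;

    @Override
    public void open(Configuration parameters) {
        // per-subtask limit; the total rate is limit * parallelism
        limiter = RateLimiter.create(1000.0);
    }

    @Override
    public String map(String value) {
        limiter.acquire(); // blocks until a permit is available
        return value;
    }
}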

Best,
Zhanghao Chen

From: casel.chen 
Sent: Tuesday, May 28, 2024 22:00
To: user-zh@flink.apache.org 
Subject: Re: Is there a way to rate-limit Flink SQL consumption of a Kafka topic?

Looking at the Flink source, DataGeneratorSource now takes a RateLimiterStrategy parameter, but KafkaSource has no such parameter. Could rate limiting be implemented there the way DataGeneratorSource does it?

public DataGeneratorSource(
        GeneratorFunction generatorFunction,
        long count,
        RateLimiterStrategy rateLimiterStrategy,
        TypeInformation typeInfo) {...}
On 2024-05-27 23:47:40, "casel.chen" wrote:
>Is there a way to rate-limit Flink SQL consumption of a Kafka topic? The scenario: a Kafka topic is consumed and
>written to a downstream mongodb; during business peaks the mongodb write pressure is high, and we want to throttle the Kafka consumption. How can this be done?


Re: This definitely counts as a bug

2024-06-29 Thread Zhanghao Chen
Hi, the error shows that the JM lost leadership, which shut down all tasks on the TM. Check on the JM side whether the HA connection is having problems.

Best,
Zhanghao Chen

From: Cuixb 
Sent: Saturday, June 29, 2024 10:31
To: user-zh@flink.apache.org 
Subject: This definitely counts as a bug

Production environment: Flink 1.16.2

2024-06-29 09:17:23
java.lang.Exception: Job leader for job id 8ccdd299194a686e3ecda602c3c75bf3 
lost leadership.
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:2310)
at java.util.Optional.ifPresent(Optional.java:159)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:2308)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453)
at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(For

Sent from my iPhone


Re: CLI job submission under Flink Standalone ZK HA

2024-07-12 Thread Zhanghao Chen
From the logs, a leader switch happened while the ZK cluster was rolling: both JMs became leader at some point, but never at the same time.

Best,
Zhanghao Chen

From: love_h1...@126.com 
Sent: Friday, July 12, 2024 17:17
To: user-zh@flink.apache.org 
Subject: CLI job submission under Flink Standalone ZK HA

Version: Flink 1.11.6, Standalone HA mode, ZooKeeper 3.5.8
Actions:
 1. Only cancelled all running jobs; did not stop the Flink cluster
 2. Rolling-restarted the ZooKeeper cluster
 3. Submitted several jobs with flink run
Symptoms:
1. Some job submissions failed with: The rpc invocation size 721919700 exceeds the maximum akka
framesize.
2. The logs of both JobManager nodes in the Flink cluster contain job reception and execution entries
Questions:
1. Does flink run submit the job to both JobManager nodes of the Flink cluster?
2. Can restarting the ZooKeeper cluster leave the Flink cluster with two JobManagers in the leader role? Is this a bug in this particular scenario?