Re: 背压分析

2023-08-16 Thread yidan zhao
1 可控范围即可。
2 分析阶段可以分开,实际运行阶段看情况,怎样性能高就如何搞。
3 看监控,flink web ui有根据每个节点的反压情况按照不同颜色展示。

星海 <2278179...@qq.com.invalid> 于2023年8月16日周三 22:03写道:
>
> hello。大家好,请教几个问题:
> 1、flink中背压存在是合理的吗?还是在可控范围内就行?还是尽可能没有呢?
> 2、如果出现背压,如果多个operator chain 在一起不好分析,需要先将其拆开分析吗?
> 3、分析背压大家有什么工具或者需要注意的指标呢?出现背压就不知道该怎么优化,我们尝试看火焰图、堆栈都无法解决,想问下大家有什么方法论吗?还是借助什么工具去看呢,谢谢!!


Re: 作业full gc 很严重

2023-08-03 Thread yidan zhao
GC日志看GC原因

2278179732 <2278179...@qq.com.invalid> 于2023年8月3日周四 13:59写道:
>
> 大家好,请问下作业跑一段时间就会偶发出现背压,full gc看着很严重,有什么好的工具排查下吗?或者经验文档?谢谢!


Re: Flink窗口状态清除疑问

2023-08-02 Thread yidan zhao
首先你窗口是30min,刚刚开始肯定会是涨的。
其次,后续稳定后,继续涨可能是因为流量在变化。
最后,流量不变情况下,还可能受到延迟的影响。

lxk  于2023年7月25日周二 11:22写道:
>
> 相关配置:
> Flink:1.16
>
> | Checkpointing Mode | Exactly Once |
> | Checkpoint Storage | FileSystemCheckpointStorage |
> | State Backend | EmbeddedRocksDBStateBackend |
> | Interval | 8m 0s |
>
>
> 我有一个程序,主要是用来统计一些热门商品之类的数据
> 具体代码如下:
> .keyBy(data -> data.getShopId() + data.getYh_productid())
> .window(TumblingEventTimeWindows.of(Time.minutes(30)))
> .sum("count").name("clickCount");
>
>
> 按照官方文档的说法,状态在窗口触发后应该就会清除。但是我在webui上以及grafana监控上看到的checkpoint大小一直在增长。
> webui:https://pic.imgdb.cn/item/64bf3efa1ddac507cc6484d5.jpg
> grafana:https://pic.imgdb.cn/item/64bf3fb71ddac507cc65a2c4.jpg
> 想知道下这个具体的原因可能是啥


Re: Flink ML

2023-08-02 Thread yidan zhao
这个取决于你是什么模型,比如python中sklearn的大多模型都可以导出成pmml格式模型,然后java用jpmml库就可以导入进行预测。
如果是tensorflow模型,也有,只不过我忘记了,你可以找找。

15904502343 <15904502...@163.com> 于2023年8月1日周二 16:48写道:
>
> 您好
> 我想知道是否有代码示例,可以在Flink程序中加载预先训练好的编码模型(用python编写)


Re: pyflink1.17 中文乱码

2023-06-07 Thread yidan zhao
可以描述再详细点

1  于2023年6月7日周三 19:55写道:
>
> 老师们好,pyflink运行官网例子 wordcount 。把单词改成中文 乱码
>
>
>
>
>


Re: 咨询flink1.17+jdk11环境下使用G1 GC发生大量full gc的问题

2023-06-03 Thread yidan zhao
这个问题比较奇怪,我尝试加大了task.offheap的内存,主要是为了加大 direct memory 的限制,部分作业加大后ok了。
今天又发现一个作业,direct mem 达到9G了还不行(5G
task.offheap额外配置,另外4G是network部分配置的,所以jvm参数部分对direct mem的限制是9G)。

Yuxin Tan  于2023年5月26日周五 18:15写道:
>
> hi, yidan
>
> 除 jvm 参数外,flink 其他配置完全一样吗?比如 state backend 是否有变化?
>
> 另外, jdk11 是不是用的最新版本,不是的话,觉得也可以尝试一下最新版本。
>
> 如果 jdk11 用的最新版本,可以尝试下使用其他 GC 算法是否也有同样问题。比如 -XX:+UseParallelGC
> -XX:NewRatio=3 -XX:ParallelGCThreads=4 -XX:CICompilerCount=4
> -XX:-CompactStrings
>
> Best,
> Yuxin
>
>
> yidan zhao  于2023年5月26日周五 17:39写道:
>
> > 最近升级flink版本和jdk版本,flink从1.15.2升级到1.17.0,jdk从8升级到11。然后出现大量full gc。
> > 分析后,发现主要是 System.gc() 导致。 进一步定位到是 redisson 库中 netty 部分用到了 DirectMemory
> > 导致。 直接内存不足,导致频繁调用 System.gc 触发 full gc。
> > 我现在问题是,通过测试对比实验发现,jdk8+flink1.17没问题,jdk11+flink1.17就会有该问题。
> > 有人知道原因嘛?
> >
> > 其他信息:
> >
> > jdk8和jdk11情况下都是G1GC,且vm参数一致,直接内存max限制也一致。但是通过jinfo等查看,确实jdk8场景下的directMemory使用较少。
> >


Re: 关于Table API 或 SQL 如何设置水印的疑问?

2023-05-30 Thread yidan zhao
你在hive的catalog中定义表的时候就可以定义好event time,以及watermark呀。

ZhaoShuKang  于2023年5月25日周四 08:53写道:
>
> 各位老师好,我最近在做Flink查询Hive的功能,需要用到窗口处理数据,在编写代码过程中无法设置水印,我看官网看到Table API & SQL 
> 设置事件时间有三种方式:
> 1、在 DDL 中定义
> 2、在 DataStream 到 Table 转换时定义
> 3、使用 TableSource 定义
> 而我使用的是HiveCatalog查询hive,貌似用不上以上三种方式。所以我想问问各位老师,有没有一种办法可以直接在Table上设置某个字段为事件事件,并且设置水印?
> 另外说明,我的第一版代码是将Table转换为DataSteam,然后再设置水印和窗口,但是执行转换过程非常耗时,并且在源码中 
> toDataSteam()方法的注释上也说“表生态系统的类型系统比DataStream 
> API的类型系统更丰富”,因此开始考虑使用Table或SQL解决问题。
> 以下是我的第一版代码
> // flink 集成 hive
> System.out.println("初始化Flink环境");
> String hiveVersion = "3.1.2";
> String catalogName = "myhive";
> String defaultDatabase = "dwd_1580_egd_finishing_mill_lv1_202302";
> String hiveConfDir = "/usr/hdp/3.1.4.0-315/apache-hive-3.1.2-bin/conf";
> EnvironmentSettings settings = EnvironmentSettings
> .newInstance()
> .inStreamingMode()
> .build();
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
> settings);
>
> System.out.println("定义hive环境");
> // 定义 hive catalog 参数:catalog名称、数据库名称、对象名称
> HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir, 
> hiveVersion);
> tableEnv.registerCatalog(catalogName, hive);
>
> // 将 HiveCatalog 设置为 session 的当前 catalog
> tableEnv.useCatalog(catalogName);
> tableEnv.useDatabase(defaultDatabase);
> // 设置 hive 并行度
> Configuration configuration = tableEnv.getConfig().getConfiguration();
> configuration.setInteger("table.exec.hive.infer-source-parallelism.max", 
> sourceParallelism); // Default 1000
>
> // 使用 HiveTableSource
> System.out.println("定义查询条件");
> // 定义查询条件
> Table table = tableEnv
> .from(catalogName + "." + databaseName + "." + tableName)
> .select(DATETIME + "," + fields + "," + YEAR + "," + MONTH + "," + 
> DAY + "," + HOUR)
> .filter($(YEAR).isEqual(year))
> .filter($(MONTH).isEqual(startMonth))
> .filter($(DAY).isGreaterOrEqual(startDay))
> .filter($(HOUR).isGreaterOrEqual(startHour))
> .filter($(DAY).isLessOrEqual(endDay))
> .filter($(HOUR).isLessOrEqual(endHour));
> tableEnv.createTemporaryView("myTable", table);
>
> // Table 转 Stream,非常耗时
> System.out.println("Table to Stream");
> DataStream resultStream = tableEnv.toDataStream(table);
> // 水印及窗口设置
> System.out.println("水印及窗口");
> resultStream
> .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3))
> .withTimestampAssigner((SerializableTimestampAssigner) 
> (element, recordTimestamp) -> {
> long datetime = 0;
> try {
> datetime = new SimpleDateFormat(DATEFORMAT)
> 
> .parse(element.getFieldAs(DATETIME).toString())
> .getTime();
> } catch (ParseException e) {
> e.printStackTrace();
> }
> return datetime;
> }))
> .windowAll(TumblingEventTimeWindows.of(Time.seconds(windowTime)))
> | |
> ZhaoShuKang
> |
> |
> chuckzha...@163.com
> |


Re: flink 输出异常数据

2023-05-30 Thread yidan zhao
这个得靠你自己打日志吧,在可能出NPE的地方 try catch 到,然后打印原始记录。

小昌同学  于2023年5月29日周一 18:30写道:
>
> 你好,数据源是kafka,使用的是stream api
>
>
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |
>  回复的原邮件 
> | 发件人 | Weihua Hu |
> | 发送日期 | 2023年5月29日 15:29 |
> | 收件人 |  |
> | 主题 | Re: flink 输出异常数据 |
> Hi,
>
> 你使用的数据源是什么呢?Kafka 吗?用的是 FlinkSQL 还是 DataStream API 呢?
>
> 方便把异常栈贴一下吗
>
> Best,
> Weihua
>
>
> On Mon, May 29, 2023 at 1:36 PM 小昌同学  wrote:
>
>
> 各位老师,我有一个作业运行很久了,但是最近源系统有一些脏数据导致作业运行失败,看yarn的日志报错是空指针,但是我现在想把那一条脏数据捕获到,请问一下有啥办法吗?谢谢各位老师的指导
>
>
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |


Re: flink jdbcsink 连接数的问题

2023-05-30 Thread yidan zhao
你好,这个问题和flink无关,看你主键实现机制吧,如果是自增,那就是mysql级别自动实现的自增,跟flink搭不上关系的。

小昌同学  于2023年5月31日周三 09:41写道:
>
> 老师,你好,再请教一下,连接数与并行度有关系的话,如果插入数据的MySQL是有主键的话,是不是连接数据也就是并行度只能为1啦呀,如果是多个并行度的话,可能会造成主键冲突;
> 感谢各位老师的指导
>
>
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |
>  回复的原邮件 
> | 发件人 | lxk |
> | 发送日期 | 2023年5月30日 14:30 |
> | 收件人 |  |
> | 主题 | Re:flink jdbcsink 连接数的问题 |
> hi,
> jdbc创建链接是在SimpleJdbcConnectionProvider这个类中实现的,至于真正创建链接,则是由DriverManager来处理。
> 关于连接数,则是跟你的并行度有关。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2023-05-30 13:55:57,"小昌同学"  写道:
> 各位老师,请教一下关于flink jdbcsink 连接数的问题;
> 我使用代码如下:在以下代码中,我查看了一下源码,并没有找到sink到MySQL时关于连接数的设定,请问这一块关于连接数的设定我应该怎么写呀;
> 谢谢各位老师的指导
>
> |
> outPutInfoStream.addSink(JdbcSink.sink(
> "REPLACE  into InputInfo (breakCode, breakName, breakDuration, 
> breakRule,breakPrimaryKey,breakStep,breakStepType,breakTime,breakSendTime,breakArgs)
>  values (?,?,?,?,?,?,?,?,?,?)",
> (statement, InPutInfo) -> {
> statement.setString(1,InPutInfo.getBreakCode());
> statement.setString(2,InPutInfo.getBreakName());
> statement.setLong(3,InPutInfo.getBreakDuration());
> statement.setString(4,InPutInfo.getBreakRule());
> statement.setString(5,InPutInfo.getBreakPrimaryKey());
> statement.setString(6, InPutInfo.getBreakStep());
> statement.setString(7, InPutInfo.getBreakStepType());
> statement.setString(8,InPutInfo.getBreakTime());
> statement.setString(9, DateUtil.format(new Date()));
> statement.setString(10, String.valueOf(InPutInfo.getBreakArgs()));
> },
> JdbcExecutionOptions.builder()
> .withBatchSize(10)
> .withBatchIntervalMs(200)
> .withMaxRetries(5)
> .build(),
> new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
> .withUrl("jdbc:mysql://111/iap?useSSL=false=false=false=utf-8=convertToNull=true=Asia/Shanghai=true")
> .withDriverName("com.mysql.jdbc.Driver")
> .withUsername("111")
> .withPassword("111")
> .build()
> )).name("sink-mysql");
> |
>
>
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |


Re: flink web ui显示问题

2023-05-30 Thread yidan zhao
没发现你web ui哪里显示了watermark呢?

小昌同学  于2023年5月31日周三 10:22写道:
>
> 你好,老师,感谢回复,我这边将截图放在了腾讯文档中,请查收;
> 感谢各位老师的指导
> 【腾讯文档】flink web ui
> https://docs.qq.com/sheet/DYkZ0Q0prRWJxcER4?tab=BB08J2
>
>
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |
>  回复的原邮件 
> | 发件人 | Shammon FY |
> | 发送日期 | 2023年5月31日 09:59 |
> | 收件人 |  |
> | 主题 | Re: flink web ui显示问题 |
> Hi,
>
> 好像没有收到附件或者文档,你可以检查确认一下
>
> Best,
> Shammon FY
>
> On Wed, May 31, 2023 at 9:52 AM 小昌同学  wrote:
>
> 各位老师好,请教一个关于flink web ui的显示问题;
> 具体的显示异常截图的我以附件的形式放在文档中,我的疑惑是web
> ui上面已经显示watermark,但是看detail的时候显示不是watermark;
> 感谢各位老师指导
>
> 小昌同学
> ccc0606fight...@163.com
>
> 
>


咨询flink1.17+jdk11环境下使用G1 GC发生大量full gc的问题

2023-05-26 Thread yidan zhao
最近升级flink版本和jdk版本,flink从1.15.2升级到1.17.0,jdk从8升级到11。然后出现大量full gc。
分析后,发现主要是 System.gc() 导致。 进一步定位到是 redisson 库中 netty 部分用到了 DirectMemory
导致。 直接内存不足,导致频繁调用 System.gc 触发 full gc。
我现在问题是,通过测试对比实验发现,jdk8+flink1.17没问题,jdk11+flink1.17就会有该问题。
有人知道原因嘛?

其他信息:
jdk8和jdk11情况下都是G1GC,且vm参数一致,直接内存max限制也一致。但是通过jinfo等查看,确实jdk8场景下的directMemory使用较少。


Re: flink 窗口触发计算的条件

2023-05-24 Thread yidan zhao
如果你只发送了一条数据,那么watermark不会推进,就不会触发窗口计算。你需要更多数据。

小昌同学  于2023年5月25日周四 09:32写道:
>
> 各位老师,请教一下关于flink 事件时间窗口的执行时间点的相关问题;
> 我使用的窗口是:TumblingEventTimeWindows(Time.minutes(1L)),我使用的时间定义是System.currentTimeMillis(),watermark是2秒,
> 但是当我发送一条数据后,过了5分钟之后,窗口都没有触发计算,想请各位老师帮忙看一下程序的问题所在:
> 相关代码以及样例数据如下:
> |
> package job;
> import bean.MidInfo3;
> import bean.Result;
> import bean2.BaseInfo2;
> import com.alibaba.fastjson.JSON;
> import com.alibaba.fastjson.JSONObject;
> import config.FlinkConfig;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.functions.FilterFunction;
> import org.apache.flink.api.common.functions.JoinFunction;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.api.common.state.StateTtlConfig;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.datastream.ConnectedStreams;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.ProcessFunction;
> import org.apache.flink.streaming.api.functions.co.CoMapFunction;
> import 
> org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
> import 
> org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
> import 
> org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import 
> org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> import org.apache.flink.util.Collector;
> import utils.DateUtil;
> import utils.JdbcUtil;
>
> import java.sql.Connection;
> import java.sql.PreparedStatement;
> import java.sql.ResultSet;
> import java.time.Duration;
> import java.util.HashMap;
> import java.util.Properties;
>
> public class RytLogAnly9 {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.disableOperatorChaining();
> //1、消费Kafka中的数据
> String servers = FlinkConfig.config.getProperty("dev_bootstrap.servers");
> String topicName = FlinkConfig.config.getProperty("dev_topicName");
> String groupId = FlinkConfig.config.getProperty("dev_groupId");
> String devMode = FlinkConfig.config.getProperty("dev_mode");
> Properties prop = new Properties();
> prop.setProperty("bootstrap.servers", servers);
> prop.setProperty("group.id", groupId);
> prop.setProperty("auto.offset.reset", devMode);
> DataStreamSource sourceStream = env.addSource(new 
> FlinkKafkaConsumer(topicName, new SimpleStringSchema(), prop));
> sourceStream.print("最源端的数据sourceStream");
>
> //2、对原始数据进行处理,获取到自己需要的数据,生成BaseInfo2基类数据
> SingleOutputStreamOperator baseInfoStream = sourceStream.map(new 
> MapFunction() {
> @Override
> public BaseInfo2 map(String value) throws Exception {
> JSONObject jsonObject = JSON.parseObject(value);
> //获取到不同的服务器IP
> String serverIp = jsonObject.getString("ip");
> //获取到不同的data的数据
> String datas = jsonObject.getString("data");
>
> String[] splits = datas.split("\n");
> HashMap dataMap = new HashMap<>();
> //将time填充到自定义类型中,用来判断同一个num的请求以及应答时间
> String time = splits[0].substring(7, 19).replace("-", "").trim();
> //将subData填充到自定义类型中,用来判断时请求还是应答
> String subData = datas.substring(0, 10);
> for (int i = 0; i < splits.length; i++) {
> if (splits[i].contains("=")) {
> splits[i] = splits[i].replaceFirst("=", "&");
> String[] temp = splits[i].split("&");
> if (temp.length > 1) {
> dataMap.put(temp[0].toLowerCase(), temp[1]);
> }
> }
> }
> return new BaseInfo2(dataMap.get("action"), "需要去MySQL中查找对应的功能描述", serverIp, 
> DateUtil.string2Long(time), dataMap.get("handleserialno"), subData, 
> System.currentTimeMillis());
> }
> 
> }).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2L)).withTimestampAssigner((element,
>  recordTimestamp) -> element.getEvenTime()));
> baseInfoStream.print("不加功能描述的 baseInfoStream");
>
> //3、上述的数据流中的action仅仅是数字,需要关联一下MySQL去拿到对应的功能中文描述
> SingleOutputStreamOperator completeInfoStream = 
> 

streaming.api.operators和streaming.runtime.operators的区别是啥?

2023-05-05 Thread yidan zhao
如题,想知道这个分类的标准是啥呢?


Re: 不同的流程使用不同的并行度

2023-04-20 Thread yidan zhao
从哪方面考虑,主要根据每个算子的工作复杂性,复杂性越高自然设置越高的并发好点。 其次实际运行时,也可以根据反压情况找到瓶颈进行调整。

Shammon FY  于2023年4月21日周五 09:04写道:
>
> Hi
>
> DataStream作业设置并发度有两种方式
> 1. 在ExecutionEnvironment通过setParallelism设置全局并发
> 2. 在DataStream中通过setParallelism为指定的datastream计算设置并发度
>
> Best,
> Shammon FY
>
> On Fri, Apr 21, 2023 at 8:58 AM 小昌同学  wrote:
>
> >
> >
> > 各位老师好,请教一下关于flink的并行度的问题;
> > 我现在数据上游是kafka(四个分区),经过Flink
> > ETL处理后,实时落地到Kafka以及MYSQL,那我想在不同的阶段设置不同的并行度,这一块可以怎么使用,我使用的是DataStream API
> > 还想请教一下就是关于并行度的这个设置,应该从哪些方面进行考虑啊,麻烦各位老师指教一下
> > | |
> > 小昌同学
> > |
> > |
> > ccc0606fight...@163.com
> > |


WindowAssigner中windowStagger作用

2023-04-06 Thread yidan zhao
如题,目前看实现,这个 windowStagger 是针对 opeartor 的众多 subtask 之间,针对每个 subtask
生成了一个固定的 offset 作用于该 subtask 处理的元素。因为 staggerOffset 是在 assignWindows
中生成,而且只有第一次会生成,后续复用。如下:

if (staggerOffset == null) {
staggerOffset =
windowStagger.getStaggerOffset(context.getCurrentProcessingTime(),
size);
}
// Long.MIN_VALUE is currently assigned when no timestamp is present
long start =
TimeWindow.getWindowStartWithOffset(
timestamp, (globalOffset + staggerOffset) % size, size);
return Collections.singletonList(new TimeWindow(start, start + size));


所以,这个功能的目的是,分散不同 subtask
之间的窗口周期offset嘛?那如果实际生产中,是一个TM只有1个slot,整个作业只有1个window类算子的话。这个windowstagger好像没起到分散cpu压力的作用?还是说他的作用在于分散
window 算子整体输出时机而已,并不在乎单个 subtask 的输出时机对嘛?


Re: PartitionNotFoundException

2023-04-02 Thread yidan zhao
设置 taskmanager.network.tcp-connection.enable-reuse-across-jobs 为
false,设置 taskmanager.network.max-num-tcp-connections 大点。
之前有个bug导致这个问题我记得,不知道1.16修复没有。

zhan...@eastcom-sw.com  于2023年4月3日周一 10:08写道:
>
>
> hi, 最近从1.14升级到1.16后,kafka消费不定时会出现 
> [org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: 
> Partition *** not found.]
> 然后不停自动重启job再继续抛出该异常后 不断重启,直到手动cancel任务后 再启动才恢复正常消费
>
> 在1.14集群中从未出现的问题,升到1.16后才出现,请问是否有配置可以优化或避免该异常?


Re: Re: Kafka 数据源无法实现基于事件时间的窗口聚合

2023-02-08 Thread yidan zhao
实际使用你肯定不会是console producer吧。或者你换java代码写kafka,方便控制些。

wei_yuze  于2023年2月8日周三 13:30写道:
>
> 非常感谢各位的回答!
>
>
>
> Weihua和飞雨正确定位出了问题。问题出在Flink 并发数大于Kafka分区数,导致部分Flink task slot 
> 接收不到数据,进而导致watermark(取所有task slot的最小值)无法推进。
>
>
> 我尝试了Weihua提供的两个解决方案后都可以推进watermark求得窗口聚合结果。
>
>
> 后来我想,理想的解决方式应该是使Flink的并发数接近于或等于Kafka的分区数。我的Kafka分区数为3,于是Flink setParallelism 
> 为3。后来发现又无法推进watermark。检查Kafka后发现,kafka Console Producer把所有的数据都推送到了第0号分区。
>
>
>
> 请问哪位能指点一下,让Kafka topic的每个分区都能收到数据?
>
>
>
>
>
> Best,
>
> Lucas
>
>
>
> Original Email
>
>
>
> Sender:"Weihua Hu"< huweihua@gmail.com ;
>
> Sent Time:2023/2/7 18:48
>
> To:"user-zh"< user-zh@flink.apache.org ;
>
> Subject:Re: Kafka 数据源无法实现基于事件时间的窗口聚合
>
>
> Hi,
>
> 问题应该是 kafka source 配置了多并发运行,但数据量比较少(或者 topic 的 partition 数量小于 task
> 的并发数量),不是所有的 source task 都消费到了数据并产生 watermark,导致下游聚合算子无法对齐 watermark 触发计算。
> 可以尝试通过以下办法解决:
> 1. 将 source 并发控制为 1
> 2. 为 watermark 策略开始 idleness 处理,参考 [#1]
>
> fromElement 数据源会强制指定并发为 1
>
> [#1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
>
>
> Best,
> Weihua
>
>
> On Tue, Feb 7, 2023 at 1:31 PM wei_yuze  wrote:
>
>  您好!
> 
> 
> 
> 
>  
> 我在进行基于事件时间的窗口聚合操作时,使用fromElement数据源可以实现,但替换为Kafka数据源就不行了,但程序并不报错。以下贴出代码。代码中给了两个数据源,分别命名为:streamSource
>  和 kafkaSource
>  。当使用streamSource生成watermarkedStream的时候,可以完成聚合计算并输出结果。但使用kafkaSource却不行。
> 
> 
> 
> 
>  public class WindowReduceTest2 {  public static void
>  main(String[] args) throws Exception {
>  StreamExecutionEnvironment env =
>  StreamExecutionEnvironment.getExecutionEnvironment();
> 
> 
>  // 使用fromElement数据源
>  DataStreamSource env.fromElements(
>  new
>  Event2("Alice", "./home", "2023-02-04 17:10:11"),
>  new Event2("Bob",
>  "./cart", "2023-02-04 17:10:12"),
>  new
>  Event2("Alice", "./home", "2023-02-04 17:10:13"),
>  new
>  Event2("Alice", "./home", "2023-02-04 17:10:15"),
>  new 
> Event2("Cary",
>  "./home", "2023-02-04 17:10:16"),
>  new 
> Event2("Cary",
>  "./home", "2023-02-04 17:10:16")
>  );
> 
> 
>  // 使用Kafka数据源
>  JsonDeserializationSchema  jsonFormat = new JsonDeserializationSchema<(Event2.class);
>  KafkaSource 
> KafkaSource. 
>  .setBootstrapServers(Config.KAFKA_BROKERS)
> 
>  .setTopics(Config.KAFKA_TOPIC)
> 
>  .setGroupId("my-group")
> 
>  .setStartingOffsets(OffsetsInitializer.earliest())
> 
>  .setValueOnlyDeserializer(jsonFormat)
>  .build();
>  DataStreamSource env.fromSource(source, 
> WatermarkStrategy.noWatermarks(), "Kafka Source");
>  kafkaSource.print();
> 
> 
>  // 生成watermark,从数据中提取时间作为事件时间
>  SingleOutputStreamOperator  watermarkedStream =
>  
> kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy. 
>  .withTimestampAssigner(new SerializableTimestampAssigner  
> 
>  @Override
>   
>  public long extractTimestamp(Event2 element, long recordTimestamp) {
>   
>SimpleDateFormat simpleDateFormat = new
>  SimpleDateFormat("-MM-dd HH:mm:ss");
>   
>Date date = null;
>   
>try {
>   
>  date =
>  simpleDateFormat.parse(element.getTime());
>   
>} catch (ParseException e) {
>   
>  throw new RuntimeException(e);
>   
>}
>   
>long time = date.getTime();
>   
>System.out.println(time);
>   
>return time;
>}
>  }));
> 
> 
>  // 窗口聚合
>  watermarkedStream.map(new MapFunction  Tuple2 
>  
>  @Override
>   
>  public Tuple2
>   
>// 将数据转换成二元组,方便计算
>   
>return Tuple2.of(value.getUser(), 1L);
>}
>  })
>  .keyBy(r -
>  r.f0)
>  // 设置滚动事件时间窗口
> 
>  .window(TumblingEventTimeWindows.of(Time.seconds(5)))
>  .reduce(new
>  ReduceFunction
>  @Override
>   
>  public Tuple2 Tuple2   
>
>// 定义累加规则,窗口闭合时,向下游发送累加结果
>   
>return Tuple2.of(value1.f0, value1.f1 + value2.f1);
>}
>  })
>  
> .print("Aggregated
>  stream");
> 
> 
>  env.execute();
>}
>  }
> 
> 
> 
> 
> 
> 
>  值得注意的是,若将代码中的 TumblingEventTimeWindows 替换为 TumblingProcessingTimeWindows
>  ,即使使用 Kafka 数据源也是可以完成聚合计算并输出结果的。
> 
> 
> 
>  感谢您花时间查看这个问题!
>  Lucas


Re: Unable to list jobs in flink cluster with multiple jobManagers

2023-01-12 Thread yidan zhao
I think it is a bug: https://issues.apache.org/jira/browse/FLINK-25732

Yael Adsl  于2022年12月12日周一 23:56写道:
>
> Hi,
>
> We are running a flink cluster (Flink version 1.14.3) on kubernetes with 
> high-availablity.type: kubernetes. We have 3 jobmanagers. When we send jobs 
> to the flink cluster, we run a "flink list --jobmanager 
> flink-jobmanager:8081" command as part of the process".
>
> At first, we succeeded to run this command from within any of the jobmanager 
> CLIs.
> But after the elected leader is deleted (For whatever reason. For example, 
> server failure), the configmaps with the following format are updated with 
> the new leader IP address:
> flink-cluster-ecb24e88d60bb06917da1c933785811a-jobmanager-leader
> flink-cluster-b4bef19e6481a6c42340e51b69e30923-jobmanager-leader
> ...
>
> But the following configmaps are not always updated with the same IP address 
> as the others:
> flink-cluster-restserver-leader
> flink-cluster-resourcemanager-leader
> flink-cluster-dispatcher-leader
>
> Then, when we run the flink list command, we receive the error attached at 
> the end of this mail.
> If we delete the jobmanager where the flink-cluster-restserver-leader 
> configmap is pointing, the configmap gets modified to the IP address of the 
> other configmaps, and the "flink list" command succeeds.
> Note: I can see in the log that the command attempts to connect to the IP 
> which is set in the configmap: flink-cluster-restserver-leader'
>
> How do we fix this issue without needing any manual intervention?
>
> Thanks,
> Yael
>
> Error from CLI when running flink list command:
>
> root@flink-jobmanager-68b5fb748d-wwmvt:/opt/flink# flink list --jobmanager 
> localhost:8081
> 2022-12-12 15:04:19,037 INFO  
> org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer
>  [] - Starting to watch for xi-env/flink-cluster-restserver-leader, watching 
> id:fe7b3bff-1d4f-4e3e-bcf8-26afd74e4
> 12c
> Waiting for response...
> 2022-12-12 15:04:21,231 INFO  
> org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver 
> [] - Stopping 
> KubernetesLeaderRetrievalDriver{configMapName='flink-cluster-restserver-leader'}.
> 2022-12-12 15:04:21,233 INFO  
> org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer
>  [] - Stopped to watch for xi-env/flink-cluster-restserver-leader, watching 
> id:fe7b3bff-1d4f-4e3e-bcf8-26afd74e41
> 2c
>
> 
>  The program finished with the following exception:
>
> org.apache.flink.util.FlinkException: Failed to retrieve job list.
> at 
> org.apache.flink.client.cli.CliFrontend.listJobs(CliFrontend.java:449)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$list$0(CliFrontend.java:430)
> at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1002)
> at org.apache.flink.client.cli.CliFrontend.list(CliFrontend.java:427)
> at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1060)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> at 
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal 
> server error.,  org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Failed to 
> serialize the result for RPC call : requestMultipleJobDetails.
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:417)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$2(AkkaRpcActor.java:373)
> at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown 
> Source)
> at 
> java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown 
> Source)
> at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
> at java.base/java.util.concurrent.CompletableFuture.complete(Unknown 
> Source)
> at 
> org.apache.flink.util.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:858)
> at 
> org.apache.flink.util.concurrent.FutureUtils$ResultConjunctFuture.lambda$new$0(FutureUtils.java:876)
> at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown 
> Source)
> at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown
>  Source)
> at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
> at java.base/java.util.concurrent.CompletableFuture.complete(Unknown 
> Source)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:258)
> at 
> 

Re: 请问cancel的任务能够恢复running状态吗?

2023-01-12 Thread yidan zhao
话说我也有个问题,stop后基于savepoint恢复 不同于先savepoint,然后cancel后基于savepoint恢复?

Weihua Hu  于2023年1月5日周四 10:38写道:
>
> Hi,
>
> 简单来说是不能,已经 cancel 的 job 状态不能恢复到 running 状态。用 savepoint 恢复的任务是新的 job。
>
> 这个问题的背景是什么呢?什么情况下需要将已经 cancel 的 job 恢复呢?
>
>
> Best,
> Weihua
>
>
> On Fri, Dec 30, 2022 at 5:12 PM 陈佳豪  wrote:
>
> > hi
> > 我目前测试flink restapi
> > 指定savepointpath来恢复任务发现会重新触发创建一个新的任务原有的任务还是cancel状态,请问有办法恢复原有cancel状态的任务为running吗?


Re: 任务本地运行正常,提交到集群报错 - 图片挂掉,文字贴一下报错信息,非常抱歉打扰

2023-01-12 Thread yidan zhao
看报错 Could not connect to BlobServer at address
localhost/127.0.0.1:33271,你本地的配置是不是不对。提交到什么模式部署的集群,配置是否配对了。

WD.Z  于2023年1月10日周二 10:56写道:
>
> 任务在webui点击submit时报错,看起来是从JM提交到TM时报错,服务器防火墙已关闭,资源足够,还没有安装hadoop,但以standalone模式启动,看了下文档是不需要hadoop?
>  报错中的Caused by列表如下:
>
>
> 2023-01-10 09:46:14,627 INFO  
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - Job e343bc906ea6889d34d9472d40d4f8ff is submitted.
> 2023-01-10 09:46:14,627 INFO  
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - Submitting Job with JobId=e343bc906ea6889d34d9472d40d4f8ff.
> 2023-01-10 09:46:14,629 WARN  
> org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - 
> Could not execute application:
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: org.apache.flink.util.FlinkException: Failed to execute job 
> 'Flink Streaming Job'.
>
>
> Caused by: java.lang.RuntimeException: org.apache.flink.util.FlinkException: 
> Failed to execute job 'Flink Streaming Job'.
>
>
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'Flink 
> Streaming Job'.
>
>
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'Flink 
> Streaming Job'.
>
>
> Caused by: org.apache.flink.util.FlinkException: Could not upload job files.
>
>
> Caused by: java.io.IOException: Could not connect to BlobServer at address 
> localhost/127.0.0.1:33271
>
>
> Caused by: java.net.ConnectException: 拒绝连接 (Connection refused)


Re: 使用flink 动态广播mysql数据报错

2022-12-11 Thread yidan zhao
这没啥问题,但是你代码不能这么写,try catch放 while 内部去。放外边catch到异常不就退出循环了!

bigdata <1194803...@qq.com.invalid> 于2022年12月11日周日 15:12写道:
>
> java.lang.InterruptedException: sleep interrupted
> at java.lang.Thread.sleep(Native Method)
> at 
> com.autoai.wecloud.source.MysqlSceneSource$.run(MysqlSceneSource.scala:49)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:200)
>
>
>
>
> override def run(ctx: 
> SourceFunction.SourceContext[util.HashMap[Integer,String]]): Unit = {
>   try {
> while (isRunning) {
>   val mapResult = new util.HashMap[Integer,String]()
>   val rs = ps.executeQuery()
>   while (rs.next()) {
> mapResult.put(rs.getInt(1), rs.getString(2))
>   }
>   ctx.collect(mapResult)
>   println("mysql条数:"+mapResult.keySet().size())
>   Thread.sleep(1000*5) //每隔30s更新一下用户的配置信息!
> }
>   } catch {
> case e: Exception = log.error(s"查询数据异常, msg = ${e.getMessage}", 
> e.printStackTrace())
>   }
> }
>
> bigdata
> 1194803...@qq.com
>
>
>
> 


Re: 1.15.2作业频繁(每 几十分钟 ~ 1小时)报 LocalTransportException: readAddress(..) failed: Connection timed out .

2022-12-07 Thread yidan zhao
目前感觉和 https://issues.apache.org/jira/browse/FLINK-19249 和
https://issues.apache.org/jira/browse/FLINK-16030
有点类似。网络环境不稳定。相同配置在物理机没问题。

yidan zhao  于2022年12月7日周三 16:11写道:
>
> 谢谢,不过这几个参数和netty关系不大吧。
> heartbeat和akka的可能会和rpc超时有关,不过我这个是netty的报错,不是rpc部分。
> web和rest应该是和client提交任务有关。
>
> Stan1005 <532338...@qq.com.invalid> 于2022年12月7日周三 15:51写道:
> >
> > 我也遇到过,tm的slot数一直是2,并行度高了就很容易出这个报错。tm内存保持为20480mb,相同的job讲并行度降低到256就没有报过这个。
> > 另外可以考虑适当增加这几个参数(具体需要改动哪些建议先搜下这些参数的作用)
> > set rest.connection-timeout=180;
> > set rest.idleness-timeout=180;
> > set heartbeat.timeout=180;
> > set akka.ask.timeout=180;
> > set web.timeout=180;
> >
> >
> >
> > --原始邮件--
> > 发件人:
> > "user-zh"   
> >  
> >  > 发送时间:2022年12月6日(星期二) 晚上7:18
> > 收件人:"user-zh" >
> > 主题:1.15.2作业频繁(每 几十分钟 ~ 1小时)报 LocalTransportException: readAddress(..) 
> > failed: Connection timed out .
> >
> >
> >
> > 如题,这个问题长期存在,我想了解几个点:
> > (1)connection time out
> > 是连接时才会报的错误嘛?作业正常运行期间可能有嘛?我理解是连接时的报错,但是我看部分报错是作业运行不少时间才报错的(比如40分钟,1小时多),这种时刻为什么会有
> > connect 操作呢?netty的connection不是在作业启动时,就发 partition request 的时候创建好的嘛。
> > (2)之前调整过 netty 的 server 的 backlog,目前设置2048,不应该是这个导致。
> > (3)之前我TM都是1个slot,netty的server thread默认就是1,后来设置成2,我考虑是不是因为netty server
> > thread太少导致来不及处理连接?所以出现 connection timeout?但是我加大了server thread
> > 到10还是没啥效果。而且也不至于,理论上netty server thread应该仅负责创建连接,都不负责具体的io,不应该是这个原因。
> >
> > 大佬们,有人知道这个问题出现的场景嘛?就是啥情况会出现,是不是只有创建连接时存在 connection
> > timeout的概念呢?其次flink作业运行期间,除了作业启动后的一小段时间外,什么情况还需要建立 netty 连接呢?
> > 不考虑再提交作业,因为我的TM只有1个slot,而且这个集群只运行1个作业。


Re: 1.15.2作业频繁(每 几十分钟 ~ 1小时)报 LocalTransportException: readAddress(..) failed: Connection timed out .

2022-12-07 Thread yidan zhao
谢谢,不过这几个参数和netty关系不大吧。
heartbeat和akka的可能会和rpc超时有关,不过我这个是netty的报错,不是rpc部分。
web和rest应该是和client提交任务有关。

Stan1005 <532338...@qq.com.invalid> 于2022年12月7日周三 15:51写道:
>
> 我也遇到过,tm的slot数一直是2,并行度高了就很容易出这个报错。tm内存保持为20480mb,相同的job讲并行度降低到256就没有报过这个。
> 另外可以考虑适当增加这几个参数(具体需要改动哪些建议先搜下这些参数的作用)
> set rest.connection-timeout=180;
> set rest.idleness-timeout=180;
> set heartbeat.timeout=180;
> set akka.ask.timeout=180;
> set web.timeout=180;
>
>
>
> --原始邮件--
> 发件人:  
>   "user-zh"   
>  
>  发送时间:2022年12月6日(星期二) 晚上7:18
> 收件人:"user-zh"
> 主题:1.15.2作业频繁(每 几十分钟 ~ 1小时)报 LocalTransportException: readAddress(..) 
> failed: Connection timed out .
>
>
>
> 如题,这个问题长期存在,我想了解几个点:
> (1)connection time out
> 是连接时才会报的错误嘛?作业正常运行期间可能有嘛?我理解是连接时的报错,但是我看部分报错是作业运行不少时间才报错的(比如40分钟,1小时多),这种时刻为什么会有
> connect 操作呢?netty的connection不是在作业启动时,就发 partition request 的时候创建好的嘛。
> (2)之前调整过 netty 的 server 的 backlog,目前设置2048,不应该是这个导致。
> (3)之前我TM都是1个slot,netty的server thread默认就是1,后来设置成2,我考虑是不是因为netty server
> thread太少导致来不及处理连接?所以出现 connection timeout?但是我加大了server thread
> 到10还是没啥效果。而且也不至于,理论上netty server thread应该仅负责创建连接,都不负责具体的io,不应该是这个原因。
>
> 大佬们,有人知道这个问题出现的场景嘛?就是啥情况会出现,是不是只有创建连接时存在 connection
> timeout的概念呢?其次flink作业运行期间,除了作业启动后的一小段时间外,什么情况还需要建立 netty 连接呢?
> 不考虑再提交作业,因为我的TM只有1个slot,而且这个集群只运行1个作业。


1.15.2作业频繁(每 几十分钟 ~ 1小时)报 LocalTransportException: readAddress(..) failed: Connection timed out .

2022-12-06 Thread yidan zhao
如题,这个问题长期存在,我想了解几个点:
(1)connection time out
是连接时才会报的错误嘛?作业正常运行期间可能有嘛?我理解是连接时的报错,但是我看部分报错是作业运行不少时间才报错的(比如40分钟,1小时多),这种时刻为什么会有
connect 操作呢?netty的connection不是在作业启动时,就发 partition request 的时候创建好的嘛。
(2)之前调整过 netty 的 server 的 backlog,目前设置2048,不应该是这个导致。
(3)之前我TM都是1个slot,netty的server thread默认就是1,后来设置成2,我考虑是不是因为netty server
thread太少导致来不及处理连接?所以出现 connection timeout?但是我加大了server thread
到10还是没啥效果。而且也不至于,理论上netty server thread应该仅负责创建连接,都不负责具体的io,不应该是这个原因。

大佬们,有人知道这个问题出现的场景嘛?就是啥情况会出现,是不是只有创建连接时存在 connection
timeout的概念呢?其次flink作业运行期间,除了作业启动后的一小段时间外,什么情况还需要建立 netty 连接呢?
不考虑再提交作业,因为我的TM只有1个slot,而且这个集群只运行1个作业。


Re: Re: flink sql作业无缝升级问题

2022-12-01 Thread yidan zhao
通过savepoint方式先停止作业可以,不停止,你要考虑是否你的作业是否能做到重复处理部分数据不影响准确性。

先做savepoint但不停止作业,新作业启动后,新旧作业是消费的数据是重复的,不会因为相同group就不重复。
因为kafka的消费是2个模式,一个是组模式,还有一个是不受到组约束的。Flink采用的是后者。

我说的那个方法是在kafka后边加一个filter,filter的参数就是start和end,根据start和end过滤数据。
而且这个start和end需要可动态配置,就是不重启作业能配置才行。
这样就可以先启动新作业,并设置其从未来某个时间点(ts)开始消费,并设置旧作业消费到ts后停止。这样等待到ts到达,并确认旧任务消费完成ts之前的数据,停止旧作业就好了。

casel.chen  于2022年12月2日周五 12:42写道:
>
>
>
>
>
>
>
> 拿kafka source作业为例,新老作业使用相同的consumer 
> group,老作业先做savepoint,完了在老作业还在运行的同时启动新作业从刚才的savepoint恢复会有问题么?
> 如何设置一个流量开关让新作业“准备”好再打开流量呢?有没有具体实操的例子?还是说需要自己修改flink源码,具体要看哪一个类方法?
>
>
>
>
>
>
>
>
>
>
>
> 在 2022-11-30 20:08:44,"Jiangang Liu"  写道:
> >Flink目前无法做到无缝升级,需要走stop-with-savepoint、start
> >job的流程,但是在这之间可以做一些优化来缩短恢复时间。比如,把新作业先启动起来,申请好资源,同时停掉老作业,将做好的savepoint用来触发新作业的执行。
> >
> >casel.chen  于2022年11月29日周二 08:38写道:
> >
> >> 线上有一个流量较大的flink sql作业需要升级添加业务字段,当前主要是kafka (canal) 多表关联写入 mongodb
> >> 数据同步场景,除了source offset外无其他状态,如何让用户对升级无感呢?
> >> 常规的停止作业再启动作业至少要几十秒,会造成消息积压告警,有没有可能先启新作业待运行平稳后再停止老作业?kafka
> >> group使用同一个,作业启动从group-offsets开始可以吗?另外,如果是有大状态作业又要如何无缝升级?


Re: flink sql作业无缝升级问题

2022-11-30 Thread yidan zhao
应该是不行,必须先停止。

除非业务层面做了改动,像我业务的话我支持动态配置消费数据的开始结束时间的过滤。这样假设当前作业为A,当前时间9点55分,先动态设置A消费到10点就停止。在10点前启动新作业B,并设置作业B从10点的作业开始消费。这样10点之后比如10点5分左右确认作业A已经消费完10点前数据且sink完就可以停止了。

否则没办法,指定相同group也不可以的应该,我记得flink是使用主动assign分区的方式使用kafka的,因此如果前后作业同时存在,实际是重复消费,不存在共享消费的概念。

刘超  于2022年12月1日周四 09:36写道:
>
> kafka group使用同一个,作业启动从group-offsets开始,如果没有状态的作业是可以的,但是如果存在状态的话,可能不太行。
> 最好的办法就是等流量下来后,再进行一个切换
>
>
>
>
> 刘超
> liuchao1...@foxmail.com
>
>
>
> 
>
>
>
>
> --原始邮件--
> 发件人:  
>   "user-zh"   
>  
>  发送时间:2022年11月30日(星期三) 晚上8:08
> 收件人:"user-zh"
> 主题:Re: flink sql作业无缝升级问题
>
>
>
> Flink目前无法做到无缝升级,需要走stop-with-savepoint、start
> job的流程,但是在这之间可以做一些优化来缩短恢复时间。比如,把新作业先启动起来,申请好资源,同时停掉老作业,将做好的savepoint用来触发新作业的执行。
>
> casel.chen 
>  线上有一个流量较大的flink sql作业需要升级添加业务字段,当前主要是kafka (canal) 多表关联写入 mongodb
>  数据同步场景,除了source offset外无其他状态,如何让用户对升级无感呢?
>  常规的停止作业再启动作业至少要几十秒,会造成消息积压告警,有没有可能先启新作业待运行平稳后再停止老作业?kafka
>  group使用同一个,作业启动从group-offsets开始可以吗?另外,如果是有大状态作业又要如何无缝升级?


Re: Re: 怎样从flink执行计划json生成StreamGraph?

2022-11-29 Thread yidan zhao
好吧,sql我具体不了解,我用的stream api比较多,我了解是stream
api到streamGraph,然后到jobGraph,然后就是直接rest api方式提交给集群执行。 standalone场景。

casel.chen  于2022年11月30日周三 00:16写道:
>
> 如果要支持调整flink sql作业每个算子资源和并行度的话,不是要从json转回streamGraph再提交吗?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2022-11-29 10:07:40,"yidan zhao"  写道:
> >并不需要从执行计划json生成streamGraph呀~
> >streamGraph提交之前直接转jobGraph。
> >
> >casel.chen  于2022年11月28日周一 08:53写道:
> >>
> >> 源码中只找到如何从streamgraph生成执行计划的json串,但没有找到如何解析执行计划的json串生成回streamgraph,还请赐教


Re: 怎样从flink执行计划json生成StreamGraph?

2022-11-28 Thread yidan zhao
并不需要从执行计划json生成streamGraph呀~
streamGraph提交之前直接转jobGraph。

casel.chen  于2022年11月28日周一 08:53写道:
>
> 源码中只找到如何从streamgraph生成执行计划的json串,但没有找到如何解析执行计划的json串生成回streamgraph,还请赐教


Re: 关于LocalTransportException的优化方向咨询

2022-11-21 Thread yidan zhao
hi,我们继续实验了reuse=false,max-num-tcp-connections继续为1的情况,也解决了问题。因此是reuse的问题。

yidan zhao  于2022年11月21日周一 10:34写道:
>
> 我们这边其实是生产环境一直有类似local/remote transport异常,其中一大类就是 Sending the partition
> request to '...' failed 这种错误。
>
> 当前是 1.15.2,默认 taskmanager.network.max-num-tcp-connections 是 1,
> taskmanager.network.tcp-connection.enable-reuse-across-jobs 是 true。
>
> 当前测试的实际是将taskmanager.network.max-num-tcp-connections
> 改成了30,taskmanager.network.tcp-connection.enable-reuse-across-jobs 改成了
> false,然后目前没出 Sending the partition request to '...' failed 这种错误。之前很频繁。
>
> 我们集群是standalone,每个tm只提供一个slot,单作业大概120个slot左右。
> 之前的表现还有个规律:作业失败重启连续失败,很难自己重启成功。
> 需要人工重启整个集群后,作业自动重启基本就能成功。然后运行个几天后就很容易再出现这个问题。
>
> —— 关于这2个参数,是哪个有效导致,我们后续试着看看。
>
> >
> > Hi yidan,
> >
> > 感谢你的反馈,从之前的描述来看,在修改 taskmanager.network.max-num-tcp-connections 
> > 之前,你们任务可能一天报很多次 LocalTransportException。修改之后,有效果。所以有可能是 
> > taskmanager.network.max-num-tcp-connections 导致的。
> >
> > 但我跟进代码后,发现 taskmanager.network.max-num-tcp-connections 与 
> > taskmanager.network.tcp-connection.enable-reuse-across-jobs 两个参数会一起作用。
> >
> > 可以帮忙验证下:taskmanager.network.max-num-tcp-connections 不变,仍然是 默认值 1 ,但是将 
> > taskmanager.network.tcp-connection.enable-reuse-across-jobs 设置为 false 
> > 吗?看一下这么配置稳定性是否有改善。
> >
> > 通过这样验证,可以方便定位到底是 taskmanager.network.max-num-tcp-connections 还是 
> > taskmanager.network.tcp-connection.enable-reuse-across-jobs 影响了任务稳定性。
> >
> > 非常期待你的反馈,谢谢。
> >
> > fanrui
> >
> > On 2022/11/18 04:25:47 yidan zhao wrote:
> > > Hi, weijie guo.
> > > 你在 https://issues.apache.org/jira/browse/FLINK-28695
> > > 中提到的解决方案,我们这边实验观察看起来是有效的。 当然我们任务重启的原因大概分几类,其中一类就是这个问题,目前来看修改参数
> > > taskmanager.network.max-num-tcp-connections 是有效的。
> > >
> > > 但是话说这个重用连接为什么会出现这种问题呢?而且这种问题不是出现1次,而是连续重启很多次后才可能恢复。
> > >
> > > >
> > > > 1.all to all的边的话,你这个例子把并发降下来肯定连接数要少很多的。 slot
> > > > sharing的话也只会把A和B相同并发的share在一起,连接其他的subtask A还是要建立连接。
> > > > 2.指的是作业jar,每个TM只会下载一次
> > > >
> > > > Best regards,
> > > >
> > > > Weijie
> > > >
> > > >
> > > > yidan zhao  于2022年10月31日周一 19:54写道:
> > > >
> > > > > 嗯,问题1我主要是在想,这种复杂的连接关系,会不会增大Sending the partition request to '...'
> > > > > failed;这种异常的概率。
> > > > > 问题2,你提到的下载jar是指任务jar还是flink的jar。flink的jar不需要,因为我是standalone集群。
> > > > > 任务jar的话,这出现另外一个问题,如果一个TM分配到120*10=1200个task,那么任务jar不会分发这么多次吧。
> > > > >
> > > > > weijie guo  于2022年10月31日周一 12:54写道:
> > > > > >
> > > > > > 你好,请问使用的flink版本是多少?
> > > > > > 1.15的话TM间是有connection reuse的,默认TM间建立一个物理TCP连接。
> > > > > >
> > > > > 并发大了的话,你的TM只有一个slot,启动的TM会变多。task全变成running的状态变慢的因素也比较多:有些TM容器在的结点比较慢、下载jar包时间长、state
> > > > > > restore慢等
> > > > > >
> > > > > > Best regards,
> > > > > >
> > > > > > Weijie
> > > > > >
> > > > > >
> > > > > > yidan zhao  于2022年10月30日周日 11:36写道:
> > > > > >
> > > > > > > 如题,我生产集群频繁报 org.apache.flink.runtime.io
> > > > > > > .network.netty.exception.LocalTransportException
> > > > > > > 异常,具体异常cause有如下情况,按照出现频率从高到底列举。
> > > > > > > (1)Sending the partition request to '...' failed;
> > > > > > > org.apache.flink.runtime.io
> > > > > > > .network.netty.exception.LocalTransportException:
> > > > > > > Sending the partition request to '/10.35.215.18:2065 (#0)' failed.
> > > > > > > at org.apache.flink.runtime.io
> > > > > > >
> > > > > .network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:145)
> > > > > > > at org.apache.flink.runtime.io
> > > > > > >
> > > > > .network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:133)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
> > > > > > > at
> > > > > > >
> > > > > org.

Re: flink1.15.2无论blob.server.port设置什么端口范围,都会报错

2022-11-21 Thread yidan zhao
日志太少,建议开启debug日志后,发出来完整日志看看。

junjie.m...@goupwith.com  于2022年11月21日周一 19:04写道:
>
> flink1.15.2无论blob.server.port设置什么端口范围,都会报错。
> 如设置blob.server.port=“50100-50200”时报错:
> Caused by: java.io.IOException: Unable to open BLOB Server in specified port 
> range: 50100
> at org.apache.flink.runtime.blob.BlobServer.(BlobServer.java:229)
> at 
> org.apache.flink.runtime.blob.BlobUtils.createBlobServer(BlobUtils.java:129)
> at 
> org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:411)
> at 
> org.apache.flink.client.program.PerJobMiniClusterFactory.submitJob(PerJobMiniClusterFactory.java:75)
> at 
> org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:85)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2095)
> at 
> org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:804)
> ... 48 common frames omitted
> 不知道是什么情况导致的,且这些端口都是空闲的。


Re: Load Distribution in specific Slot of Taskmanager in flink(version 1.15.2)

2022-11-20 Thread yidan zhao
If no data skew exists, you can set the job's parallelism any times of
the count of taskmanagers, and set `cluster.evenly-spread-out-slots`
to true in flink-conf.yaml of your flink cluster.

harshit.varsh...@iktara.ai  于2022年11月7日周一 20:41写道:
>
> Dear Team,
>
>
>
> I need some advice on setting up load distribution of flink tasks.
>
>
>
> I have a flink task that processes transactions for users. Since load is more 
> than what can be handled on single machine, I want same task to be executed 
> on 3 machines.
>
>
>
> I am trying to use parallelism feature of flink for same. I am able to get 
> flink Job manager to start same task on 3 machines. I want task on each 
> machine to handle 1/3 of total user transaction, kindly suggest what 
> mechanism to use so each task handles only data for their own 1/3 of users.
>
>
>
>
>
> Thanks,
>
> Harshit Varshney


Re: 关于LocalTransportException的优化方向咨询

2022-11-20 Thread yidan zhao
我们这边其实是生产环境一直有类似local/remote transport异常,其中一大类就是 Sending the partition
request to '...' failed 这种错误。

当前是 1.15.2,默认 taskmanager.network.max-num-tcp-connections 是 1,
taskmanager.network.tcp-connection.enable-reuse-across-jobs 是 true。

当前测试的实际是将taskmanager.network.max-num-tcp-connections
改成了30,taskmanager.network.tcp-connection.enable-reuse-across-jobs 改成了
false,然后目前没出 Sending the partition request to '...' failed 这种错误。之前很频繁。

我们集群是standalone,每个tm只提供一个slot,单作业大概120个slot左右。
之前的表现还有个规律:作业失败重启连续失败,很难自己重启成功。
需要人工重启整个集群后,作业自动重启基本就能成功。然后运行个几天后就很容易再出现这个问题。

—— 关于这2个参数,是哪个有效导致,我们后续试着看看。

>
> Hi yidan,
>
> 感谢你的反馈,从之前的描述来看,在修改 taskmanager.network.max-num-tcp-connections 
> 之前,你们任务可能一天报很多次 LocalTransportException。修改之后,有效果。所以有可能是 
> taskmanager.network.max-num-tcp-connections 导致的。
>
> 但我跟进代码后,发现 taskmanager.network.max-num-tcp-connections 与 
> taskmanager.network.tcp-connection.enable-reuse-across-jobs 两个参数会一起作用。
>
> 可以帮忙验证下:taskmanager.network.max-num-tcp-connections 不变,仍然是 默认值 1 ,但是将 
> taskmanager.network.tcp-connection.enable-reuse-across-jobs 设置为 false 
> 吗?看一下这么配置稳定性是否有改善。
>
> 通过这样验证,可以方便定位到底是 taskmanager.network.max-num-tcp-connections 还是 
> taskmanager.network.tcp-connection.enable-reuse-across-jobs 影响了任务稳定性。
>
> 非常期待你的反馈,谢谢。
>
> fanrui
>
> On 2022/11/18 04:25:47 yidan zhao wrote:
> > Hi, weijie guo.
> > 你在 https://issues.apache.org/jira/browse/FLINK-28695
> > 中提到的解决方案,我们这边实验观察看起来是有效的。 当然我们任务重启的原因大概分几类,其中一类就是这个问题,目前来看修改参数
> > taskmanager.network.max-num-tcp-connections 是有效的。
> >
> > 但是话说这个重用连接为什么会出现这种问题呢?而且这种问题不是出现1次,而是连续重启很多次后才可能恢复。
> >
> > >
> > > 1.all to all的边的话,你这个例子把并发降下来肯定连接数要少很多的。 slot
> > > sharing的话也只会把A和B相同并发的share在一起,连接其他的subtask A还是要建立连接。
> > > 2.指的是作业jar,每个TM只会下载一次
> > >
> > > Best regards,
> > >
> > > Weijie
> > >
> > >
> > > yidan zhao  于2022年10月31日周一 19:54写道:
> > >
> > > > 嗯,问题1我主要是在想,这种复杂的连接关系,会不会增大Sending the partition request to '...'
> > > > failed;这种异常的概率。
> > > > 问题2,你提到的下载jar是指任务jar还是flink的jar。flink的jar不需要,因为我是standalone集群。
> > > > 任务jar的话,这出现另外一个问题,如果一个TM分配到120*10=1200个task,那么任务jar不会分发这么多次吧。
> > > >
> > > > weijie guo  于2022年10月31日周一 12:54写道:
> > > > >
> > > > > 你好,请问使用的flink版本是多少?
> > > > > 1.15的话TM间是有connection reuse的,默认TM间建立一个物理TCP连接。
> > > > >
> > > > 并发大了的话,你的TM只有一个slot,启动的TM会变多。task全变成running的状态变慢的因素也比较多:有些TM容器在的结点比较慢、下载jar包时间长、state
> > > > > restore慢等
> > > > >
> > > > > Best regards,
> > > > >
> > > > > Weijie
> > > > >
> > > > >
> > > > > yidan zhao  于2022年10月30日周日 11:36写道:
> > > > >
> > > > > > 如题,我生产集群频繁报 org.apache.flink.runtime.io
> > > > > > .network.netty.exception.LocalTransportException
> > > > > > 异常,具体异常cause有如下情况,按照出现频率从高到底列举。
> > > > > > (1)Sending the partition request to '...' failed;
> > > > > > org.apache.flink.runtime.io
> > > > > > .network.netty.exception.LocalTransportException:
> > > > > > Sending the partition request to '/10.35.215.18:2065 (#0)' failed.
> > > > > > at org.apache.flink.runtime.io
> > > > > >
> > > > .network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:145)
> > > > > > at org.apache.flink.runtime.io
> > > > > >
> > > > .network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:133)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.net

Re: 关于LocalTransportException的优化方向咨询

2022-11-17 Thread yidan zhao
Hi, weijie guo.
你在 https://issues.apache.org/jira/browse/FLINK-28695
中提到的解决方案,我们这边实验观察看起来是有效的。 当然我们任务重启的原因大概分几类,其中一类就是这个问题,目前来看修改参数
taskmanager.network.max-num-tcp-connections 是有效的。

但是话说这个重用连接为什么会出现这种问题呢?而且这种问题不是出现1次,而是连续重启很多次后才可能恢复。

>
> 1.all to all的边的话,你这个例子把并发降下来肯定连接数要少很多的。 slot
> sharing的话也只会把A和B相同并发的share在一起,连接其他的subtask A还是要建立连接。
> 2.指的是作业jar,每个TM只会下载一次
>
> Best regards,
>
> Weijie
>
>
> yidan zhao  于2022年10月31日周一 19:54写道:
>
> > 嗯,问题1我主要是在想,这种复杂的连接关系,会不会增大Sending the partition request to '...'
> > failed;这种异常的概率。
> > 问题2,你提到的下载jar是指任务jar还是flink的jar。flink的jar不需要,因为我是standalone集群。
> > 任务jar的话,这出现另外一个问题,如果一个TM分配到120*10=1200个task,那么任务jar不会分发这么多次吧。
> >
> > weijie guo  于2022年10月31日周一 12:54写道:
> > >
> > > 你好,请问使用的flink版本是多少?
> > > 1.15的话TM间是有connection reuse的,默认TM间建立一个物理TCP连接。
> > >
> > 并发大了的话,你的TM只有一个slot,启动的TM会变多。task全变成running的状态变慢的因素也比较多:有些TM容器在的结点比较慢、下载jar包时间长、state
> > > restore慢等
> > >
> > > Best regards,
> > >
> > > Weijie
> > >
> > >
> > > yidan zhao  于2022年10月30日周日 11:36写道:
> > >
> > > > 如题,我生产集群频繁报 org.apache.flink.runtime.io
> > > > .network.netty.exception.LocalTransportException
> > > > 异常,具体异常cause有如下情况,按照出现频率从高到底列举。
> > > > (1)Sending the partition request to '...' failed;
> > > > org.apache.flink.runtime.io
> > > > .network.netty.exception.LocalTransportException:
> > > > Sending the partition request to '/10.35.215.18:2065 (#0)' failed.
> > > > at org.apache.flink.runtime.io
> > > >
> > .network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:145)
> > > > at org.apache.flink.runtime.io
> > > >
> > .network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:133)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:1017)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:878)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> > > > at java.lang.Thread.run(Thread.java:748)
> > &g

Re: How to get checkpoint stats after job has terminated

2022-11-09 Thread yidan zhao
First of all, you should trigger a savepoint before stopping the job,
and then you can restart the job with the savepoint.

For checkpoints, you need to set
‘execution.checkpointing.externalized-checkpoint-retention’ to
'RETAIN_ON_CANCELLATION'. You can get the checkpoints info via history
server.

Guojun Li  于2022年11月3日周四 11:31写道:
>
> Hi, Flink User Group
>
> I want to retrieve the last few completed checkpoints' stats even if the job 
> has terminated, these stats are useful for restarting the job manually. 
> Because we prefer to restore job from retained checkpoint rather than 
> savepoint. Doc Monitoring Checkpointing said these stats available after the 
> job has terminated.
>
> So I’m wondering
> 1. How to get checkpoint stats after job has terminated?
> 2. How long these stats will retain?
>
> Thanks,
> Guojun
>
>


Re: 关于LocalTransportException的优化方向咨询

2022-10-31 Thread yidan zhao
嗯,问题1我主要是在想,这种复杂的连接关系,会不会增大Sending the partition request to '...'
failed;这种异常的概率。
问题2,你提到的下载jar是指任务jar还是flink的jar。flink的jar不需要,因为我是standalone集群。
任务jar的话,这出现另外一个问题,如果一个TM分配到120*10=1200个task,那么任务jar不会分发这么多次吧。

weijie guo  于2022年10月31日周一 12:54写道:
>
> 你好,请问使用的flink版本是多少?
> 1.15的话TM间是有connection reuse的,默认TM间建立一个物理TCP连接。
> 并发大了的话,你的TM只有一个slot,启动的TM会变多。task全变成running的状态变慢的因素也比较多:有些TM容器在的结点比较慢、下载jar包时间长、state
> restore慢等
>
> Best regards,
>
> Weijie
>
>
> yidan zhao  于2022年10月30日周日 11:36写道:
>
> > 如题,我生产集群频繁报 org.apache.flink.runtime.io
> > .network.netty.exception.LocalTransportException
> > 异常,具体异常cause有如下情况,按照出现频率从高到底列举。
> > (1)Sending the partition request to '...' failed;
> > org.apache.flink.runtime.io
> > .network.netty.exception.LocalTransportException:
> > Sending the partition request to '/10.35.215.18:2065 (#0)' failed.
> > at org.apache.flink.runtime.io
> > .network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:145)
> > at org.apache.flink.runtime.io
> > .network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:133)
> > at
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
> > at
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
> > at
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
> > at
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
> > at
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
> > at
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
> > at
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:1017)
> > at
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:878)
> > at
> > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
> > at
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
> > at
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
> > at
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
> > at
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
> > at
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
> > at
> > org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
> > at
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
> > at
> > org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> > at java.lang.Thread.run(Thread.java:748)
> > Caused by:
> > org.apache.flink.shaded.netty4.io.netty.channel.StacklessClosedChannelException
> > at
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object,
> > ChannelPromise)(Unknown Source)
> > Caused by:
> > org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
> > writeAddress(..) failed: Connection timed out
> >
> > (2)readAddress(..) failed: Connection timed out
> > org.apache.flink.runtime.io
> > .network.netty.exception.LocalTransportException:
> > readAddress(..) failed: Connection timed out (connection to
> > '10.35.109.149/10.35.109.149:2094')
> > at org.apache.flink.runtime.io
> > .network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:168)
> > at
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
> > at
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
> > at
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaugh

关于busy,idle,backpressure的指标

2022-10-30 Thread yidan zhao
当前我发现部分奇怪现象,比如A=>B。
存在A处于反压,但是B全部都是idle的,busy为0,这种情况是什么原因呢?


关于LocalTransportException的优化方向咨询

2022-10-29 Thread yidan zhao
如题,我生产集群频繁报 
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException
异常,具体异常cause有如下情况,按照出现频率从高到底列举。
(1)Sending the partition request to '...' failed;
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
Sending the partition request to '/10.35.215.18:2065 (#0)' failed.
at 
org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:145)
at 
org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:133)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:1017)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:878)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
Caused by: 
org.apache.flink.shaded.netty4.io.netty.channel.StacklessClosedChannelException
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object,
ChannelPromise)(Unknown Source)
Caused by: 
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
writeAddress(..) failed: Connection timed out

(2)readAddress(..) failed: Connection timed out
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
readAddress(..) failed: Connection timed out (connection to
'10.35.109.149/10.35.109.149:2094')
at 
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:168)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1377)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:907)
at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.handleReadException(AbstractEpollStreamChannel.java:728)
at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:821)
at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
at 

Re: Tumble Window 会带来反压问题吗?

2022-10-20 Thread yidan zhao
加大内存segment大小?

macia kk  于2022年10月20日周四 15:31写道:
>
> https://img-beg-sg-1252771144.cos.ap-singapore.myqcloud.com/20221020144100.png
> 看这个图,窗口结束的时候,会产生反压,导致前边的 busy 直接是0,不干活了
>
> https://img-beg-sg-1252771144.cos.ap-singapore.myqcloud.com/20221020152835.png
> 这个是前边在正常消费处理的时候
>
>
>
>
> macia kk  于2022年10月20日周四 14:24写道:
>
> > Hi  yidan
> >
> > 我的的意思是,假设上游 1-10 分钟在处理数据,然后第11分钟就把大批量数据发给 sink,然后上游继续进行 10-20的处理,但是这时候
> > sink 由于数据量大产生了阻塞,造成反压反馈给上游,上游就变慢了。但实际上如果没有反压机制。10-20 的时候,sink
> > 其实可以慢慢写完的。唯一的区别是他发送了一个反压信号,导致上游处理变慢。不知道理解的对不对。
> >
> >
> > 为了要10分钟发送,是因为上游太多数据, 所以我先提前用窗口个聚合一下,目前一秒将近有 800MB 的流量
> >
> >
> >
> > Shammon FY  于2022年10月20日周四 11:48写道:
> >
> >> 如果必须要10分钟,但是key比较分散,感觉这种情况可以增加资源加大一下并发试试,减少每个task发出的数据量
> >>
> >> On Thu, Oct 20, 2022 at 9:49 AM yidan zhao  wrote:
> >>
> >> > 这个描述前后矛盾,写出速度跟不上导致反压,那控制写出速度不是问题更大。不过你不需要考虑这些,因为你控制不了写出速度,只能控制写出时机。
> >> >
> >> > 写出时机是由window的结束时间和watermark决定的,所以如果真要解决,需要控制分窗不要固定整点10分钟。
> >> >
> >> > macia kk  于2022年10月20日周四 00:57写道:
> >> > >
> >> > > 聚合10分钟再输出,到10分钟的时候由于积攒了很多数据,写出速度跟不上,导致反压,然后上游消费就处理变慢了。
> >> > >
> >> > > 如果控制一下写出的速度,让他慢慢写会不会好一些
> >> >
> >>
> >


Re: Tumble Window 会带来反压问题吗?

2022-10-19 Thread yidan zhao
这个描述前后矛盾,写出速度跟不上导致反压,那控制写出速度不是问题更大。不过你不需要考虑这些,因为你控制不了写出速度,只能控制写出时机。

写出时机是由window的结束时间和watermark决定的,所以如果真要解决,需要控制分窗不要固定整点10分钟。

macia kk  于2022年10月20日周四 00:57写道:
>
> 聚合10分钟再输出,到10分钟的时候由于积攒了很多数据,写出速度跟不上,导致反压,然后上游消费就处理变慢了。
>
> 如果控制一下写出的速度,让他慢慢写会不会好一些


Re: Re: flink1.15.1 stop 任务失败

2022-10-14 Thread yidan zhao
嗯,我之前也试过了,kafkaSouce确实是可以的,就是FlinkKafkaConsumer不行。

yanfei lei  于2022年10月14日周五 14:22写道:
>
> Hi yidan && hjw,
> 我用FlinkKafkaConsumer在本地也复现了这一问题,但用KafakaSource是可以正常做stop-with-savepoint的。FlinkKafkaConsumer在Flink
> 1.15后被deprecated了[1],推荐用新的KafkaSource再试试。
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-sourcefunction
>
> Best,
> Yanfei
>
> hjw <1010445...@qq.com.invalid> 于2022年8月23日周二 23:39写道:
>
> > 我认为这个问题应该是Kafka Connector用旧的Api导致的。这个问题在IDEA本地跑就可以复现。我针对这个问题已经提过相关Jira
> > https://issues.apache.org/jira/browse/FLINK-28758。目前还没有收到社区的反馈。
> >
> >
> > --原始邮件--
> > 发件人:
> >   "user-zh"
> > <
> > hinobl...@gmail.com;
> > 发送时间:2022年8月23日(星期二) 晚上11:09
> > 收件人:"user-zh" >
> > 主题:Re: Re: flink1.15.1 stop 任务失败
> >
> >
> >
> > 1 大概率是source部分问题,或者 savepoint 的 trigger 层面。
> > 2 也可以从 cancel 和 stop 的区别上考虑下?
> > 3 补充信息:我的kafka source是用的旧版本(没办法用新版本,原因是由于一些原因我必须用 kafka 低版本 client)。
> >
> > yidan zhao  > 
> >  看了下,报错很少。
> >  反正 flink cancel -s 是可以的,flink stop 就不行。而且目测是瞬间失败。从web
> >  ui来看,整个savepoint的完成是0/841,应该是几乎没开始就出错了。
> >  目前4台机器:
> >  机器1
> >  2022-08-23 22:47:37,093 WARN
> > 
> > org.apache.flink.runtime.taskmanager.Task
> > [] -
> >  Source: JobConfig - Split(JobName_configType)
> >  (1/1)#0 (b5076938b231fb9d33e582104292ebd5) switched from
> > RUNNING to
> >  FAILED with failure cause:
> >  org.apache.flink.util.FlinkRuntimeException: S
> >  top-with-savepoint failed.
> >  at
> > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointAbortAsync$15(StreamTask.java:1339)
> >  at
> > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1364)
> >  at
> > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskAction
> >  Executor.java:93)
> >  at
> > org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> >  at
> > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:33
> >  8)
> >  at
> > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
> >  at
> > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
> >  at
> > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> >  at
> > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> >  at
> > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> >  at
> > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> >  at
> > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> >  at
> > org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> >  at
> > java.lang.Thread.run(Thread.java:748)
> >  下面就是各种 free task,unregister扒拉的。
> > 
> >  机器2
> >  ...
> >  基本看起来都是从 Attempt to cancel 开始,没有先报错,再cancel的貌似。
> > 
> >  Xuyang  >  
> >   Hi, TM上有报错信息嘛?有的话可以贴出来看一下是什么导致cp失败的
> >  
> >  
> >  
> >  
> >  
> >  
> >  
> >   --
> >  
> >   Best!
> >   Xuyang
> >  
> >  
> >  
> >  
> >  
> >   Hi, TM上有报错信息嘛?有的话可以贴出来看一下是什么导致cp失败的
> >   在 2022-08-23 20:41:59,"yidan zhao"  >   补充部分信息:
> >   看日志,如果是 flink savepoint xxx 这样触发检查点,JM的日志很简单:
> >   2022-08-23 20:33:22,307 INFO
> >  
> > org.apache.flink.runtime.jobmaster.JobMaster
> > [] -
> >   Triggering savepoint for job 8d231de75b8227a1b
> >   715b1aa665caa91.
> >   
> >   2022-08-23 20:33:22,318 INFO
> >  
> > org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> > [] -
> >   Triggering checkpoint 5 (type=SavepointType{na
> >   me='Savepoint', postCheckpointAction=NONE,
> > formatType=CANONICAL}) @
> >   1661258002307 for job 8d231de75b8227a1b715b1aa665caa91.
> >   
> >   2022-08-23 20:33:23,701 INFO
> >  
> > org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream
> >   [] - Cannot create recoverable writer
> >due to Recoverable writers on Hadoop are only supported for
> > HDFS,
&g

Flink 的 Taskmanager 间网络连接数、task之间的result sub partition 数对任务性能的影响。

2022-10-13 Thread yidan zhao
任务假设:
   任务从kafka读取数据,经过若干复杂处理(process、window、join、等等),然后sink到kafka。
   并发最高240(kafka分区数),当前采用全部算子相同并发方式部署。
   算子间存在 hash、forward、rebalance 等分区情况。
   此处假设 A 和 B 算子之间是 rebalance。 C 和 D 算子直接是 hash 分区(无数据倾斜)。ABCD都是240并发。 其他算子暂忽略。

TM连接数:
   Flink 的 taskmanager 之间的共享 tcp 连接。因此虽然A到B、C到D之间都是全连接,但实际增加的是result
sub partition数,不会导致tcp连接不断增。
   我们单个TM只提供1个slot的情况下,每个算子都是240并发,所以tm之间的tcp连接是不是可以大概认为是 240 * 239 ?

task间result sub partition:
   task之间的result sub partition太多会有啥影响呢?主要考虑性能影响。是否可能增大 partition not found 的概率呢?


Re: Flink sql从ck恢复,统计数据波动问题

2022-10-10 Thread yidan zhao
ck“打”完是啥意思。

Congxian Qiu  于2022年10月10日周一 15:11写道:
>
> Hi
> 可以的话也同步下相关的计算逻辑,从 checkpoint 恢复后的统计结果可能会和计算逻辑有关
> Best,
> Congxian
>
>
> Hangxiang Yu  于2022年10月10日周一 14:04写道:
>
> > 是什么值下跌呢?哪个metric吗?
> >
> > On Mon, Oct 10, 2022 at 1:34 PM 天下五帝东  wrote:
> >
> > > Hi:
> > > 各位大佬们,flink任务从状态恢复时,统计值有一个明显的下跌,等ck打完后,统计值才会上升,这种情况有什么好的解决方案吗?
> >
> >
> >
> > --
> > Best,
> > Hangxiang.
> >


Re: PartitionNotFoundException

2022-09-28 Thread yidan zhao
嗯,谢谢建议,等再出现问题我试试,现在重启后还好,目前感觉是长时间运行后的集群才会出现。

Lijie Wang  于2022年9月29日周四 10:17写道:
>
> Hi,
>
> 可以尝试增大一下 taskmanager.network.request-backoff.max 的值。默认值是 1,也就是 10 s。
> 上下游可能是并发部署的,所以是有可能下游请求 partition 时,上游还没部署完成,增大 
> taskmanager.network.request-backoff.max 可以增加下游的等待时间和重试次数,减小出现 
> PartitionNotFoundException 的概率。
>
> Best,
> Lijie
>
> yidan zhao  于2022年9月28日周三 17:35写道:
>>
>> 按照flink的设计,存在上游还没部署成功,下游就开始请求 partition 的情况吗? 此外,上游没有部署成功一般会有相关日志不?
>>
>> 我目前重启了集群后OK了,在等段时间,看看还会不会出现。
>>
>> Shammon FY  于2022年9月28日周三 15:45写道:
>> >
>> > Hi
>> >
>> > 计算任务输出PartitionNotFoundException,原因是它向上游TaskManager发送partition 
>> > request请求,上游TaskManager的netty server接收到partition 
>> > request后发现它请求的上游计算任务没有部署成功。
>> > 所以从这个异常错误来看netty连接是通的,你可能需要根据输出PartitionNotFoundException信息的计算任务,查一下它的上游计算任务为什么没有部署成功
>> >
>> > On Tue, Sep 27, 2022 at 10:20 PM yidan zhao  wrote:
>> >>
>> >> 补充:flink1.15.2版本,standalone集群,基于zk的ha。
>> >> 环境是公司自研容器环境。3个容器启JM+HistoryServer。剩下几百个容器都是TM。每个TM提供1个slot。
>> >>
>> >> yidan zhao  于2022年9月27日周二 22:07写道:
>> >> >
>> >> > 此外,今天还做了个尝试,貌似和长时间没重启TM有关?重启后频率低很多会。
>> >> > 我预留的TM很多,比如500个TM,每个TM就提供1个slot,任务可能只用100个TM。
>> >> > 会不会剩下400的TM的连接,时间厂了就会出现某种问题?
>> >> >
>> >> > yidan zhao  于2022年9月27日周二 16:21写道:
>> >> > >
>> >> > > 打开了TM的debug日志后发现很多这种日志:
>> >> > > Responding with error: class
>> >> > > org.apache.flink.runtime.io.network.partition.PartitionNotFoundException
>> >> > >
>> >> > > 目前问题的直观表现是:提交任务后,一直报 LocalTransportException:
>> >> > > org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
>> >> > > Sending the partition request to '/10.216.187.171:8709 (#0)' failed.
>> >> > > at 
>> >> > > org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:145)
>> >> > > at 
>> >> > > org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:133)
>> >> > > at 
>> >> > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
>> >> > > at 
>> >> > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
>> >> > > at 
>> >> > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
>> >> > > at 
>> >> > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
>> >> > > at 
>> >> > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
>> >> > > at 
>> >> > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
>> >> > > at 
>> >> > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:1017)
>> >> > > at 
>> >> > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:878)
>> >> > > at 
>> >> > > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
>> >> > > at 
>> >> > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
>> >> > > at 
>> >> > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
>> >> > > at 
>> >> > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
>> >> > > at 
>> >> > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
>> >> > > at 
>> >> > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
>> >> > > at 
>> >> > > org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
>> >> > > at 
>> >> > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
>> >> > > at 
>> >> > > org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>> >> > > at java.lang.Thread.run(Thread.java:748)
>> >> > > Caused by: 
>> >> > > org.apache.flink.shaded.netty4.io.netty.channel.StacklessClosedChannelException
>> >> > > at 
>> >> > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object,
>> >> > > ChannelPromise)(Unknown Source)
>> >> > >
>> >> > > 不清楚和debug的那个日志是否有关呢?
>> >> > >
>> >> > > 然后都是什么原因呢这个问题,之前一直怀疑是网络原因,一直也不知道啥原因。今天开了debug才发现有这么个debug报错。


Re: PartitionNotFoundException

2022-09-28 Thread yidan zhao
按照flink的设计,存在上游还没部署成功,下游就开始请求 partition 的情况吗? 此外,上游没有部署成功一般会有相关日志不?

我目前重启了集群后OK了,在等段时间,看看还会不会出现。

Shammon FY  于2022年9月28日周三 15:45写道:
>
> Hi
>
> 计算任务输出PartitionNotFoundException,原因是它向上游TaskManager发送partition 
> request请求,上游TaskManager的netty server接收到partition request后发现它请求的上游计算任务没有部署成功。
> 所以从这个异常错误来看netty连接是通的,你可能需要根据输出PartitionNotFoundException信息的计算任务,查一下它的上游计算任务为什么没有部署成功
>
> On Tue, Sep 27, 2022 at 10:20 PM yidan zhao  wrote:
>>
>> 补充:flink1.15.2版本,standalone集群,基于zk的ha。
>> 环境是公司自研容器环境。3个容器启JM+HistoryServer。剩下几百个容器都是TM。每个TM提供1个slot。
>>
>> yidan zhao  于2022年9月27日周二 22:07写道:
>> >
>> > 此外,今天还做了个尝试,貌似和长时间没重启TM有关?重启后频率低很多会。
>> > 我预留的TM很多,比如500个TM,每个TM就提供1个slot,任务可能只用100个TM。
>> > 会不会剩下400的TM的连接,时间厂了就会出现某种问题?
>> >
>> > yidan zhao  于2022年9月27日周二 16:21写道:
>> > >
>> > > 打开了TM的debug日志后发现很多这种日志:
>> > > Responding with error: class
>> > > org.apache.flink.runtime.io.network.partition.PartitionNotFoundException
>> > >
>> > > 目前问题的直观表现是:提交任务后,一直报 LocalTransportException:
>> > > org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
>> > > Sending the partition request to '/10.216.187.171:8709 (#0)' failed.
>> > > at 
>> > > org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:145)
>> > > at 
>> > > org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:133)
>> > > at 
>> > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
>> > > at 
>> > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
>> > > at 
>> > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
>> > > at 
>> > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
>> > > at 
>> > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
>> > > at 
>> > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
>> > > at 
>> > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:1017)
>> > > at 
>> > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:878)
>> > > at 
>> > > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
>> > > at 
>> > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
>> > > at 
>> > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
>> > > at 
>> > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
>> > > at 
>> > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
>> > > at 
>> > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
>> > > at 
>> > > org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
>> > > at 
>> > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
>> > > at 
>> > > org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>> > > at java.lang.Thread.run(Thread.java:748)
>> > > Caused by: 
>> > > org.apache.flink.shaded.netty4.io.netty.channel.StacklessClosedChannelException
>> > > at 
>> > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object,
>> > > ChannelPromise)(Unknown Source)
>> > >
>> > > 不清楚和debug的那个日志是否有关呢?
>> > >
>> > > 然后都是什么原因呢这个问题,之前一直怀疑是网络原因,一直也不知道啥原因。今天开了debug才发现有这么个debug报错。


Re: flink 的数据传输,是上游算子推给下游, 还是下游算子拉取上游, 设计的考虑是啥?

2022-09-27 Thread yidan zhao
其实可以和kafka的pull模型对比下,kafka消费是不断轮训pull。我的认知中flink应该不是吧?
flink应该仅仅是请求 result partition 的时候下游主动去上游请求? 建立之后应该就是类似一条连接不断读取数据?

yanfei lei  于2022年9月22日周四 11:31写道:
>
> Hi,
> Flink社区有一篇关于Credit-based Flow Control的blog post
> 
> ,里面介绍了反压机制的原理和优劣势,希望有帮助。
>
> Shammon FY  于2022年9月21日周三 11:43写道:
>
> > Hi
> > 我个人觉得简单的说flink数据传输是pull模型可能会有歧义,一般来讲大家理解的两个模型的执行流程如下
> > 1. push模型
> > 上下游计算任务将初始化网络连接后,上游计算任务直接通过连接不断向下游"push"数据
> > 2. pull模型
> > 上下游计算任务初始化网络连接后,下游计算任务根据自己的计算进度,轮询向上游发送请求“pull”数据,执行下一轮计算
> >
> > 在flink里,上下游交互流程主要分为几个步骤
> > 1. 上游计算任务所在的TM创建一个Netty Server
> > 2. 下游计算任务启动时通过Netty Client跟上游创建连接
> > 3. 下游计算任务向上游发送一个partition
> > request请求,上游根据request请求创建数据reader,通过reader不断读取数据并通过连接发送数据
> > 4. 上下游计算任务分别有自己的内存池子,用于流控,大概流程如下
> > a) 下游计算任务根据数据消费内存池子情况,不定期向上游计算任务更新授信(credit)
> > b) 上游计算任务根据接收到的credit消息,更新本地管理的授信大小
> > c) 上游计算任务根据本地授信大小不断向下游计算任务发送数据
> >
> > 通过这种方式,在资源足够的情况下,可以保证数据传输是完全流式的,这跟传统的pull模型不同,可能更像是支持授信流控机制的push模型
> >
> > On Wed, Sep 21, 2022 at 9:43 AM yh z  wrote:
> >
> > > 你好。 Flink 采用的是 pull 模型。pull 模型的优点在于:1.
> > > 其具有更好的扩展性(下游的消费者可以根据需求增加,只需要获取到上游的消费位点); 2. 下游的消费者可以根据需求来调整消费速率;
> > > 3.网络传输,flink 以前也尝试使用过push模型,且为了节约开销,进程间是复用 TCP连接,一个 task
> > 线程的性能瓶颈将导致整条链路的所有
> > > task 线程不能接收数据,影响整体的数据消费速率。 push模型的优点:消耗较小,不需要设计机制来一直轮训观察上游节点的数据情况。
> > >
> > > Xuyang  于2022年9月9日周五 20:35写道:
> > >
> > > > Hi,主要是pull模型:下游主动拉取上游的数据。可以在下游的消费能力达到极限时,通过反压机制,让上游减少生产的数据。
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Best!
> > > > Xuyang
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > 在 2022-09-09 19:04:27,"郑 致远"  写道:
> > > > >各位大佬好
> > > > >请教下,
> > > > >flink 的数据传输,是上游算子推给下游, 还是下游算子拉取上游,  这么设计的考虑是啥呢?
> > > >
> > >
> >


Re: PartitionNotFoundException

2022-09-27 Thread yidan zhao
补充:flink1.15.2版本,standalone集群,基于zk的ha。
环境是公司自研容器环境。3个容器启JM+HistoryServer。剩下几百个容器都是TM。每个TM提供1个slot。

yidan zhao  于2022年9月27日周二 22:07写道:
>
> 此外,今天还做了个尝试,貌似和长时间没重启TM有关?重启后频率低很多会。
> 我预留的TM很多,比如500个TM,每个TM就提供1个slot,任务可能只用100个TM。
> 会不会剩下400的TM的连接,时间厂了就会出现某种问题?
>
> yidan zhao  于2022年9月27日周二 16:21写道:
> >
> > 打开了TM的debug日志后发现很多这种日志:
> > Responding with error: class
> > org.apache.flink.runtime.io.network.partition.PartitionNotFoundException
> >
> > 目前问题的直观表现是:提交任务后,一直报 LocalTransportException:
> > org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
> > Sending the partition request to '/10.216.187.171:8709 (#0)' failed.
> > at 
> > org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:145)
> > at 
> > org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:133)
> > at 
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
> > at 
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
> > at 
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
> > at 
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
> > at 
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
> > at 
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
> > at 
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:1017)
> > at 
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:878)
> > at 
> > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
> > at 
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
> > at 
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
> > at 
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
> > at 
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
> > at 
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
> > at 
> > org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
> > at 
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
> > at 
> > org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> > at java.lang.Thread.run(Thread.java:748)
> > Caused by: 
> > org.apache.flink.shaded.netty4.io.netty.channel.StacklessClosedChannelException
> > at 
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object,
> > ChannelPromise)(Unknown Source)
> >
> > 不清楚和debug的那个日志是否有关呢?
> >
> > 然后都是什么原因呢这个问题,之前一直怀疑是网络原因,一直也不知道啥原因。今天开了debug才发现有这么个debug报错。


Re: PartitionNotFoundException

2022-09-27 Thread yidan zhao
此外,今天还做了个尝试,貌似和长时间没重启TM有关?重启后频率低很多会。
我预留的TM很多,比如500个TM,每个TM就提供1个slot,任务可能只用100个TM。
会不会剩下400的TM的连接,时间厂了就会出现某种问题?

yidan zhao  于2022年9月27日周二 16:21写道:
>
> 打开了TM的debug日志后发现很多这种日志:
> Responding with error: class
> org.apache.flink.runtime.io.network.partition.PartitionNotFoundException
>
> 目前问题的直观表现是:提交任务后,一直报 LocalTransportException:
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
> Sending the partition request to '/10.216.187.171:8709 (#0)' failed.
> at 
> org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:145)
> at 
> org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:133)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:1017)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:878)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.flink.shaded.netty4.io.netty.channel.StacklessClosedChannelException
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object,
> ChannelPromise)(Unknown Source)
>
> 不清楚和debug的那个日志是否有关呢?
>
> 然后都是什么原因呢这个问题,之前一直怀疑是网络原因,一直也不知道啥原因。今天开了debug才发现有这么个debug报错。


PartitionNotFoundException

2022-09-27 Thread yidan zhao
打开了TM的debug日志后发现很多这种日志:
Responding with error: class
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException

目前问题的直观表现是:提交任务后,一直报 LocalTransportException:
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
Sending the partition request to '/10.216.187.171:8709 (#0)' failed.
at 
org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:145)
at 
org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:133)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:1017)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:878)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
Caused by: 
org.apache.flink.shaded.netty4.io.netty.channel.StacklessClosedChannelException
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object,
ChannelPromise)(Unknown Source)

不清楚和debug的那个日志是否有关呢?

然后都是什么原因呢这个问题,之前一直怀疑是网络原因,一直也不知道啥原因。今天开了debug才发现有这么个debug报错。


Re: flink cdc + kafka场景下增加kafka分区数问题

2022-09-26 Thread yidan zhao
之前是如何实现的,通过 kafka 的record key?

casel.chen  于2022年9月26日周一 23:21写道:
>
> flink cdc 
> 消费mysql写到kafka场景下一开始数据量不大给的分区数可能只有3,后面业务数据量上来了需要添加分区数,例如12。那么问题来了,如何确保同一条记录的数据变更历史发到同一个kafka分区以确保下游消费的顺序性?重启作业好像也不能解决这个问题吧?


Re: flink的消费速率是否可以调整

2022-09-26 Thread yidan zhao
应该不行吧,kafka client本身就没有限速的功能。

Jason_H  于2022年9月26日周一 10:17写道:
>
> Hi,各位大佬:
> 我们在使用flink消费kafka的时候,是否可以在代码中自定义消费速率,来调整源端的消费能力。
>
>
> | |
> Jason_H
> |
> |
> hyb_he...@163.com
> |


flink web ui 异常问题

2022-09-20 Thread yidan zhao
如题,在工作中经常遇到flink任务各种异常,今天我列了下主要的异常,想请大佬们对不同异常的出现场景根据自身经验说下原因、场景、还有可能的优化解决方案。

(1) org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager
with id {...} is no longer reachable.
(2) org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
(3) 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
Lost connection to task manager '10.35.213.153/10.35.213.153:2085'.
This indicates that the remote task manager was lost.
(4) org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
readAddress(..) failed: Connection timed out (connection to
'10.35.116.170/10.35.116.170:2031')
(5) 
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
writeAddress(..) failed: Connection reset by peer
(6) java.util.concurrent.TimeoutException: Heartbeat of TaskManager
with id {...} timed out.


Re: 某作业计算算子处于busy状态

2022-09-19 Thread yidan zhao
那你代码检查下有没有内存泄露呢。

杨扬  于2022年9月19日周一 11:21写道:
>
> 还有一个现象,观察到 
> taskHeap内存占用在逐步升高,作业刚启动的时候占用在10%左右,一周后增加至25%左右,两周后增加至50%左右,上述指的是GC后观察到的内存占用值。两周后计算算子几乎一直100%busy状态,端到端延迟已经达到了10s左右,作业已经不可用需要重启了。
>
>
>
>
> > 在 2022年9月15日,下午8:58,yidan zhao  写道:
> >
> > 本身低延迟一定程度上就是靠“资源低利用率”实现的。资源高利用率情况,就是尽可能满负荷够用就行的意思。
> >
> > yidan zhao  于2022年9月15日周四 20:57写道:
> >>
> >> 资源足够,busy 50%+,延迟如果也可接受的话,其实就不算问题。2s延迟不算高。
> >>
> >> 杨扬  于2022年9月15日周四 20:02写道:
> >>>
> >>> 目前并发度已经设定为25,每个slot内存为4G,已经使用100G内存,峰值流量1TPS左右,资源是足够的吧?
> >>>
> >>>
> >>>
> >>>
> >>>> 在 2022年9月15日,下午7:27,yidan zhao  写道:
> >>>>
> >>>> busy那就提升并发度看看效果?
> >>>>
> >>>> 杨扬 mailto:yangya...@cupdata.com>> 于2022年9月15日周四 
> >>>> 14:51写道:
> >>>> 各位好!
> >>>>  目前有一flink作业,大致分为3个阶段:
> >>>>  读取kafka中数据(1个source,并行度3)-> 进行数据筛选和条件判断(没有窗口操作,并行度25)-> 
> >>>> 结果写入kafka(20多个sink,每个sink并行度3)。可参考附件图片。
> >>>>  
> >>>> 目前存在的问题是:作业在运行一段时间后,中间25并行度的一系列计算算子会变为busy状态(会达到50%以上),端到端的信息延迟增加,偶尔延迟会达到2秒以上。此时作业日志并没有报错、异常、告警等信息。
> >>>>
> >>>>  
> >>>> 上述问题因为没有日志异常告警信息,本人有些无从下手解决。猜测是否因为sink数据量太多且每个sink并行度都是3会导致中间25个并行度的一系列算子和sink之间的交互产生大量shuffle引起?望各位大佬帮忙分析一下这个问题
> >>>>
> >>>>
> >>>>
> >>>> ===
> >>>> 此邮件已由 Deep Discovery Email Inspector 进行了分析。
> >>>
> >
> > ===
> > 此邮件已由 Deep Discovery Email Inspector 进行了分析。
>


Re: 任务启动异常导致Flink服务挂掉,无法启动Flink服务

2022-09-16 Thread yidan zhao
嗯。去zookeeper中删除jobgraph和running job xx吧啦的几个节点。

Summer  于2022年9月16日周五 16:51写道:

> 开了,但是全被干挂了
>  回复的原邮件 
> 发件人 yidan zhao 
> 发送日期 2022年9月16日 16:05
> 收件人 Summer 
> 抄送人 user-zh@flink.apache.org
> 
> 主题 Re: 任务启动异常导致Flink服务挂掉,无法启动Flink服务
> HA模式开启了对嘛。
>
> Summer  于2022年9月16日周五 15:48写道:
>
>> 原因是找到了,${FLINK_HOME}/lib缺少了一个任务依赖Jar包,
>> 那么如果我在不添加这个jar的情况下,由于Flink无法启动,怎么才能取消掉这个任务??
>>
>>
>>
>>  回复的原邮件 
>> 发件人 yidan zhao 
>> 发送日期 2022年9月16日 14:51
>> 收件人 Summer 
>> 抄送人 user-zh@flink.apache.org
>> 
>> 主题 Re: 任务启动异常导致Flink服务挂掉,无法启动Flink服务
>> 开启了HA是吧。
>>
>> Summer  于2022年9月16日周五 14:32写道:
>>
>> standlone部署
>>
>>
>>
>>
>>
>>
>>
>>
>>  回复的原邮件 
>> 发件人 yidan zhao 
>> 发送日期 2022年9月16日 14:20
>> 收件人 user-zh 
>> 主题 Re: 任务启动异常导致Flink服务挂掉,无法启动Flink服务
>> 什么部署模式。
>>
>> Summer  于2022年9月16日周五 13:57写道:
>>
>>
>>
>> Flink版本:1.13.3
>> 我有一个Flink Sql的任务,也生成了checkpoint,但是执行过程出现Execption,导致整个Flink JobManger无法启动。
>> 我再重启Flink的时候,这个FlinkSql任务由于一直抛异常导致Flink进程启动不起来。
>> 请问有什么办法取消这个任务。
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>


Re: 任务启动异常导致Flink服务挂掉,无法启动Flink服务

2022-09-16 Thread yidan zhao
HA模式开启了对嘛。

Summer  于2022年9月16日周五 15:48写道:

> 原因是找到了,${FLINK_HOME}/lib缺少了一个任务依赖Jar包,
> 那么如果我在不添加这个jar的情况下,由于Flink无法启动,怎么才能取消掉这个任务??
>
>
>
>  回复的原邮件 ----
> 发件人 yidan zhao 
> 发送日期 2022年9月16日 14:51
> 收件人 Summer 
> 抄送人 user-zh@flink.apache.org
> 
> 主题 Re: 任务启动异常导致Flink服务挂掉,无法启动Flink服务
> 开启了HA是吧。
>
> Summer  于2022年9月16日周五 14:32写道:
>
> standlone部署
>
>
>
>
>
>
>
>
>  回复的原邮件 
> 发件人 yidan zhao 
> 发送日期 2022年9月16日 14:20
> 收件人 user-zh 
> 主题 Re: 任务启动异常导致Flink服务挂掉,无法启动Flink服务
> 什么部署模式。
>
> Summer  于2022年9月16日周五 13:57写道:
>
>
>
> Flink版本:1.13.3
> 我有一个Flink Sql的任务,也生成了checkpoint,但是执行过程出现Execption,导致整个Flink JobManger无法启动。
> 我再重启Flink的时候,这个FlinkSql任务由于一直抛异常导致Flink进程启动不起来。
> 请问有什么办法取消这个任务。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>


Re: 任务启动异常导致Flink服务挂掉,无法启动Flink服务

2022-09-16 Thread yidan zhao
开启了HA是吧。

Summer  于2022年9月16日周五 14:32写道:

> standlone部署
>
>
>
>
>
>
>
>
> ---- 回复的原邮件 
> 发件人 yidan zhao 
> 发送日期 2022年9月16日 14:20
> 收件人 user-zh 
> 主题 Re: 任务启动异常导致Flink服务挂掉,无法启动Flink服务
> 什么部署模式。
>
> Summer  于2022年9月16日周五 13:57写道:
>
>
>
> Flink版本:1.13.3
> 我有一个Flink Sql的任务,也生成了checkpoint,但是执行过程出现Execption,导致整个Flink JobManger无法启动。
> 我再重启Flink的时候,这个FlinkSql任务由于一直抛异常导致Flink进程启动不起来。
> 请问有什么办法取消这个任务。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>


Re: 任务启动异常导致Flink服务挂掉,无法启动Flink服务

2022-09-16 Thread yidan zhao
什么部署模式。

Summer  于2022年9月16日周五 13:57写道:
>
>
> Flink版本:1.13.3
> 我有一个Flink Sql的任务,也生成了checkpoint,但是执行过程出现Execption,导致整个Flink JobManger无法启动。
> 我再重启Flink的时候,这个FlinkSql任务由于一直抛异常导致Flink进程启动不起来。
> 请问有什么办法取消这个任务。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>


Re: 某作业计算算子处于busy状态

2022-09-15 Thread yidan zhao
本身低延迟一定程度上就是靠“资源低利用率”实现的。资源高利用率情况,就是尽可能满负荷够用就行的意思。

yidan zhao  于2022年9月15日周四 20:57写道:
>
> 资源足够,busy 50%+,延迟如果也可接受的话,其实就不算问题。2s延迟不算高。
>
> 杨扬  于2022年9月15日周四 20:02写道:
> >
> > 目前并发度已经设定为25,每个slot内存为4G,已经使用100G内存,峰值流量1TPS左右,资源是足够的吧?
> >
> >
> >
> >
> > > 在 2022年9月15日,下午7:27,yidan zhao  写道:
> > >
> > > busy那就提升并发度看看效果?
> > >
> > > 杨扬 mailto:yangya...@cupdata.com>> 于2022年9月15日周四 
> > > 14:51写道:
> > > 各位好!
> > >   目前有一flink作业,大致分为3个阶段:
> > >   读取kafka中数据(1个source,并行度3)-> 进行数据筛选和条件判断(没有窗口操作,并行度25)-> 
> > > 结果写入kafka(20多个sink,每个sink并行度3)。可参考附件图片。
> > >   
> > > 目前存在的问题是:作业在运行一段时间后,中间25并行度的一系列计算算子会变为busy状态(会达到50%以上),端到端的信息延迟增加,偶尔延迟会达到2秒以上。此时作业日志并没有报错、异常、告警等信息。
> > >
> > >   
> > > 上述问题因为没有日志异常告警信息,本人有些无从下手解决。猜测是否因为sink数据量太多且每个sink并行度都是3会导致中间25个并行度的一系列算子和sink之间的交互产生大量shuffle引起?望各位大佬帮忙分析一下这个问题
> > >
> > >
> > >
> > > ===
> > > 此邮件已由 Deep Discovery Email Inspector 进行了分析。
> >


Re: 某作业计算算子处于busy状态

2022-09-15 Thread yidan zhao
资源足够,busy 50%+,延迟如果也可接受的话,其实就不算问题。2s延迟不算高。

杨扬  于2022年9月15日周四 20:02写道:
>
> 目前并发度已经设定为25,每个slot内存为4G,已经使用100G内存,峰值流量1TPS左右,资源是足够的吧?
>
>
>
>
> > 在 2022年9月15日,下午7:27,yidan zhao  写道:
> >
> > busy那就提升并发度看看效果?
> >
> > 杨扬 mailto:yangya...@cupdata.com>> 于2022年9月15日周四 
> > 14:51写道:
> > 各位好!
> >   目前有一flink作业,大致分为3个阶段:
> >   读取kafka中数据(1个source,并行度3)-> 进行数据筛选和条件判断(没有窗口操作,并行度25)-> 
> > 结果写入kafka(20多个sink,每个sink并行度3)。可参考附件图片。
> >   
> > 目前存在的问题是:作业在运行一段时间后,中间25并行度的一系列计算算子会变为busy状态(会达到50%以上),端到端的信息延迟增加,偶尔延迟会达到2秒以上。此时作业日志并没有报错、异常、告警等信息。
> >
> >   
> > 上述问题因为没有日志异常告警信息,本人有些无从下手解决。猜测是否因为sink数据量太多且每个sink并行度都是3会导致中间25个并行度的一系列算子和sink之间的交互产生大量shuffle引起?望各位大佬帮忙分析一下这个问题
> >
> >
> >
> > ===
> > 此邮件已由 Deep Discovery Email Inspector 进行了分析。
>


Re: 某作业计算算子处于busy状态

2022-09-15 Thread yidan zhao
busy那就提升并发度看看效果?

杨扬  于2022年9月15日周四 14:51写道:

> 各位好!
> 目前有一flink作业,大致分为3个阶段:
> 读取kafka中数据(1个source,并行度3)-> 进行数据筛选和条件判断(没有窗口操作,并行度25)->
> 结果写入kafka(20多个sink,每个sink并行度3)。可参考附件图片。
>
> 目前存在的问题是:作业在运行一段时间后,中间25并行度的一系列计算算子会变为busy状态(会达到50%以上),端到端的信息延迟增加,偶尔延迟会达到2秒以上。此时作业日志并没有报错、异常、告警等信息。
>
> 上述问题因为没有日志异常告警信息,本人有些无从下手解决。猜测是否因为sink数据量太多且每个sink并行度都是3会导致中间25个并行度的一系列算子和sink之间的交互产生大量shuffle引起?望各位大佬帮忙分析一下这个问题
>
>
>


Re: Re: Key group is not in KeyGroupRange

2022-09-13 Thread yidan zhao
你发的代码就是原因,keySelector对于同一个输入数据,输出结果必须一致,不能不同。
如果需要实现类似效果,可以先使用 flatMap 生成 random key,然后将 key 存储到一个字段,比如就叫做 key,然后
keyBy("key") 这样可以。

junjie.m...@goupwith.com  于2022年9月9日周五 17:59写道:
>
>
> Integer[] rebalanceKeys = createRebalanceKeys(parallelism);
> int rebalanceKeyIndex = new Random().nextInt(parallelism);
> Integer key = rebalanceKeys[rebalanceKeyIndex];
>
>  /**
>  * 构建均衡 KEY 数组
>  *
>  * @param parallelism 并行度
>  * @return
>  */
> public static Integer[] createRebalanceKeys(int parallelism) {
> HashMap> groupRanges = new 
> HashMap<>();
> int maxParallelism = 
> KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);
> // 构造多个 key 用于生成足够的 groupRanges
> int maxRandomKey = parallelism * 10;
> for (int randomKey = 0; randomKey < maxRandomKey; randomKey++) {
> int subtaskIndex = 
> KeyGroupRangeAssignment.assignKeyToParallelOperator(randomKey, 
> maxParallelism, parallelism);
> LinkedHashSet randomKeys = 
> groupRanges.computeIfAbsent(subtaskIndex, k -> new LinkedHashSet<>());
> randomKeys.add(randomKey);
> }
>
> Integer[] result = new Integer[parallelism];
> for (int i = 0; i < parallelism; i++) {
> LinkedHashSet ranges = groupRanges.get(i);
> if (ranges == null || ranges.isEmpty()) {
> throw new RuntimeException("create rebalance keys error");
> }
> result[i] = ranges.stream().findFirst().get();
> }
> return result;
> }
>
>
> 发件人: junjie.m...@goupwith.com
> 发送时间: 2022-09-09 17:52
> 收件人: user-zh
> 主题: Re: Re: Key group is not in KeyGroupRange
> key selector中使用random.nextInt(parallelism) 有时会报错
>
> From: yue ma
> Date: 2022-09-09 17:41
> To: user-zh
> Subject: Re: Key group is not in KeyGroupRange
> 你好,可以看一下使用的 key selector 是否稳定,key 是否会变化。
> junjie.m...@goupwith.com  于2022年9月9日周五 17:35写道:
> > hi:
> > 本人遇到了这个报错:
> > Key group 51 is not in KeyGroupRange{startKeyGroup=64, endKeyGroup=127}.
> > Unless you're directly using low level state access APIs, this is most
> > likely caused by non-deterministic shuffle key (hashCode and equals
> > implementation).
> >
> > 这个报错出现的莫名其妙,想了解这个错误是在什么样的情况下才会导致这个报错的?
> > 谢谢!!
> >
> >


questions about FLINK-27341

2022-09-03 Thread yidan zhao
Hi, I want to know is there some way to avoid this problem now?
I can not guarantee jobmanager and taskmanager do not run in the same machine.


Re: hello flink

2022-09-02 Thread yidan zhao
hello

yh z  于2022年9月2日周五 18:30写道:
>
> Hello
>
> yh z  于2022年9月2日周五 11:51写道:
>
> > hello flink
> >


Re: Issue with file system implementation

2022-09-01 Thread yidan zhao
Do you place the s3 jar to plugins/s3 dir?

Darius Žalandauskas  于2022年8月31日周三 03:11写道:
>
> Hello apache-flink team,
> For the last week I am really struggling with setting up EMR to store stream 
> output to AWS S3.
> According to the documentation, if running flink with emr, no manual 
> adjustments are required, however even having the plugin placed in plugins 
> folder based on documentation I keep receiving same error which is ""Could 
> not find a file system implementation for scheme 's3'.
>
> I have read all forums and already but could not find any solution.
>
> Looking forward to hearing from you.
>
>
> Pagarbiai/Respectfuly
> | Darius Žalandauskas
>
> | Phone: +37063082356
> | eMail: darius.zalandaus...@gmail.com
>
>


Cannot serialize operator object class org.apache.flink.streaming.api.operators.SourceOperatorFactory

2022-09-01 Thread yidan zhao
如下代码片段:
watermarkStrategy = watermarkStrategy.withTimestampAssigner(
new SerializableTimestampAssigner>() {
@Override
public long extractTimestamp(KafkaMessageWrapper
element, long recordTimestamp) {
try {
return element.getData().getTimestamp();
} catch (Exception e) {
return 86400_000;
}
}
}
);
为什么这样会导致序列化报错呢。换成如下就不报错:

watermarkStrategy = watermarkStrategy.withTimestampAssigner(
(SerializableTimestampAssigner>)
(element, recordTimestamp) -> {
try {
return element.getData().getTimestamp();
} catch (Exception e) {
return 86400_000;
}
}
);


source部分设置watermarkStrategy的时候,导致无法序列化。


咨询下大家访问jira、github慢咋解决的都。

2022-08-26 Thread yidan zhao
题目即问题,主要是jira,打开个页面好几十秒。


Re: 请问 taskmanger.host 和 taskmanager.bind-host 的区别是什么呢?

2022-08-26 Thread yidan zhao
貌似这个问题已经有jira了,https://issues.apache.org/jira/browse/FLINK-27341
这个会解决。看样子得等1.15.2或1.16了。

yidan zhao  于2022年8月26日周五 14:08写道:
>
> 目前我可运行方式是:
> bind-host不配置,默认就是0.0.0.0(注意flink-conf中默认配了localhost,需要注释掉),或者配置为0.0.0.0。
>
> JM和TM机器不重复,就是JM独立部署,这样ok。否则都会出问题。
>
> yidan zhao  于2022年8月26日周五 14:01写道:
> >
> > 如题,这俩地址啥区别呢?
> >
> > 1.15.1版本:从测试效果来看:
> > (1)Taskmanager实际绑定地址取决于 bind-host
> > (2)taskmanager.host 貌似被用于 tm 的resource-id部分使用了。
> > (3)假设我设置 host 为 localhost,bind-host为0.0.0.0。这导致我集群的web ui的taskmanager界面展示为:
> > localhost:33865-c8a37d
> > akka.tcp://flink@localhost:33865/user/rpc/taskmanager_0
> >
> > localhost:43867-3afa06
> > akka.tcp://flink@localhost:43867/user/rpc/taskmanager_0
> >
> > localhost:34113-ba7a6f
> > akka.tcp://flink@localhost:34113/user/rpc/taskmanager_0
> >
> > localhost:43503-a5c147
> > akka.tcp://flink@localhost:43503/user/rpc/taskmanager_0
> >
> > 目测提交任务会有问题。
> >
> > 
> > 请问现在1.15.1版本standalone集群部署要怎么配置呢??多机情况。
> > 目前看 https://issues.apache.org/jira/browse/FLINK-24474 调整后,我测试会有很多问题。
> > 如果只配置 bind-host 不配置 host 可能会好点,从调试情况来看,这种情况对于非 resource-manager leader
> > 机器的 tm 启动都正常,因为在选择地址时基于 loopback 地址连接 rm 失败,使用 hostname strategy
> > 正确选择了地址。


Re: 请问 taskmanger.host 和 taskmanager.bind-host 的区别是什么呢?

2022-08-26 Thread yidan zhao
目前我可运行方式是:
bind-host不配置,默认就是0.0.0.0(注意flink-conf中默认配了localhost,需要注释掉),或者配置为0.0.0.0。

JM和TM机器不重复,就是JM独立部署,这样ok。否则都会出问题。

yidan zhao  于2022年8月26日周五 14:01写道:
>
> 如题,这俩地址啥区别呢?
>
> 1.15.1版本:从测试效果来看:
> (1)Taskmanager实际绑定地址取决于 bind-host
> (2)taskmanager.host 貌似被用于 tm 的resource-id部分使用了。
> (3)假设我设置 host 为 localhost,bind-host为0.0.0.0。这导致我集群的web ui的taskmanager界面展示为:
> localhost:33865-c8a37d
> akka.tcp://flink@localhost:33865/user/rpc/taskmanager_0
>
> localhost:43867-3afa06
> akka.tcp://flink@localhost:43867/user/rpc/taskmanager_0
>
> localhost:34113-ba7a6f
> akka.tcp://flink@localhost:34113/user/rpc/taskmanager_0
>
> localhost:43503-a5c147
> akka.tcp://flink@localhost:43503/user/rpc/taskmanager_0
>
> 目测提交任务会有问题。
>
> 
> 请问现在1.15.1版本standalone集群部署要怎么配置呢??多机情况。
> 目前看 https://issues.apache.org/jira/browse/FLINK-24474 调整后,我测试会有很多问题。
> 如果只配置 bind-host 不配置 host 可能会好点,从调试情况来看,这种情况对于非 resource-manager leader
> 机器的 tm 启动都正常,因为在选择地址时基于 loopback 地址连接 rm 失败,使用 hostname strategy
> 正确选择了地址。


请问 taskmanger.host 和 taskmanager.bind-host 的区别是什么呢?

2022-08-26 Thread yidan zhao
如题,这俩地址啥区别呢?

1.15.1版本:从测试效果来看:
(1)Taskmanager实际绑定地址取决于 bind-host
(2)taskmanager.host 貌似被用于 tm 的resource-id部分使用了。
(3)假设我设置 host 为 localhost,bind-host为0.0.0.0。这导致我集群的web ui的taskmanager界面展示为:
localhost:33865-c8a37d
akka.tcp://flink@localhost:33865/user/rpc/taskmanager_0

localhost:43867-3afa06
akka.tcp://flink@localhost:43867/user/rpc/taskmanager_0

localhost:34113-ba7a6f
akka.tcp://flink@localhost:34113/user/rpc/taskmanager_0

localhost:43503-a5c147
akka.tcp://flink@localhost:43503/user/rpc/taskmanager_0

目测提交任务会有问题。


请问现在1.15.1版本standalone集群部署要怎么配置呢??多机情况。
目前看 https://issues.apache.org/jira/browse/FLINK-24474 调整后,我测试会有很多问题。
如果只配置 bind-host 不配置 host 可能会好点,从调试情况来看,这种情况对于非 resource-manager leader
机器的 tm 启动都正常,因为在选择地址时基于 loopback 地址连接 rm 失败,使用 hostname strategy
正确选择了地址。


Re: HA模式,standalone集群,仅单个 JM 情况下任务异常。

2022-08-25 Thread yidan zhao
知道问题了大概,远程debug下来,发现在获取地址时:
(1)JM leader 机器,先拿 127.0.0.1 连接 resource-manager,直接成功,返回了 127.0.0.1。
(2)非 JM leader 机器,先拿 127.0.0.1 连接 resource-manager 失败,然后走下一个 local
host 可以拿到正确 hostname。


Re: HA模式,standalone集群,仅单个 JM 情况下任务异常。

2022-08-25 Thread yidan zhao
今天远程调试了下,目前发现开启远程调试情况,启动后是ok的?

yidan zhao  于2022年8月26日周五 00:01写道:
>
> 这个问题有人知道吗,目前反复实验确定有问题。
>
> 经过多次测试,目前初步怀疑。 并不是单 JM 就会有问题。多JM也有问题。
>
> 出问题的是JM为leader的机器。 比如ABCD4台机器,如果A的JM是leader,那么A机器启动的TM就是127.0.0.1。
>
>
>
> yidan zhao  于2022年8月24日周三 10:30写道:
> >
> > masters:
> > A:8682
> > workers:
> > A
> > B
> > C
> >
> > 都是内网hostname(相互都可解析),非127.0.0.1。
> >
> > flink版本:1.15.1版本。
> >
> > Weihua Hu  于2022年8月24日周三 10:26写道:
> > >
> > > PartitionNotFoundException 应该是跟描述的有一台 TM ip 是 127.0.0.1 有关,其他 TM 
> > > 节点链接不到这个节点。
> > >
> > > 用的什么版本呢?
> > >
> > > 配置文件是这样的吗?
> > > master 文件中有一个 内网 IP: A
> > > workers 文件中有多个内网 IP: A,B,C
> > >
> > > Best,
> > > Weihua
> > >
> > >
> > > On Tue, Aug 23, 2022 at 7:37 PM yidan zhao  wrote:
> > >
> > > >
> > > > 如题,目前发现任务报错是:org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
> > > > Partition
> > > > c74a0a104d81bf2d38f76f104d65a2ab#27@7e1a8495f062f8ceb964a3205e584613
> > > > not found
> > > >
> > > > ——
> > > > 任务本身问题不大,也不是网络问题。 目前发现解决方法:
> > > >
> > > > 换成非单 JM 即可。
> > > >
> > > > 同时也发现一个可能原因,或另一个明显现象:
> > > >
> > > > 从web ui的Taskmanager界面可以发现,执行 start-cluster 脚本的机器A(同时也是 JM ,即配置到
> > > > masters 文件的唯一机器),该机器对应的tm的resource id中ip是127.0.0.1。其他机器都是显示的内网ip。
> > > >
> > > > 
> > > > masters文件换2个以上机器后,没问题了,包括后一个现象,ip也都是正常的。
> > > >


Re: 请教下flink源码分支和tag的命名

2022-08-25 Thread yidan zhao
hi。想继续问下。我目前看官方的 tag 1.15.1 显示不属于任何分支,所以最终1.15.1下载的发布包对应是不是1.15.1的tag呢?

包括后续1.15.2的修改是基于哪个分支 patch 上去的。

Lijie Wang  于2022年7月21日周四 14:01写道:
>
> Hi,
> 1.15.1 应该是对应 tag release-1.15.1
>
> yidan zhao  于2022年7月21日周四 12:53写道:
>
> > 我目前看了下,有一定规律但也还是不完全懂。
> > 比如我目前有部分公司内部用到的,希望基于1.15.1的release上加的话,我需要基于哪个分支?还是tag做更改呢?
> > 哪个branch、or tag是对应官方download页面提供的下载链接的包中一模一样的源码呢,就是不包含新增开发但未发布代码的版本。
> >


Re: HA模式,standalone集群,仅单个 JM 情况下任务异常。

2022-08-25 Thread yidan zhao
这个问题有人知道吗,目前反复实验确定有问题。

经过多次测试,目前初步怀疑。 并不是单 JM 就会有问题。多JM也有问题。

出问题的是JM为leader的机器。 比如ABCD4台机器,如果A的JM是leader,那么A机器启动的TM就是127.0.0.1。



yidan zhao  于2022年8月24日周三 10:30写道:
>
> masters:
> A:8682
> workers:
> A
> B
> C
>
> 都是内网hostname(相互都可解析),非127.0.0.1。
>
> flink版本:1.15.1版本。
>
> Weihua Hu  于2022年8月24日周三 10:26写道:
> >
> > PartitionNotFoundException 应该是跟描述的有一台 TM ip 是 127.0.0.1 有关,其他 TM 节点链接不到这个节点。
> >
> > 用的什么版本呢?
> >
> > 配置文件是这样的吗?
> > master 文件中有一个 内网 IP: A
> > workers 文件中有多个内网 IP: A,B,C
> >
> > Best,
> > Weihua
> >
> >
> > On Tue, Aug 23, 2022 at 7:37 PM yidan zhao  wrote:
> >
> > >
> > > 如题,目前发现任务报错是:org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
> > > Partition
> > > c74a0a104d81bf2d38f76f104d65a2ab#27@7e1a8495f062f8ceb964a3205e584613
> > > not found
> > >
> > > ——
> > > 任务本身问题不大,也不是网络问题。 目前发现解决方法:
> > >
> > > 换成非单 JM 即可。
> > >
> > > 同时也发现一个可能原因,或另一个明显现象:
> > >
> > > 从web ui的Taskmanager界面可以发现,执行 start-cluster 脚本的机器A(同时也是 JM ,即配置到
> > > masters 文件的唯一机器),该机器对应的tm的resource id中ip是127.0.0.1。其他机器都是显示的内网ip。
> > >
> > > 
> > > masters文件换2个以上机器后,没问题了,包括后一个现象,ip也都是正常的。
> > >


Re: pyflink内存管理

2022-08-24 Thread yidan zhao
感谢。我是standalone集群,配置到 flink-conf.yaml 就可行吧。
https://stackoverflow.com/questions/64323031/pyflink-1-11-2-couldn-t-configure-taskmanager-memory-task-off-heap-size-proper
该文章说到的必须通过 tableEnv 配置是因为使用 pyflink-shell ?

我提交是用 flink run 提交的。

yu'an huang  于2022年8月25日周四 09:25写道:
>
> 你好,
> python部分的内存算flink taskmanager 配置的内存,你应该可以用参数
> *'taskmanager.memory.task.off-heap.size*
> 来配置,可以参考这个问题:
> https://stackoverflow.com/questions/64323031/pyflink-1-11-2-couldn-t-configure-taskmanager-memory-task-off-heap-size-proper
>
>
>
> On Wed, 24 Aug 2022 at 1:05 PM, yidan zhao  wrote:
>
> > 如题,pyflink场景的任务,内存是如何管理呢。
> >
> > python部分的内存是否算入flink TaskManager配置的内存中呢?
> > 比如python算子通过多进程做各种复杂的运算,这部分内存占用是否算入flink呢?
> >
> >
> >
> > ——
> > 如果不算的话,使用pyflink时,容器内存和flink TaskManager内存配置是不是需要预留空间?
> >


pyflink内存管理

2022-08-23 Thread yidan zhao
如题,pyflink场景的任务,内存是如何管理呢。

python部分的内存是否算入flink TaskManager配置的内存中呢?
比如python算子通过多进程做各种复杂的运算,这部分内存占用是否算入flink呢?



——
如果不算的话,使用pyflink时,容器内存和flink TaskManager内存配置是不是需要预留空间?


Re: HA模式,standalone集群,仅单个 JM 情况下任务异常。

2022-08-23 Thread yidan zhao
masters:
A:8682
workers:
A
B
C

都是内网hostname(相互都可解析),非127.0.0.1。

flink版本:1.15.1版本。

Weihua Hu  于2022年8月24日周三 10:26写道:
>
> PartitionNotFoundException 应该是跟描述的有一台 TM ip 是 127.0.0.1 有关,其他 TM 节点链接不到这个节点。
>
> 用的什么版本呢?
>
> 配置文件是这样的吗?
> master 文件中有一个 内网 IP: A
> workers 文件中有多个内网 IP: A,B,C
>
> Best,
> Weihua
>
>
> On Tue, Aug 23, 2022 at 7:37 PM yidan zhao  wrote:
>
> >
> > 如题,目前发现任务报错是:org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
> > Partition
> > c74a0a104d81bf2d38f76f104d65a2ab#27@7e1a8495f062f8ceb964a3205e584613
> > not found
> >
> > ——
> > 任务本身问题不大,也不是网络问题。 目前发现解决方法:
> >
> > 换成非单 JM 即可。
> >
> > 同时也发现一个可能原因,或另一个明显现象:
> >
> > 从web ui的Taskmanager界面可以发现,执行 start-cluster 脚本的机器A(同时也是 JM ,即配置到
> > masters 文件的唯一机器),该机器对应的tm的resource id中ip是127.0.0.1。其他机器都是显示的内网ip。
> >
> > 
> > masters文件换2个以上机器后,没问题了,包括后一个现象,ip也都是正常的。
> >


Re: Re: flink1.15.1 stop 任务失败

2022-08-23 Thread yidan zhao
1 大概率是source部分问题,或者 savepoint 的 trigger 层面。
2 也可以从 cancel 和 stop 的区别上考虑下?
3 补充信息:我的kafka source是用的旧版本(没办法用新版本,原因是由于一些原因我必须用 kafka 低版本 client)。

yidan zhao  于2022年8月23日周二 23:06写道:
>
> 看了下,报错很少。
> 反正 flink cancel -s 是可以的,flink stop 就不行。而且目测是瞬间失败。从web
> ui来看,整个savepoint的完成是0/841,应该是几乎没开始就出错了。
> 目前4台机器:
> 机器1
> 2022-08-23 22:47:37,093 WARN
> org.apache.flink.runtime.taskmanager.Task[] -
> Source: JobConfig -> Split(JobName_configType)
>  (1/1)#0 (b5076938b231fb9d33e582104292ebd5) switched from RUNNING to
> FAILED with failure cause:
> org.apache.flink.util.FlinkRuntimeException: S
> top-with-savepoint failed.
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointAbortAsync$15(StreamTask.java:1339)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1364)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskAction
> Executor.java:93)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:33
> 8)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> at java.lang.Thread.run(Thread.java:748)
> 下面就是各种 free task,unregister扒拉的。
>
> 机器2
> ...
> 基本看起来都是从 Attempt to cancel 开始,没有先报错,再cancel的貌似。
>
> Xuyang  于2022年8月23日周二 22:25写道:
> >
> > Hi, TM上有报错信息嘛?有的话可以贴出来看一下是什么导致cp失败的
> >
> >
> >
> >
> >
> >
> >
> > --
> >
> > Best!
> > Xuyang
> >
> >
> >
> >
> >
> > Hi, TM上有报错信息嘛?有的话可以贴出来看一下是什么导致cp失败的
> > 在 2022-08-23 20:41:59,"yidan zhao"  写道:
> > >补充部分信息:
> > >看日志,如果是 flink savepoint xxx 这样触发检查点,JM的日志很简单:
> > >2022-08-23 20:33:22,307 INFO
> > >org.apache.flink.runtime.jobmaster.JobMaster [] -
> > >Triggering savepoint for job 8d231de75b8227a1b
> > >715b1aa665caa91.
> > >
> > >2022-08-23 20:33:22,318 INFO
> > >org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> > >Triggering checkpoint 5 (type=SavepointType{na
> > >me='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @
> > >1661258002307 for job 8d231de75b8227a1b715b1aa665caa91.
> > >
> > >2022-08-23 20:33:23,701 INFO
> > >org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream
> > >[] - Cannot create recoverable writer
> > > due to Recoverable writers on Hadoop are only supported for HDFS,
> > >will use the ordinary writer.
> > >
> > >2022-08-23 20:33:23,908 INFO
> > >org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> > >Completed checkpoint 5 for job 8d231de75b8227a1b715b1aa665caa91
> > >(1638207 bytes, checkpointDuration=1600 ms, finalizationTime=1 ms).
> > >
> > >
> > >如果是 stop xxx 这样停止任务,则JM日志(错误)如下:
> > >
> > >2022-08-23 20:35:01,834 INFO
> > >org.apache.flink.runtime.jobmaster.JobMaster [] -
> > >Triggering stop-with-savepoint for job
> > >8d231de75b8227a1b715b1aa665caa91.
> > >
> > >2022-08-23 20:35:01,842 INFO
> > >org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> > >Triggering checkpoint 6 (type=SavepointType{name='Suspend Savepoint',
> > >postCheckpointAction=SUSPEND, formatType=CANONICAL}) @ 1661258101834
> > >for job 8d231de75b8227a1b715b1aa665caa91.
> > >
> > >2022-08-23 20:35:02,083 INFO
> > >org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> > >Decline checkpoint 6 by task a65383dad01bc15f654c4afe4aa63b6d of job
> > >8d231de75b8227a1b715b1aa665caa91 at 10.35.95.150:13

Re: Re: flink1.15.1 stop 任务失败

2022-08-23 Thread yidan zhao
看了下,报错很少。
反正 flink cancel -s 是可以的,flink stop 就不行。而且目测是瞬间失败。从web
ui来看,整个savepoint的完成是0/841,应该是几乎没开始就出错了。
目前4台机器:
机器1
2022-08-23 22:47:37,093 WARN
org.apache.flink.runtime.taskmanager.Task[] -
Source: JobConfig -> Split(JobName_configType)
 (1/1)#0 (b5076938b231fb9d33e582104292ebd5) switched from RUNNING to
FAILED with failure cause:
org.apache.flink.util.FlinkRuntimeException: S
top-with-savepoint failed.
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointAbortAsync$15(StreamTask.java:1339)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1364)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskAction
Executor.java:93)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:33
8)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:748)
下面就是各种 free task,unregister扒拉的。

机器2
...
基本看起来都是从 Attempt to cancel 开始,没有先报错,再cancel的貌似。

Xuyang  于2022年8月23日周二 22:25写道:
>
> Hi, TM上有报错信息嘛?有的话可以贴出来看一下是什么导致cp失败的
>
>
>
>
>
>
>
> --
>
> Best!
> Xuyang
>
>
>
>
>
> Hi, TM上有报错信息嘛?有的话可以贴出来看一下是什么导致cp失败的
> 在 2022-08-23 20:41:59,"yidan zhao"  写道:
> >补充部分信息:
> >看日志,如果是 flink savepoint xxx 这样触发检查点,JM的日志很简单:
> >2022-08-23 20:33:22,307 INFO
> >org.apache.flink.runtime.jobmaster.JobMaster [] -
> >Triggering savepoint for job 8d231de75b8227a1b
> >715b1aa665caa91.
> >
> >2022-08-23 20:33:22,318 INFO
> >org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> >Triggering checkpoint 5 (type=SavepointType{na
> >me='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @
> >1661258002307 for job 8d231de75b8227a1b715b1aa665caa91.
> >
> >2022-08-23 20:33:23,701 INFO
> >org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream
> >[] - Cannot create recoverable writer
> > due to Recoverable writers on Hadoop are only supported for HDFS,
> >will use the ordinary writer.
> >
> >2022-08-23 20:33:23,908 INFO
> >org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> >Completed checkpoint 5 for job 8d231de75b8227a1b715b1aa665caa91
> >(1638207 bytes, checkpointDuration=1600 ms, finalizationTime=1 ms).
> >
> >
> >如果是 stop xxx 这样停止任务,则JM日志(错误)如下:
> >
> >2022-08-23 20:35:01,834 INFO
> >org.apache.flink.runtime.jobmaster.JobMaster [] -
> >Triggering stop-with-savepoint for job
> >8d231de75b8227a1b715b1aa665caa91.
> >
> >2022-08-23 20:35:01,842 INFO
> >org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> >Triggering checkpoint 6 (type=SavepointType{name='Suspend Savepoint',
> >postCheckpointAction=SUSPEND, formatType=CANONICAL}) @ 1661258101834
> >for job 8d231de75b8227a1b715b1aa665caa91.
> >
> >2022-08-23 20:35:02,083 INFO
> >org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> >Decline checkpoint 6 by task a65383dad01bc15f654c4afe4aa63b6d of job
> >8d231de75b8227a1b715b1aa665caa91 at 10.35.95.150:13151-3dfdc5 @
> >xxx.xxx.com (dataPort=13156).
> >(此处看起来是被decline了,原因是 task failed?)
> >org.apache.flink.util.SerializedThrowable: Task name with subtask :
> >Source: XXX_Kafka(startTs:latest) ->... ->... ->... (10/10)#2 Failure
> >reason: Task has failed.
> >at 
> > org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1388)
> >~[flink-dist-1.15.1.jar:1.15.1]
> >at 
> > org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1331)
> >~[flink-dist-1.15.1.jar:1.15.1]
> >at 
> > java.util.concurrent.CompletableFuture.uniHandl

Re: flink1.15.1 stop 任务失败

2022-08-23 Thread yidan zhao
ar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
~[flink-dist-1.15.1.jar:1.15.1]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]

yidan zhao  于2022年8月23日周二 20:31写道:
>
> 如题,stop,停止并保存检查点失败。
> 测试看 cancel、cancel -s 方式都成功。 cancel -s 可成功生成检查点并退出。
>
> stop则不行,报错主要是
> Could not stop with a savepoint job "1b87f308e2582f3cc0e3ccc812471201"
> ...
> Caused by: java.util.concurrent.ExecutionException:
> java.util.concurrent.CompletionException:
> org.apache.flink.runtime.checkpoint.CheckpointEx
> ception: Task has failed.
> ...
> Caused by: org.apache.flink.util.SerializedThrowable:
> org.apache.flink.runtime.checkpoint.CheckpointException: Task has
> failed.
> ...
> Caused by: org.apache.flink.util.SerializedThrowable: Task has failed.
> ...
>
> __详细日志:


flink1.15.1 stop 任务失败

2022-08-23 Thread yidan zhao
如题,stop,停止并保存检查点失败。
测试看 cancel、cancel -s 方式都成功。 cancel -s 可成功生成检查点并退出。

stop则不行,报错主要是
Could not stop with a savepoint job "1b87f308e2582f3cc0e3ccc812471201"
...
Caused by: java.util.concurrent.ExecutionException:
java.util.concurrent.CompletionException:
org.apache.flink.runtime.checkpoint.CheckpointEx
ception: Task has failed.
...
Caused by: org.apache.flink.util.SerializedThrowable:
org.apache.flink.runtime.checkpoint.CheckpointException: Task has
failed.
...
Caused by: org.apache.flink.util.SerializedThrowable: Task has failed.
...

__详细日志:


HA模式,standalone集群,仅单个 JM 情况下任务异常。

2022-08-23 Thread yidan zhao
如题,目前发现任务报错是:org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
Partition c74a0a104d81bf2d38f76f104d65a2ab#27@7e1a8495f062f8ceb964a3205e584613
not found

——
任务本身问题不大,也不是网络问题。 目前发现解决方法:

换成非单 JM 即可。

同时也发现一个可能原因,或另一个明显现象:

从web ui的Taskmanager界面可以发现,执行 start-cluster 脚本的机器A(同时也是 JM ,即配置到
masters 文件的唯一机器),该机器对应的tm的resource id中ip是127.0.0.1。其他机器都是显示的内网ip。


masters文件换2个以上机器后,没问题了,包括后一个现象,ip也都是正常的。


pyflink目前map、flatmap都是process实现,那么process当前如何支持sideoutput呢?

2022-08-03 Thread yidan zhao
1 需求是根据输入流,根据字段判定,拆分并输出为2个流。

2 目前看 pyflink 的 api,貌似不支持 sideoutput。

3 虽然可以基于输入流 A,连续处理2次,即输入流 A 流向算子 B 和算子 C,分别筛选自己需要的数据进行处理。但这样会导致数据重复传输。


Re: Flink内部如何做到消息不丢失?

2022-08-02 Thread yidan zhao
我最近也在对比storm和flink。有没有大佬介绍下,storm这种ack模式的是不是恢复会更快点,目前我感觉storm的架构下,各个节点的fail
over更加独立感觉。
Flink 目前集群中任何一个机器失败都会导致整个任务重启,耗时会长点。
但是从全局资源来说,ckpt的资源占用貌似又比ack模式少。
不知道理解对不对。

tison  于2022年7月30日周六 14:28写道:
>
> 可以看下这两份材料
>
> *
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/fault-tolerance/checkpointing/
> * https://zhuanlan.zhihu.com/p/102607983
>
> 其实就是 Flink 里 Exactly-once 的实现方式,简单说来就是分布式快照批量提交,上游数据可以回放。
>
> Best,
> tison.
>
>
> m18814122325  于2022年7月30日周六 14:22写道:
>
> >
> > 在Storm中会有ack机制来保证消息是否被下个算子是否被处理,那么请问在Flink框架内部中上游算子通过Netty发送消息到下游时,如何做到消息不会因为网络原因等各种异常情况产生丢失情况?
> >
> > 谢谢


Re: pyflink 和 add_jars 的 add_classpaths 路径。

2022-07-27 Thread yidan zhao
我是本地直接ide内run。

Weihua Hu  于2022年7月27日周三 22:10写道:
>
> Hi, 你是怎么提交的任务呢?是提交到远端的 session cluster 上吗?有其他的相关日志吗?
>
> Best,
> Weihua
>
>
> On Wed, Jul 27, 2022 at 5:36 PM yidan zhao  wrote:
>
> > 而且pyflink既然打包了flink的完整包,那么真正部署运行的时候是用这个呢?还是需要执行的机器上单独部署一个flink呢?
> >
> > yidan zhao  于2022年7月27日周三 17:34写道:
> > >
> > > 我将这3个jar放到pyflink的lib下则是可以的。通过 add_jar 方式给出是不可以的。有人知道原因吗。
> > >
> > > yidan zhao  于2022年7月27日周三 10:40写道:
> > > >
> > > > pyflink情况 flink-sql-connector-kafka-1.15.0.jar 可以。
> > > > 但 flink-connector-base-1.15.0.jar + flink-connector-kafka-1.15.0.jar +
> > > > kafka-clients-2.8.1.jar 却报:
> > > > py4j.protocol.Py4JError:
> > > > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer does
> > > > not exist in the JVM
> > > >
> > > > Weihua Hu  于2022年7月26日周二 21:21写道:
> > > > >
> > > > > 最终会放到 pipeline.jars 配置中,在提交作业时上传到 blobServer
> > > > >
> > > > > Best,
> > > > > Weihua
> > > > >
> > > > >
> > > > > On Tue, Jul 26, 2022 at 5:40 PM yidan zhao 
> > wrote:
> > > > >
> > > > > > 如题,我看注释和文档。
> > > > > > add_jars 是添加要upload到cluster的jar,那么上传到什么路径呢?
> > > > > >
> >


Re: 接收Http请求与flink如何建立联系

2022-07-27 Thread yidan zhao
使用一个算子实现个轻量级的http服务,然后把收到的数据通过广播流方式供给其他流使用。

Jeff  于2022年7月28日周四 10:00写道:
>
> 是想把动态参数传给正在执行的算子?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2022-07-28 08:16:40,"张锴"  写道:
> >flink版本1.13.2
> >想通过http请求的方式将参数传给flink,这个怎么实现?


Re: pyflink 和 add_jars 的 add_classpaths 路径。

2022-07-27 Thread yidan zhao
而且pyflink既然打包了flink的完整包,那么真正部署运行的时候是用这个呢?还是需要执行的机器上单独部署一个flink呢?

yidan zhao  于2022年7月27日周三 17:34写道:
>
> 我将这3个jar放到pyflink的lib下则是可以的。通过 add_jar 方式给出是不可以的。有人知道原因吗。
>
> yidan zhao  于2022年7月27日周三 10:40写道:
> >
> > pyflink情况 flink-sql-connector-kafka-1.15.0.jar 可以。
> > 但 flink-connector-base-1.15.0.jar + flink-connector-kafka-1.15.0.jar +
> > kafka-clients-2.8.1.jar 却报:
> > py4j.protocol.Py4JError:
> > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer does
> > not exist in the JVM
> >
> > Weihua Hu  于2022年7月26日周二 21:21写道:
> > >
> > > 最终会放到 pipeline.jars 配置中,在提交作业时上传到 blobServer
> > >
> > > Best,
> > > Weihua
> > >
> > >
> > > On Tue, Jul 26, 2022 at 5:40 PM yidan zhao  wrote:
> > >
> > > > 如题,我看注释和文档。
> > > > add_jars 是添加要upload到cluster的jar,那么上传到什么路径呢?
> > > >


Re: pyflink 和 add_jars 的 add_classpaths 路径。

2022-07-27 Thread yidan zhao
我将这3个jar放到pyflink的lib下则是可以的。通过 add_jar 方式给出是不可以的。有人知道原因吗。

yidan zhao  于2022年7月27日周三 10:40写道:
>
> pyflink情况 flink-sql-connector-kafka-1.15.0.jar 可以。
> 但 flink-connector-base-1.15.0.jar + flink-connector-kafka-1.15.0.jar +
> kafka-clients-2.8.1.jar 却报:
> py4j.protocol.Py4JError:
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer does
> not exist in the JVM
>
> Weihua Hu  于2022年7月26日周二 21:21写道:
> >
> > 最终会放到 pipeline.jars 配置中,在提交作业时上传到 blobServer
> >
> > Best,
> > Weihua
> >
> >
> > On Tue, Jul 26, 2022 at 5:40 PM yidan zhao  wrote:
> >
> > > 如题,我看注释和文档。
> > > add_jars 是添加要upload到cluster的jar,那么上传到什么路径呢?
> > >


Re: 关于 storm 转 flink 的一些咨询

2022-07-27 Thread yidan zhao
(1)
现在pyflink支持如何呀,我看了1.15.0的代码中,examples中没有window 的demo,看源码来看貌似没有直接可用的
windowAssigner,都需要自定义?
看 master 貌似才有,这部分目前的可用性如何,感觉是不是还不够完善。

其次,目前看async io相关的也不支持。join也不支持对吗。
对于当前使用pyflink情况,部分不支持的,但是java部分支持的api,如何使用呢?

(2)
其次,我想知道,是否存在反过来的用法。比如主体程序是java,但是部分算子用python实现,注意我说的都是 dataStream api,非batch。
比如基于python实现一个flatMap,针对每个元素跑模型预测。主体则使用java,有什么实现方法吗?

yidan zhao  于2022年7月13日周三 21:54写道:
>
> 谢谢回答。
>
> Xingbo Huang  于2022年7月13日周三 16:55写道:
> >
> > Hi,
> >
> > 简单来说,如果你的作业逻辑中只使用了纯java的算子,比如你写的是一个没有使用 Python udf 的sql/table api
> > 作业时,那么运行时就不需要对Python有需求,但是如果你使用了python udf,或者是你用的是python datastream
> > api来编写你的作业,那么运行时就对python环境有要求,毕竟那些python的自定义函数逻辑是需要Python解释器来执行的。
> >
> > pyflink的runtime有两种执行模式process和thread。process模式是基于apache beam
> > portability框架做的进程间通信的方式,让python自定义函数运行在专门的Python进程的方式。关于thread模式则是基于pemja[1]做的嵌入的方式,让python直接嵌入到jvm里面运行,这种方式是1.15引入的,关于这部分内容可以参考文档[2]
> >
> > 关于性能问题的话,首先,如果你没有用python自定义函数,性能和java一模一样,因为你本质上只是用了pyflink的api。如果你用了python自定义函数,那就取决于你的性能瓶颈在哪了,因为我们知道python函数的性能是不如Java函数的。关于框架层的开销,我之前有写了专门的文章[3]分析过。
> >
> > 希望对你有所帮助。
> >
> > Best,
> > Xingbo
> >
> > [1] https://github.com/alibaba/pemja
> > [2]
> > https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/python_execution_mode/
> > [3] https://flink.apache.org/2022/05/06/pyflink-1.15-thread-mode.html
> >
> > yidan zhao  于2022年7月13日周三 15:47写道:
> >
> > > 目前看了下pyflink,想了解下,pyflink的任务实际运行时也是JAVA+python双环境吗。
> > > 涉及java和python交互等是吗。性能相比java直接开发的任务会有区别吗?
> > >
> > > yidan zhao  于2022年7月12日周二 19:27写道:
> > > >
> > > > 公司有部分项目是基于 storm 开发的,目前想进行改造,计划用 flink。
> > > >
> > > > 初步看了下代码,发现 storm 中实现都是通过 multi-lang 方式各种调用 shell、python
> > > > 实现。这些shell和python主要通过storm提供的一个 storm.py 基础包实现和 父进程
> > > > 的通信(基于stdin和stdout貌似)。
> > > >
> > > > 想问问,这种如何改造呢?
> > > > 首先是大方向上,(1)连同python、shell部分一起改造。(2)保留python、shell部分,基于flink实现一套类似机制。
> > > >
> > > > (1)和(2)目前看起来都会很复杂。
> > > >
> > > > 有没有小伙伴做过类似事情呢?
> > >


Re: pyflink 和 add_jars 的 add_classpaths 路径。

2022-07-26 Thread yidan zhao
pyflink情况 flink-sql-connector-kafka-1.15.0.jar 可以。
但 flink-connector-base-1.15.0.jar + flink-connector-kafka-1.15.0.jar +
kafka-clients-2.8.1.jar 却报:
py4j.protocol.Py4JError:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer does
not exist in the JVM

Weihua Hu  于2022年7月26日周二 21:21写道:
>
> 最终会放到 pipeline.jars 配置中,在提交作业时上传到 blobServer
>
> Best,
> Weihua
>
>
> On Tue, Jul 26, 2022 at 5:40 PM yidan zhao  wrote:
>
> > 如题,我看注释和文档。
> > add_jars 是添加要upload到cluster的jar,那么上传到什么路径呢?
> >


pyflink 和 add_jars 的 add_classpaths 路径。

2022-07-26 Thread yidan zhao
如题,我看注释和文档。
add_jars 是添加要upload到cluster的jar,那么上传到什么路径呢?


为啥官方flink镜像连 ps、top、jps、ip、route 这些命令都没有。。。

2022-07-21 Thread yidan zhao
~


请教下flink源码分支和tag的命名

2022-07-20 Thread yidan zhao
我目前看了下,有一定规律但也还是不完全懂。
比如我目前有部分公司内部用到的,希望基于1.15.1的release上加的话,我需要基于哪个分支?还是tag做更改呢?
哪个branch、or tag是对应官方download页面提供的下载链接的包中一模一样的源码呢,就是不包含新增开发但未发布代码的版本。


1.15.1 web ui 控制台报错

2022-07-17 Thread yidan zhao
如图,1.15.1启动后,提交examples中任务,web ui 查看checkpoint,不显示内容。
console报错:

main.a7e97c2f60a2616e.js:1 ERROR TypeError: Cannot read properties of
null (reading 'checkpointed_size')
at q (253.e9e8f2b56b4981f5.js:1:607974)
at Sl (main.a7e97c2f60a2616e.js:1:186068)
at Br (main.a7e97c2f60a2616e.js:1:184696)
at N8 (main.a7e97c2f60a2616e.js:1:185128)
at Br (main.a7e97c2f60a2616e.js:1:185153)
at N8 (main.a7e97c2f60a2616e.js:1:185128)
at Br (main.a7e97c2f60a2616e.js:1:185153)
at N8 (main.a7e97c2f60a2616e.js:1:185128)
at Br (main.a7e97c2f60a2616e.js:1:185153)
at B8 (main.a7e97c2f60a2616e.js:1:191872)

已提交到jira。


Re: standalone mode support in the kubernetes operator (FLIP-25)

2022-07-14 Thread yidan zhao
Hi all, Does 'standalone mode support in the kubernetes operator'
means: Using flink-k8s-operator to manage jobs deployed in a
standalone cluster?
What is the advantag doing so.

Yang Wang  于2022年7月14日周四 10:55写道:
>
> I think the standalone mode support is expected to be done in the version 
> 1.2.0[1], which will be released on Oct 1 (ETA).
>
> [1]. 
> https://cwiki.apache.org/confluence/display/FLINK/Release+Schedule+and+Planning
>
>
> Best,
> Yang
>
> Javier Vegas  于2022年7月14日周四 06:25写道:
>>
>> Hello! The operator docs 
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/overview/
>>  say "The Operator does not support Standalone Kubernetes deployments yet" 
>> and mentions 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-225%3A+Implement+standalone+mode+support+in+the+kubernetes+operator
>>  as a "what's next" step. Is there a timeline for that to be released?
>>
>> Thanks,
>>
>> Javier Vegas


Re: flink native k8s 按照文档提交任务找不到对应的集群

2022-07-14 Thread yidan zhao
再咨询下关于 flink-k8s-operator 的问题。
我看了看问的文档,提供了2个CRD,分别为 FlinkDeployment 和 FlinkSessionJob。不知道如下理解对不对:
(1)对于 application-mode 方式提交运行的任务,则用 FlinkDeployment,并配置好 job 部分。 会自动创建
flink 集群,并根据 job 配置运行job。
 这种方式不需要考虑集群创建、任务提交的步骤,本身就是一体。
(2)对于 session 集群的创建,也是用 FlinkDeployment ,只是不需要指定 job 配置即可。
(3)配合通过(2)方式创建的 session 集群,则可以配合 FlinkSessionJob 提交任务。

Yang Wang  于2022年7月12日周二 17:10写道:
>
> 如果你K8s集群内的机器配置的DNS Server也是coredns,那就可以正常解析clusterIP对应的service的
>
> 最初ClusterIP的设计也是让任务管理的Pod来使用,例如flink-kubernetes-operator[1]
>
> [1]. https://github.com/apache/flink-kubernetes-operator
>
> Best,
> Yang
>
> yidan zhao  于2022年7月12日周二 13:17写道:
>
> > 我用 flink run -m 方式指定 clusterIp 是可以提交任务的。
> > 那么使用 --target kubernetes-session
> > -Dkubernetes.cluster-id=my-first-flink-cluster 的方式,为什么不能智能点拿到对应
> > cluster 的 svc 的 clusterIp 去提交呢。
> >
> > yidan zhao  于2022年7月12日周二 12:50写道:
> > >
> > > 如果是在 k8s-master-node 上,可不可以直接用 ClusterIp 呢?
> > >
> > >
> > > 其次,NodePort我大概理解,一直不是很懂 LoadBalancer 方式是什么原理。
> > >
> > > yidan zhao  于2022年7月12日周二 12:48写道:
> > > >
> > > > 我理解的 k8s 集群内是组成 k8s 的机器,是必须在 pod 内?我在k8s的node上也不可以是吧。
> > > >
> > > > Yang Wang  于2022年7月12日周二 12:07写道:
> > > > >
> > > > > 日志里面已经说明的比较清楚了,如果用的是ClusterIP的方式,那你的Flink
> > > > > client必须在k8s集群内才能正常提交。例如:起一个Pod,然后再pod里面执行flink run
> > > > > 否则你就需要NodePort或者LoadBalancer的方式了
> > > > >
> > > > > 2022-07-12 10:23:23,021 WARN
> > > > > org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> > > > > Please note that Flink client operations(e.g. cancel, list, stop,
> > > > > savepoint, etc.) won't work from outside the Kubernetes cluster since
> > > > > 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
> > > > >
> > > > >
> > > > > Best,
> > > > > Yang
> > > > >
> > > > > yidan zhao  于2022年7月12日周二 10:40写道:
> > > > >
> > > > > > 如下步骤参考的文档
> > > > > >
> > https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes
> > > > > >
> > > > > > 版本:1.15
> > > > > >
> > > > > > (1)创建集群:
> > > > > >
> > > > > >
> > https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes
> > > > > > (2)提交任务:
> > > > > > ./bin/flink run \
> > > > > > --target kubernetes-session \
> > > > > > -Dkubernetes.cluster-id=my-first-flink-cluster \
> > > > > > ./examples/streaming/TopSpeedWindowing.jar
> > > > > >
> > > > > > svc是ClusterIp类型
> > > > > >
> > > > > > 第二步提交任务环节,显示如下:
> > > > > > Executing example with default input data.
> > > > > > Use --input to specify file input.
> > > > > > Printing result to stdout. Use --output to specify output path.
> > > > > > 2022-07-12 10:23:23,021 WARN
> > > > > > org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> > > > > > Please note that Flink client operations(e.g. cancel, list, stop,
> > > > > > savepoint, etc.) won't work from outside the Kubernetes cluster
> > since
> > > > > > 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
> > > > > > 2022-07-12 10:23:23,027 INFO
> > > > > > org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> > > > > > Retrieve flink cluster my-first-flink-cluster successfully,
> > JobManager
> > > > > > Web Interface: http://my-first-flink-cluster-rest.test:8081
> > > > > > 2022-07-12 10:23:23,044 WARN
> > > > > > org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> > > > > > Please note that Flink client operations(e.g. cancel, list, stop,
> > > > > > savepoint, etc.) won't work from outside the Kubernetes cluster
> > since
> > > > > > 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
> > > > > >
> > > > > > 
> > > > > >  The program finished with the following exception:
> > 

Re: 关于 storm 转 flink 的一些咨询

2022-07-13 Thread yidan zhao
谢谢回答。

Xingbo Huang  于2022年7月13日周三 16:55写道:
>
> Hi,
>
> 简单来说,如果你的作业逻辑中只使用了纯java的算子,比如你写的是一个没有使用 Python udf 的sql/table api
> 作业时,那么运行时就不需要对Python有需求,但是如果你使用了python udf,或者是你用的是python datastream
> api来编写你的作业,那么运行时就对python环境有要求,毕竟那些python的自定义函数逻辑是需要Python解释器来执行的。
>
> pyflink的runtime有两种执行模式process和thread。process模式是基于apache beam
> portability框架做的进程间通信的方式,让python自定义函数运行在专门的Python进程的方式。关于thread模式则是基于pemja[1]做的嵌入的方式,让python直接嵌入到jvm里面运行,这种方式是1.15引入的,关于这部分内容可以参考文档[2]
>
> 关于性能问题的话,首先,如果你没有用python自定义函数,性能和java一模一样,因为你本质上只是用了pyflink的api。如果你用了python自定义函数,那就取决于你的性能瓶颈在哪了,因为我们知道python函数的性能是不如Java函数的。关于框架层的开销,我之前有写了专门的文章[3]分析过。
>
> 希望对你有所帮助。
>
> Best,
> Xingbo
>
> [1] https://github.com/alibaba/pemja
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/python_execution_mode/
> [3] https://flink.apache.org/2022/05/06/pyflink-1.15-thread-mode.html
>
> yidan zhao  于2022年7月13日周三 15:47写道:
>
> > 目前看了下pyflink,想了解下,pyflink的任务实际运行时也是JAVA+python双环境吗。
> > 涉及java和python交互等是吗。性能相比java直接开发的任务会有区别吗?
> >
> > yidan zhao  于2022年7月12日周二 19:27写道:
> > >
> > > 公司有部分项目是基于 storm 开发的,目前想进行改造,计划用 flink。
> > >
> > > 初步看了下代码,发现 storm 中实现都是通过 multi-lang 方式各种调用 shell、python
> > > 实现。这些shell和python主要通过storm提供的一个 storm.py 基础包实现和 父进程
> > > 的通信(基于stdin和stdout貌似)。
> > >
> > > 想问问,这种如何改造呢?
> > > 首先是大方向上,(1)连同python、shell部分一起改造。(2)保留python、shell部分,基于flink实现一套类似机制。
> > >
> > > (1)和(2)目前看起来都会很复杂。
> > >
> > > 有没有小伙伴做过类似事情呢?
> >


Re: 关于 storm 转 flink 的一些咨询

2022-07-13 Thread yidan zhao
目前看了下pyflink,想了解下,pyflink的任务实际运行时也是JAVA+python双环境吗。
涉及java和python交互等是吗。性能相比java直接开发的任务会有区别吗?

yidan zhao  于2022年7月12日周二 19:27写道:
>
> 公司有部分项目是基于 storm 开发的,目前想进行改造,计划用 flink。
>
> 初步看了下代码,发现 storm 中实现都是通过 multi-lang 方式各种调用 shell、python
> 实现。这些shell和python主要通过storm提供的一个 storm.py 基础包实现和 父进程
> 的通信(基于stdin和stdout貌似)。
>
> 想问问,这种如何改造呢?
> 首先是大方向上,(1)连同python、shell部分一起改造。(2)保留python、shell部分,基于flink实现一套类似机制。
>
> (1)和(2)目前看起来都会很复杂。
>
> 有没有小伙伴做过类似事情呢?


关于 storm 转 flink 的一些咨询

2022-07-12 Thread yidan zhao
公司有部分项目是基于 storm 开发的,目前想进行改造,计划用 flink。

初步看了下代码,发现 storm 中实现都是通过 multi-lang 方式各种调用 shell、python
实现。这些shell和python主要通过storm提供的一个 storm.py 基础包实现和 父进程
的通信(基于stdin和stdout貌似)。

想问问,这种如何改造呢?
首先是大方向上,(1)连同python、shell部分一起改造。(2)保留python、shell部分,基于flink实现一套类似机制。

(1)和(2)目前看起来都会很复杂。

有没有小伙伴做过类似事情呢?


Re: flink native k8s 按照文档提交任务找不到对应的集群

2022-07-11 Thread yidan zhao
我用 flink run -m 方式指定 clusterIp 是可以提交任务的。
那么使用 --target kubernetes-session
-Dkubernetes.cluster-id=my-first-flink-cluster 的方式,为什么不能智能点拿到对应
cluster 的 svc 的 clusterIp 去提交呢。

yidan zhao  于2022年7月12日周二 12:50写道:
>
> 如果是在 k8s-master-node 上,可不可以直接用 ClusterIp 呢?
>
>
> 其次,NodePort我大概理解,一直不是很懂 LoadBalancer 方式是什么原理。
>
> yidan zhao  于2022年7月12日周二 12:48写道:
> >
> > 我理解的 k8s 集群内是组成 k8s 的机器,是必须在 pod 内?我在k8s的node上也不可以是吧。
> >
> > Yang Wang  于2022年7月12日周二 12:07写道:
> > >
> > > 日志里面已经说明的比较清楚了,如果用的是ClusterIP的方式,那你的Flink
> > > client必须在k8s集群内才能正常提交。例如:起一个Pod,然后再pod里面执行flink run
> > > 否则你就需要NodePort或者LoadBalancer的方式了
> > >
> > > 2022-07-12 10:23:23,021 WARN
> > > org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> > > Please note that Flink client operations(e.g. cancel, list, stop,
> > > savepoint, etc.) won't work from outside the Kubernetes cluster since
> > > 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
> > >
> > >
> > > Best,
> > > Yang
> > >
> > > yidan zhao  于2022年7月12日周二 10:40写道:
> > >
> > > > 如下步骤参考的文档
> > > > https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes
> > > >
> > > > 版本:1.15
> > > >
> > > > (1)创建集群:
> > > >
> > > > https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes
> > > > (2)提交任务:
> > > > ./bin/flink run \
> > > > --target kubernetes-session \
> > > > -Dkubernetes.cluster-id=my-first-flink-cluster \
> > > > ./examples/streaming/TopSpeedWindowing.jar
> > > >
> > > > svc是ClusterIp类型
> > > >
> > > > 第二步提交任务环节,显示如下:
> > > > Executing example with default input data.
> > > > Use --input to specify file input.
> > > > Printing result to stdout. Use --output to specify output path.
> > > > 2022-07-12 10:23:23,021 WARN
> > > > org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> > > > Please note that Flink client operations(e.g. cancel, list, stop,
> > > > savepoint, etc.) won't work from outside the Kubernetes cluster since
> > > > 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
> > > > 2022-07-12 10:23:23,027 INFO
> > > > org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> > > > Retrieve flink cluster my-first-flink-cluster successfully, JobManager
> > > > Web Interface: http://my-first-flink-cluster-rest.test:8081
> > > > 2022-07-12 10:23:23,044 WARN
> > > > org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> > > > Please note that Flink client operations(e.g. cancel, list, stop,
> > > > savepoint, etc.) won't work from outside the Kubernetes cluster since
> > > > 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
> > > >
> > > > 
> > > >  The program finished with the following exception:
> > > > org.apache.flink.client.program.ProgramInvocationException: The main
> > > > method caused an error: Failed to execute job
> > > > 'CarTopSpeedWindowingExample'.
> > > > ...
> > > > Caused by: org.apache.flink.util.FlinkException: Failed to execute job
> > > > 'CarTopSpeedWindowingExample'.
> > > > ...
> > > > Caused by: org.apache.flink.runtime.client.JobSubmissionException:
> > > > Failed to submit JobGraph.
> > > > ...
> > > > Caused by: org.apache.flink.util.concurrent.FutureUtils$RetryException:
> > > > Could not complete the operation. Number of retries has been
> > > > exhausted.
> > > > ...
> > > > Caused by: java.util.concurrent.CompletionException:
> > > > java.net.UnknownHostException: my-first-flink-cluster-rest.test: Name
> > > > or service not known
> > > > ...
> > > > Caused by: java.net.UnknownHostException:
> > > > my-first-flink-cluster-rest.test: Name or service not known
> > > >
> > > >
> > > > 如上,根据 --target kubernetes-session
> > > > -Dkubernetes.cluster-id=my-first-flink-cluster 找到的提交入口为
> > > > my-first-flink-cluster-rest.test。这个应该是根据k8s生成的dns,test是flink的namespace。
> > > >
> > > > 我本地也的确并无法解析 my-first-flink-cluster-rest.test 这个。
> > > >


Re: flink native k8s 按照文档提交任务找不到对应的集群

2022-07-11 Thread yidan zhao
如果是在 k8s-master-node 上,可不可以直接用 ClusterIp 呢?


其次,NodePort我大概理解,一直不是很懂 LoadBalancer 方式是什么原理。

yidan zhao  于2022年7月12日周二 12:48写道:
>
> 我理解的 k8s 集群内是组成 k8s 的机器,是必须在 pod 内?我在k8s的node上也不可以是吧。
>
> Yang Wang  于2022年7月12日周二 12:07写道:
> >
> > 日志里面已经说明的比较清楚了,如果用的是ClusterIP的方式,那你的Flink
> > client必须在k8s集群内才能正常提交。例如:起一个Pod,然后再pod里面执行flink run
> > 否则你就需要NodePort或者LoadBalancer的方式了
> >
> > 2022-07-12 10:23:23,021 WARN
> > org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> > Please note that Flink client operations(e.g. cancel, list, stop,
> > savepoint, etc.) won't work from outside the Kubernetes cluster since
> > 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
> >
> >
> > Best,
> > Yang
> >
> > yidan zhao  于2022年7月12日周二 10:40写道:
> >
> > > 如下步骤参考的文档
> > > https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes
> > >
> > > 版本:1.15
> > >
> > > (1)创建集群:
> > >
> > > https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes
> > > (2)提交任务:
> > > ./bin/flink run \
> > > --target kubernetes-session \
> > > -Dkubernetes.cluster-id=my-first-flink-cluster \
> > > ./examples/streaming/TopSpeedWindowing.jar
> > >
> > > svc是ClusterIp类型
> > >
> > > 第二步提交任务环节,显示如下:
> > > Executing example with default input data.
> > > Use --input to specify file input.
> > > Printing result to stdout. Use --output to specify output path.
> > > 2022-07-12 10:23:23,021 WARN
> > > org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> > > Please note that Flink client operations(e.g. cancel, list, stop,
> > > savepoint, etc.) won't work from outside the Kubernetes cluster since
> > > 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
> > > 2022-07-12 10:23:23,027 INFO
> > > org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> > > Retrieve flink cluster my-first-flink-cluster successfully, JobManager
> > > Web Interface: http://my-first-flink-cluster-rest.test:8081
> > > 2022-07-12 10:23:23,044 WARN
> > > org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> > > Please note that Flink client operations(e.g. cancel, list, stop,
> > > savepoint, etc.) won't work from outside the Kubernetes cluster since
> > > 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
> > >
> > > 
> > >  The program finished with the following exception:
> > > org.apache.flink.client.program.ProgramInvocationException: The main
> > > method caused an error: Failed to execute job
> > > 'CarTopSpeedWindowingExample'.
> > > ...
> > > Caused by: org.apache.flink.util.FlinkException: Failed to execute job
> > > 'CarTopSpeedWindowingExample'.
> > > ...
> > > Caused by: org.apache.flink.runtime.client.JobSubmissionException:
> > > Failed to submit JobGraph.
> > > ...
> > > Caused by: org.apache.flink.util.concurrent.FutureUtils$RetryException:
> > > Could not complete the operation. Number of retries has been
> > > exhausted.
> > > ...
> > > Caused by: java.util.concurrent.CompletionException:
> > > java.net.UnknownHostException: my-first-flink-cluster-rest.test: Name
> > > or service not known
> > > ...
> > > Caused by: java.net.UnknownHostException:
> > > my-first-flink-cluster-rest.test: Name or service not known
> > >
> > >
> > > 如上,根据 --target kubernetes-session
> > > -Dkubernetes.cluster-id=my-first-flink-cluster 找到的提交入口为
> > > my-first-flink-cluster-rest.test。这个应该是根据k8s生成的dns,test是flink的namespace。
> > >
> > > 我本地也的确并无法解析 my-first-flink-cluster-rest.test 这个。
> > >


Re: flink native k8s 按照文档提交任务找不到对应的集群

2022-07-11 Thread yidan zhao
我理解的 k8s 集群内是组成 k8s 的机器,是必须在 pod 内?我在k8s的node上也不可以是吧。

Yang Wang  于2022年7月12日周二 12:07写道:
>
> 日志里面已经说明的比较清楚了,如果用的是ClusterIP的方式,那你的Flink
> client必须在k8s集群内才能正常提交。例如:起一个Pod,然后再pod里面执行flink run
> 否则你就需要NodePort或者LoadBalancer的方式了
>
> 2022-07-12 10:23:23,021 WARN
> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> Please note that Flink client operations(e.g. cancel, list, stop,
> savepoint, etc.) won't work from outside the Kubernetes cluster since
> 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
>
>
> Best,
> Yang
>
> yidan zhao  于2022年7月12日周二 10:40写道:
>
> > 如下步骤参考的文档
> > https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes
> >
> > 版本:1.15
> >
> > (1)创建集群:
> >
> > https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes
> > (2)提交任务:
> > ./bin/flink run \
> > --target kubernetes-session \
> > -Dkubernetes.cluster-id=my-first-flink-cluster \
> > ./examples/streaming/TopSpeedWindowing.jar
> >
> > svc是ClusterIp类型
> >
> > 第二步提交任务环节,显示如下:
> > Executing example with default input data.
> > Use --input to specify file input.
> > Printing result to stdout. Use --output to specify output path.
> > 2022-07-12 10:23:23,021 WARN
> > org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> > Please note that Flink client operations(e.g. cancel, list, stop,
> > savepoint, etc.) won't work from outside the Kubernetes cluster since
> > 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
> > 2022-07-12 10:23:23,027 INFO
> > org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> > Retrieve flink cluster my-first-flink-cluster successfully, JobManager
> > Web Interface: http://my-first-flink-cluster-rest.test:8081
> > 2022-07-12 10:23:23,044 WARN
> > org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> > Please note that Flink client operations(e.g. cancel, list, stop,
> > savepoint, etc.) won't work from outside the Kubernetes cluster since
> > 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
> >
> > 
> >  The program finished with the following exception:
> > org.apache.flink.client.program.ProgramInvocationException: The main
> > method caused an error: Failed to execute job
> > 'CarTopSpeedWindowingExample'.
> > ...
> > Caused by: org.apache.flink.util.FlinkException: Failed to execute job
> > 'CarTopSpeedWindowingExample'.
> > ...
> > Caused by: org.apache.flink.runtime.client.JobSubmissionException:
> > Failed to submit JobGraph.
> > ...
> > Caused by: org.apache.flink.util.concurrent.FutureUtils$RetryException:
> > Could not complete the operation. Number of retries has been
> > exhausted.
> > ...
> > Caused by: java.util.concurrent.CompletionException:
> > java.net.UnknownHostException: my-first-flink-cluster-rest.test: Name
> > or service not known
> > ...
> > Caused by: java.net.UnknownHostException:
> > my-first-flink-cluster-rest.test: Name or service not known
> >
> >
> > 如上,根据 --target kubernetes-session
> > -Dkubernetes.cluster-id=my-first-flink-cluster 找到的提交入口为
> > my-first-flink-cluster-rest.test。这个应该是根据k8s生成的dns,test是flink的namespace。
> >
> > 我本地也的确并无法解析 my-first-flink-cluster-rest.test 这个。
> >


  1   2   3   4   >