Re: (无主题)
嗨,你使用的是session还是perJob的作业?哪个flink版本?有详细的日志吗?一般不退户,可能是master卡在了哪里,比如我们遇到过卡在handler或者异步执行有问题。 田磊 于2021年6月20日周日 上午11:14写道: > > 我用flink跑hbase的数据,flink的界面显示任务已经finished,正在running的任务为0。而yarn的界面显示正在running的状态,一直都结束不了,需要手动kill,是什么情况啊。 > > > | | > totorobabyfans > | > | > 邮箱:totorobabyf...@163.com > | > > 签名由 网易邮箱大师 定制
Re:Re: (无主题)
用的是perjob的模式,版本是1.12.1,因为我读的是hbase的数据,用的流的模式,是不是数据读完之后任务不会结束啊。下面是我的提交命令:./flink-1.12.1/bin/flink run -t yarn-per-job -m yarn-cluster -ynm historyPhotosConstientHandle -ys 8 -yjm 4096 -ytm 10240 -c com.chinaunicom.audit.photo.history.handle.AuditPhotoHistoryNumberHandleApp /mnt/sd02/tglf005/devsoft/data-audit-photo-history-handle-2.1.jar,我发现如果指定了perjob参数的话,那么你在命令中指定的slot数量不起作用,我代码的最大并行度为16,按理来说应该分配2个taskmanager,但是却是分配了16个,每个taskmanager1个slot。如果不指定 -t yarn-per-job的话,那么提交后就显示的是2个taskmanager,不是很明白,小白一个,望大家指点一二。 在 2021-06-21 18:26:33,"刘建刚" 写道: >嗨,你使用的是session还是perJob的作业?哪个flink版本?有详细的日志吗?一般不退户,可能是master卡在了哪里,比如我们遇到过卡在handler或者异步执行有问题。 > >田磊 于2021年6月20日周日 上午11:14写道: > >> >> 我用flink跑hbase的数据,flink的界面显示任务已经finished,正在running的任务为0。而yarn的界面显示正在running的状态,一直都结束不了,需要手动kill,是什么情况啊。 >> >> >> | | >> totorobabyfans >> | >> | >> 邮箱:totorobabyf...@163.com >> | >> >> 签名由 网易邮箱大师 定制
Re:?????? flink sql??????????????????
k8s ?? ?? 2021-06-16 18:22:29??"??" <809097...@qq.com> ?? >FlinkSql WebIDE?? >FlinkSQLSQL??SqlCli?? > https://github.com/DataLinkDC/dlink > > > > >-- -- >??: "todd": 2021??6??16??(??) 5:48 >??: "user-zh": Re: flink sql?? > > > >Flink >apihttps://github.com/todd5167/flink-spark-submiter?? > >spi > >?? > - ??Flink >client >?? > - ?? > - ?? > > > >-- >Sent from: http://apache-flink.147419.n8.nabble.com/
中文教程更新不及时问题
大家好, 我是Flink的初学者,在跟着 https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/try-flink/table_api/ 官方教程(中文版)学习时发现很多中文教程都没有翻译掉,但是去看PR记录又发现很多都已经提了PR但是一直没有合并到主分支,里面很多PR都是几个月前的提的,后来好久都没有更新。 请问现在还有人在跟这些问题吗?如果有,可以更新下JIRA上的工单状态和代码PR状态,这样有需要我们也能申领工单给社区做一些贡献。 谢谢! Kevin Fan
Re:KafkaSource检查点的end to end duration较长(1min)原因
我觉得问题是不是出在反压上面,你的job是不是有反压? 在 2021-06-21 10:38:36,"yidan zhao" 写道: >如题,我任务的检查点(对齐检查点)大多数时间成功,偶现失败。目前针对超时类失败做了分析,存在部分特点,希望大佬们分析下原因。 > >(1)KafkaSouce的e2e时间达到1min+,正常xxx >ms就结束了。同时对应e2e达到1min+的情况下,sync、async、alignment、start delay等都为0,偶尔几个x >ms的。 这个不是很明白什么情况会是这样呢? > >(2)对于部分task,start delay假设为31s,alignment duration为43s,但是processed >data才xxxKB(几十到几百KB)。从我任务的正常处理情况对比来说,这个数据量几乎不需要时间就能处理完。 > >(3)有个window算子(前边是hash进来,keyBy的),检查点时间1m14s。然后看了下subtask的检查点,大多数都是2s内完成,其中1个subtask耗时1m14s。这个subtask对应的start-delay为1m13s。 > 这个就更奇怪了,首先前边是keyBy,所以是hash分区方式进入window算子。那么,对于正常subtask0,其start_delay为1s,那么subtask0收到第一个barrier耗时1s,假设这个barrier来自上游算子的 >0 >号子任务(preTask0)。那么preTask0既然已经发送了barrier,对于window任务的异常subtask就应该也能很快收到barrier,可是实际却耗时1min14s(start >delay)。
?????? flink-1.13.1 sql error
?? -- -- ??: "user-zh" http://apache-flink.147419.n8.nabble.com/
flink-elasticsearch6-sink question
i have two questions. 1. when i add elasticserach host and port,i random write host, but not report error. for example List httpHosts = new ArrayList<>(); httpHosts.add(new HttpHost("sdfsdfsf", 9200, "http")); 2. when i write conrrect elasticsearch host and port, but no response, also not create index in elasticsearch 你好,我在使用flink-elasticsearch6-sink的功能,但是不能执行成功,存在以下两个问题。 1、当我随便写elasticsearch的hostname时,并不会有任何报错信息。例如 List httpHosts = new ArrayList<>(); httpHosts.add(new HttpHost("sdfsdfsf", 9200, "http")); 填写的sdfsdfsf是不存在的。 2、当我使用docker启动一个elasticsearch时,程序并不会写入到elasticsearch中 ,但是也不报错。以下是代码附件。 package com.dufeng.test.connectors; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink; import org.apache.flink.util.Collector; import org.apache.http.HttpHost; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.Requests; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public class ElasticsearchSinkExample1 { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(5000); DataStream source = env.generateSequence(0, 10); List httpHosts = new ArrayList<>(); httpHosts.add(new HttpHost("sdfsdfsf", 9200, "http")); httpHosts.add(new HttpHost("xxx.xxx.xxx.xxx", 9200, "http")); httpHosts.add(new HttpHost("xxx.xxx.xxx.xxx", 9200, "http")); ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder<>( httpHosts, new ElasticsearchSinkFunction() { public IndexRequest createIndexRequest(String element) { Map json = new HashMap<>(); json.put("data", element); return Requests.indexRequest() .index("my-index") .type("my-type") .source(json); } @Override public void process(Long aLong, RuntimeContext runtimeContext, RequestIndexer requestIndexer) { requestIndexer.add(createIndexRequest(String.valueOf(aLong))); } } ); esSinkBuilder.setBulkFlushMaxActions(1); source.addSink(esSinkBuilder.build()); env.execute("Elasticsearch 6.x end to end sink test example"); } }
flink-elasticsearch6-sink question
你好,我在使用flink-elasticsearch6-sink的功能,但是不能执行成功,存在以下两个问题。 1、当我随便写elasticsearch的hostname时,并不会有任何报错信息。例如 List httpHosts = new ArrayList<>(); httpHosts.add(new HttpHost("sdfsdfsf", 9200, "http")); 填写的sdfsdfsf是不存在的。 2、当我使用docker启动一个elasticsearch时,程序并不会写入到elasticsearch中 ,但是也不报错。以下是代码附件。 package com.dufeng.test.connectors; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink; import org.apache.flink.util.Collector; import org.apache.http.HttpHost; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.Requests; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public class ElasticsearchSinkExample1 { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(5000); DataStream source = env.generateSequence(0, 10); List httpHosts = new ArrayList<>(); httpHosts.add(new HttpHost("sdfsdfsf", 9200, "http")); httpHosts.add(new HttpHost("xxx.xxx.xxx.xxx", 9200, "http")); httpHosts.add(new HttpHost("xxx.xxx.xxx.xxx", 9200, "http")); ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder<>( httpHosts, new ElasticsearchSinkFunction() { public IndexRequest createIndexRequest(String element) { Map json = new HashMap<>(); json.put("data", element); return Requests.indexRequest() .index("my-index") .type("my-type") .source(json); } @Override public void process(Long aLong, RuntimeContext runtimeContext, RequestIndexer requestIndexer) { requestIndexer.add(createIndexRequest(String.valueOf(aLong))); } } ); esSinkBuilder.setBulkFlushMaxActions(1); source.addSink(esSinkBuilder.build()); env.execute("Elasticsearch 6.x end to end sink test example"); } }
Re: KafkaSource检查点的end to end duration较长(1min)原因
存在间断性周期性反压。 因为有window算子,时间窗口触发。 熊云昆 于2021年6月21日周一 下午11:03写道: > > 我觉得问题是不是出在反压上面,你的job是不是有反压? > > > > > > > > > > > > > > > > > > 在 2021-06-21 10:38:36,"yidan zhao" 写道: > >如题,我任务的检查点(对齐检查点)大多数时间成功,偶现失败。目前针对超时类失败做了分析,存在部分特点,希望大佬们分析下原因。 > > > >(1)KafkaSouce的e2e时间达到1min+,正常xxx > >ms就结束了。同时对应e2e达到1min+的情况下,sync、async、alignment、start delay等都为0,偶尔几个x > >ms的。 这个不是很明白什么情况会是这样呢? > > > >(2)对于部分task,start delay假设为31s,alignment duration为43s,但是processed > >data才xxxKB(几十到几百KB)。从我任务的正常处理情况对比来说,这个数据量几乎不需要时间就能处理完。 > > > >(3)有个window算子(前边是hash进来,keyBy的),检查点时间1m14s。然后看了下subtask的检查点,大多数都是2s内完成,其中1个subtask耗时1m14s。这个subtask对应的start-delay为1m13s。 > > 这个就更奇怪了,首先前边是keyBy,所以是hash分区方式进入window算子。那么,对于正常subtask0,其start_delay为1s,那么subtask0收到第一个barrier耗时1s,假设这个barrier来自上游算子的 > >0 > >号子任务(preTask0)。那么preTask0既然已经发送了barrier,对于window任务的异常subtask就应该也能很快收到barrier,可是实际却耗时1min14s(start > >delay)。
Re: standalone K8S 如何查看 TaskMananger 的 gc.log ?
你可以增加如下参数来打开gc日志,并且写入到/opt/flink/log目录下 env.java.opts: -verbose:gc -XX:NewRatio=3 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:ParallelGCThreads=4 -Xss512k env.java.opts.jobmanager: -Xloggc:/opt/flink/log/jobmanager-gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=2 -XX:GCLogFileSize=512M env.java.opts.taskmanager: -Xloggc:/opt/flink/log/taskmanager-gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=2 -XX:GCLogFileSize=512M Best, Yang WeiXubin <18925434...@163.com> 于2021年6月18日周五 下午2:27写道: > 请问 *standalone K8S* 部署模式为 *Deploy Application Cluster* 在哪获取查看/怎么配置 > TaskMananger 的 *gc.log* 日志? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >
Re: question about flink on k8s per-job mode
如果不严格区分名字的话,application模式和per-job模式在K8s环境下是没有差异的,都是在JM完成的任务提交 目前的K8s application mode在高可用开启的情况下,也只能支持提交一个任务 [1]. https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/#deploy-application-cluster [2]. https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#application-mode Best, Yang Best, Yang at003 于2021年6月17日周四 下午4:51写道: > 哈喽,各位专家/大神: > > 为啥flink官方文档说明了flink on k8s 还有 native k8s都不支持 per-job mode 呢,但是搜索可以搜到好多教程。。。 > > 谢谢 > > > >
rocksdb对比filestatebackend
如题,我生产中目前一直都是使用的FileStateBackend,然后使用一个对象存储服务作为后端。 按照我的理解,这种方式下,状态的操作性能很高,都是在内存内部,只有检查点时候才会输出到对象存储中。但是,不支持增量检查点。 RocksDB支持增量检查点,但是缺点是每个状态的操作都是需要序列化/反序列化,至于是文件还是内存操作可能还和rocksdb的块大小,多久刷新等有关。 不过我现在在想,既然我的任务的状态当前使用内存存储,也就是内存存储是能够容纳我的全状态的。 那么是否我从FileStateBackend切换到RocksDB其实性能也不会降低很多呢? 就是牺牲很少性能,换来增量检查点。 此外,还有个点。RocksDB的增量检查点在每次检查点时候,输出到对象存储的部分也是增量?还是全量。 只是rocksdb自身使用增量状态,然后检查点存储的时候是全量到对象存储吗? 还是说,对象存储中存储的ckpt-1,ckpt-2,ckpt-3等也是增量的,即ckpt-3可能依赖ckpt-2等这样。
退订
退订