官网给的示例命令如下
./bin/flink run-application -p 8 -t kubernetes-application \
-Dkubernetes.cluster-id=flink-k8s-application-cluster \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dkubernetes.container.image=flink:latest \
请问运行在k8s per job上的flink作业要如何正确监控?一方面通过sidecar的方式收集日志,另一方面要怎么收集那些flink作业metrics?
这方面有什么资料参考吗?
.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#rbac
>
>Best,
>Yang
>
>陈帅 于2020年12月28日周一 下午5:54写道:
>
>> 今天改用官方最新发布的flink镜像版本1.11.3也启不起来
>> 这是我的命令
>> ./bin/kubernetes-session.sh \
>> -Dkubernetes.cluster-id=rtdp \
>
我在尝试运行flink on 非原生 k8s session cluster时遇到示例作业运行失败的情况,还请解答一下,谢谢!
参考了官网给的k8s yaml配置文件 [1],k8s session
cluster能够运行起来,flink:latest下载的是v1.11.2版本的flink
按照其配置起了2个TM,每个TM开了一个slot,一共是两个slots。
之后我通过web UI提交了一个自带的TopSpeedWindowing示例,结果作业在scheduling一段时间后直接fail了,通过kubectl
logs查看JM日志如下,TM没有输出日志
t;当然你也可以手动拼接出来这个链接,minikube ip拿到APIServer地址,然后用kubectl get svc 去查看你创建的Flink
>Session Cluster对应的rest svc的NodePort,拼起来访问就好了
>
>
>Best,
>Yang
>
>陈帅 于2020年12月27日周日 下午10:51写道:
>
>>
>> 本人第一次尝试在k8s上部署flink,版本用的是1.12.0,jdk是1.8.0_275,scala是2.12.12,在我的mac机器上安装有miniku
请问flink如何使用oss作为checkpoint/savepoint/statebackend? 需要依赖Hadoop并配置Hadoop on OSS吗?
请问flink如何批/流读写阿里云oss? 我试着通过filesystem sql connector [1] 连接oss,按照官网配置oss
endpoint, accessKey (id + secret) [2] 在flink-conf.yaml文件,程序仍然报找不到
fs.oss.endpoint,我已经将 flink-oss-fs-hadoop jar放进了
plugins/目录下面。想问一下读写oss是否一定要依赖于hadoop呢?官网有提及 Hadoop Aliyun
module,不知道具体做法是什么?还请给个具体实操例子,谢谢!
[1]
kubeletBack-off
restarting failed container
这里面有两个ConfigMap没有找到,是需要提前创建吗?官方文档没有说明?还是我看漏了?
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#start-flink-session
在 2020-12-27 22:50:32,"陈帅" 写道:
>本人第一次
本人第一次尝试在k8s上部署flink,版本用的是1.12.0,jdk是1.8.0_275,scala是2.12.12,在我的mac机器上安装有minikube单机环境,以下是实验步骤:
git clone
https://github.com/apache/flink-dockercdflink-docker/1.12/scala_2.12-java8-debian
docker build --tag flink:1.12.0-scala_2.12-java8 .
cd flink-1.12.0
./bin/kubernetes-session.sh \
业务上游数据源发出来的数据有可能会有脏数据导致数据无法解析成源表的结构,如kafka json topic映射成源表。
请问这种情况下flink sql要如何处理? 期望的是将脏数据发到一个专门的topic,是不是要自己写个connector? 标准kafka
connector支持这种需求么?
flink 1.11+ 支持yarn
application模式提交任务,我试着用这个模式提交examples下的TopSpeedWindowing任务,我将$FLINK_HOME/lib目录下的文件和要运行任务的jar文件都上传到了hdfs,运行如下命令:
./bin/flink run-application -p 1 -t yarn-application \
-yD yarn.provided.lib.dirs="hdfs://localhost:9000/flink/libs" \
请问 flink 1.11 版本下 用于提交用户任务的方法
org.apache.flink.client.ClientUtils.submitJob(ClusterClient, JobGraph)方法在 flink
1.12版本下找不到了,
现在用哪个方法取代了呢?又该如何获取提交任务后的jobId呢?谢谢!
| publicstaticJobExecutionResultsubmitJob( |
| | ClusterClientclient, |
| | JobGraphjobGraph) throwsProgramInvocationException { |
| |
>Confluent Schema Registry参考这个
>https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/avro-confluent.html
>
>
>
>
>guoliubi...@foxmail.com
>
>发件人: 陈帅
>发送时间: 2020-12-14 23:33
>收件人: user-zh@flink.apache.org
>主题: 如何让FlinkSQL访问到阿里云MaxC
如何让FlinkSQL访问到阿里云MaxCompute上的表?
又或者是Confluent Schema Registry上那些带schema的kafka topic?
需要自己定义Catalog吗?有相关的教程和资料么?谢谢!
数据源来自Jark项目 https://github.com/wuchong/flink-sql-submit
中的kafka消息,里面user_behavior消息例如
{"user_id": "470572", "item_id":"3760258", "category_id": "1299190",
"behavior": "pv", "ts": "2017-11-26T01:00:01Z"}
可以看到ts值是 '2017-11-26T01:00:00Z',现在要为它定义一张flink sql源表,如下
CREATE TABLE user_log (
user_id
etastore+success-file,可以检查一下分区目录下success
> file是否创建了。如果success file也没有的话说明没有触发分区提交。另外提交分区时会打印类似这样的日志,可以在log中查找一下
> LOG.info("Partition {} of table {} is ready to be committed",
> partSpec, tableIdentifier);
> LOG.info("Committed partition {} to metastore", partitionSpe
tion(String op) {
String operation = "INSERT";
for (RowKind rk : RowKind.values()) {
if (rk.shortString().equals(op)) {
switch (rk) {
case UPDATE_BEFORE:
case UPDATE_AFTER:
operation
最后,在hive shell中执行 “msck repair table team;” 命令后就能查询到写的数据了,难道flink hive
streaming不能自动注册hive分区吗?还是我使用的姿势不对?
陈帅 于2020年11月1日周日 下午5:24写道:
> 改用 TEXTFILE 存储hive表数据以便下载hive文件观察内容
> ") STORED AS TEXTFILE TBLPROPERTIES ("
>
> 这是生成的hive表建表语句
>
> hive> show create table
66')
Time taken: 0.252 seconds, Fetched: 25 row(s)
另外,下载了hive文件内容如下
1001<0x01>Sun<0x01>2020-10-31 11:25:38<0x01>2020-10-31 11:25:38<0x01>INSERT
还是查询不到结果
hive> select * from team;
OK
Time taken: 0.326 seconds
陈帅 于2020年11月1日周日 下午5:10写道:
>
> 之前没加watermark和设置分区是能够
之前没加watermark和设置分区是能够写hive文件并查询出来的,只是设置分区后hive文件是生成出来了但查询不出来,所以我感觉跟watermark设置与否没太大关系。
生成的hive分区文件路径类似于 /user/hive/warehouse/ods.db/team/dt=20201101/hr=16/mi=30/
part-dc55d200-dd03-4f26-8a3e-60bfa1dd97f2-0-3
陈帅 于2020年11月1日周日 下午4:43写道:
> 我查过hive文件是有生成的,按照我定义的partition。按照你的建议在ds2这个stream
BEFORE:
case UPDATE_AFTER:
operation = "UPDATE";
break;
case DELETE:
operation = "DELETE";
break;
case INSERT:
我想使用flink sql的mysql-cdc connector直接将mysql表数据实时同步进hive,运行后抛
Exception in thread "main" org.apache.flink.table.api.TableException:
AppendStreamTableSink doesn't support consuming update and delete changes
which is produced by node TableSourceScan(table=[[hive_catalog, cdc,
team]], fields=[team_id,
flink版本是1.11.1,我将
flink-connector-debezium-1.1.0.jar,
flink-connector-mysql-cdc-1.1.0.jar,
flink-sql-connector-kafka_2.12-1.11.1.jar,
flink-sql-connector-mysql-cdc-1.1.0.jar,
flink-format-changelog-json-1.1.0.jar
下载并加入到 $FLINK_HOME/lib目录,并以embedded模式启动flink sql
client,同时在mysql中插入一张表,然后在flink sql
现实中确实有这类需求,简单点的是元素本身带有窗口大小设置,复杂点的是有一个控制窗口大小的控制流connect实现。
LakeShen 于2019年12月18日周三 下午2:12写道:
> 使用自定义 Trigger 试试,在 Trigger 中自定义业务触发逻辑。
>
> 陈帅 于2019年12月14日周六 下午6:44写道:
>
> > flink目前支不支持动态修改窗口大小和类型呢?例如先是统计5分钟窗口,而后修改成统计10分钟窗口.
> >
>
谢谢回复,有了schema registry url为何还需要填subject和avroSchemaStr呢?
朱广彬 于2019年12月18日周三 上午10:30写道:
> Hi 陈帅,
>
> 目前社区确实不支持confluent schema registry的avro格式,我们内部也是依赖schema registry来做avro
> schema的管理,所以,我们改动了flink-avro 的源码来支持。
>
> 主要涉及到这些地方:
>
> org.apache.flink.formats.av
flink sql是否能够支持将confluent schema registry注册的一个avro数据格式 的topic注册成一张table?
Flink Table API Schema定义里面的 Types.SQL_TTIMESTAMP 类型用json表示的话一定要用
-MM-dd'T'HH:mm:ss.SSS'Z'表示吗?
示例程序如下:
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
;
}
}
}
Yuan,Youjun 于2019年12月12日周四 上午9:28写道:
> 首先通过一个自定义表函数(table
> function),将每条输入的消息变成3条消息,假设输入消息的时间为ts,产出的三条消息时间分别为:(ts-1, 0), (ts+1, 1),
> (ts+31, 0),
> 然后再用30分钟的range over窗口对前面的数值部分(0或者1)求SUM
>
> 袁尤军
>
> -邮件原件-
> 发件人: 陈帅
> 发送时间: Wednesday,
flink目前支不支持动态修改窗口大小和类型呢?例如先是统计5分钟窗口,而后修改成统计10分钟窗口.
s-1, 0), (ts+1, 1),
> (ts+31, 0),
> 然后再用30分钟的range over窗口对前面的数值部分(0或者1)求SUM
>
> 袁尤军
>
> -邮件原件-
> 发件人: 陈帅
> 发送时间: Wednesday, December 11, 2019 9:31 PM
> 收件人: user-zh@flink.apache.org
> 主题: flink持续查询过去30分钟登录网站的人数
>
> 例如,用户在以下时间点登录:无, 12:02, 12:13, 12:15, 12:31,
flink 1.9里面支持cancel job with savepoint功能
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#cancel-job-with-savepoint
checkpoint可能是增量的,但savepoint是全量的。具体区别可以参考
https://www.ververica.com/blog/differences-between-savepoints-and-checkpoints-in-flink
lucas.wu
例如,用户在以下时间点登录:无, 12:02, 12:13, 12:15, 12:31, 12:40, 12:45, 无
那么我期望在以下时间点(实际查询可能在任意时间点)获取到的结果数为
12:01 (0), 12:03:(1), 12:14 (2), 12:16(3), 12:30 (4), 12:35 (4), 12:41
(5), 12:46 (4), 13:16 (0)
即每个元素进来就会设一个30分钟过期时间,窗口状态是当前还未过期元素集合。
用flink stream api和flink
我们也遇到过类似的问题,有可能是进来的数据量带来的状态增长速度大于状态过期清理速度。另外想问一下有没有metrics监控到每次清理过期状态的大小和时间?
Yun Tang 于2019年12月10日周二 下午8:30写道:
> Hi 王磊
>
> Savepoint目录中的数据的时间戳不会在恢复的时候再更新为当前时间,仍然为之前的时间,从代码上看如果你配置了cleanupFullSnapshot就会生效的,另外配置
> cleanupInRocksdbCompactFilter
>
ract stream。在 Flink 1.11,社区将计划支持
> RetractStream 转成 AppendStream 的操作,以 changelog 的方式输出。
>
> Best,
> Jark
>
> On Sun, 8 Dec 2019 at 10:08, 陈帅 wrote:
>
> > 在用Flink做实时数仓时遇到group by统计后需要将结果发到kafka,但是现在的kafka
> >
> >
> sink是不支持RetractStream,因此会报错,有没有什么方法将RetractStream转成AppendStream(例如,忽略撤回的消息,只发送更新的消息),从而可以将结果发到kafka?
> >
>
ngFileSink没有现成的。
>
> 在1.11中,Table层会持续深入这方面的处理,实时数仓落hive,在后续会一一解决数据倾斜、分区可见性等问题。[1]
>
> [1] https://issues.apache.org/jira/browse/FLINK-14249
>
> Best,
> Jingsong Lee
>
>
> ------
> From:陈帅
> Send Time:
在用Flink做实时数仓时遇到group by统计后需要将结果发到kafka,但是现在的kafka
sink是不支持RetractStream,因此会报错,有没有什么方法将RetractStream转成AppendStream(例如,忽略撤回的消息,只发送更新的消息),从而可以将结果发到kafka?
有人说直接写到HBase,再在Hive关联Hbase表
但是我想直接写文件到HDFS,再通过Hive外表加载,不过有几个问题不明白:
1. 如果用行式格式实时写没有问题,StreamingFileSink也支持,但是如果我想用列式格式(如Parquet或ORC)
写的话,目前来看没有现成的Streaming
Writer,官方提供的都是
BulkWriter,只支持批处理方式。是不是不建议用列式格式进行流式写?如果能的话需要自己定义StreamingWriter吗?对于
业务上的Update和Delete操作 数据一般是如何sync进Hive的?
2.
请问这个平台生成可执行文件creek是如何实现的?对应的Dawin-amd64环境下载下来的文件是什么格式的?
Yuan,Youjun 于2019年12月7日周六 下午8:32写道:
> 是的,与Flink完全一样的SQL接口,为边缘计算设计的流式计算框架。
>
>
> -邮件原件-----
> 发件人: 陈帅
> 发送时间: Saturday, December 7, 2019 11:36 AM
> 收件人: user-zh@flink.apache.org
> 主题: Re: 如果用flink sql持续查询过去30分钟登
": {
> "input":[
> "2019-12-05 12:02:00,user1",
> "2019-12-05 12:13:00,user1",
> "2019-12-05 12:15:00,user1",
> "2019-12-05 12:31:00,user1"
窗口浪费了。我也考虑过用
over window,但是不知道它是否支持处理时间,因为我的场景是需要根据处理时间推移而改变统计值的。我尝试用stream
api来实现,利用了timerService设置元素过期时间,但我测下来发现元素过期速度赶不上进入元素的速度,导致state大小一直增长.
所以想问一下:
1. 针对这种case有没有标准做法?sql支持吗?
2. 要怎么解决timerService的性能问题?timerService底层实现是不是单线程处理priority queue?
谢谢!
陈帅
现在都是把代码打成一个胖包,每次这样,传输太麻烦了
flink基于yarn的方式提交,log在web上看太卡了。能不能直接看log文件?
请教一下:flink on yarn,提交方式是per job的话,如何保证高可用?
43 matches
Mail list logo