Re: flink-1.14.0 chk提交kafka offset问题

2021-12-29 文章 Guo Thompson
应该是checkpoint的时候就会提交kafka 的offset

kcz <573693...@qq.com.invalid> 于2021年12月30日周四 14:54写道:

> 有一个问题请教下大佬们,学迷糊了,
> 用了flink-1.14.0版本,开启了chk(500ms做一次),精准一次消费,事件时间
> source(kafka)- (1min的window,到时之后开始做count计算) - sink(mysql)
> 生产几条数据给kafka,但是没有超过窗口的1分钟触发时间,观察到了kafka偏移量被提交了。
> 最后的sink算子还没执行,kafka偏移量就被提交了,虽然chk已经做好了。
>
> 不是等sink执行之后,偏移量才被提交吗?


flink-1.14.0 chk????kafka offset????

2021-12-29 文章 kcz
??
flink-1.14.0chk(500ms??)
source??kafka??- (1min??windowcount) - sink(mysql)
??kafka1??kafka
??sink??kafka??chk

??sink

flinkCDC2.1.1

2021-12-29 文章 Liu Join
使用flinkCDC2.1.1读取MySQL数据,一段时间后报错
图床链接:报错图片[cid:image003.png@01D7FD86.BF5AE890]

从 Windows 版邮件发送



Re:flink cdc2.1.1

2021-12-29 文章 Xianxun Ye
你好,
图片无法显示,建议使用外部图床上传,或是贴文字在邮件里面。


Best regards,


Xianxun


On 12/30/2021 11:53,Liu Join wrote:

使用flinkCDC2.1.1读取MySQL数据,一段时间后报错

 

从 Windows 版邮件发送

 

flink cdc2.1.1

2021-12-29 文章 Liu Join
使用flinkCDC2.1.1读取MySQL数据,一段时间后报错
[cid:image003.png@01D7FD73.DAE77FF0]

从 Windows 版邮件发送



Re:咨询个Flink SQL的问题,如何去除null的字段

2021-12-29 文章 Xuyang
可以使用case when试一下
在 2021-12-29 16:40:39,"RS"  写道:
>Hi,
>使用Flink SQL消费Kafka写ES,有时候有的字段不存在,不存在的不想写入ES,这种情况怎么处理呢?
>
>
>比如:源数据有3个字段,a,b,c
>insert into table2
>select
>a,b,c
>from table1
>当b=null的时候,只希望写入a和c
>当c=null的时候,只希望写入a和b
>


Re: flink 无法checkpoint问题

2021-12-29 文章 Caizhi Weng
Hi!

图片无法显示,建议使用外部图床上传。

checkpoint 慢的原因可能有很多,最可能的原因是由于算子处理数据太慢导致反压(可以通过 Flink web UI 每个节点的 busy
百分比大致看出来)。建议检查资源是否充足,数据是否倾斜,gc 是否过于频繁等。

紫月幽魔灵  于2021年12月28日周二 10:38写道:

> 版本:flink版本1.14.0
> 问题: 使用flink 1.14.0版本提交到jdk1.7版本的yarn集群上checkpoint无法生成,一直处于IN_PROGRESS状态
> 提交命令如下:
> ./bin/flinksql-submit.sh \
> --sql sqlserver-cdc-to-kafka.sql \
> -m yarn-cluster \
> -ynm sqlserverTOkafka \
> -ys 2 \
> -yjm 1024 \
> -ytm 1024 \
> -yid application_1640657115196_0001 \
> -yD yarn.taskmanager.env.JAVA_HOME=/usr/java/jdk1.8.0_25 \
> -yD containerized.master.env.JAVA_HOME=/usr/java/jdk1.8.0_25 \
> -yD containerized.taskmanager.env.JAVA_HOME=/usr/java/jdk1.8.0_25 \
> -yD log4j2.formatMsgNoLookups=true
> 这是什么原因造成的呢?
>


Re: 关于flink 1.13 TableEnvironment 和StreamTableEnvironment

2021-12-29 文章 Caizhi Weng
Hi!

Flink1.13中TableEnvironment是否支持UDF和UDTF呢?
>

支持。可以通过 tEnv.executeSql("create temporary function myUdf as
'com.my.package.name.MyUdfClass'") 来注册。

StreamTableEnvironment
> 目前只支持streaming模式?支持UDF和UDTF?能否在StreamTableEnvironment中以流的方式写批数据,批数据跑完进程就不存在了吧?
>

StreamTableEnvironment 目前只支持 streaming,同样支持 udf 和 udtf。可以在里面写批数据(称为 bounded
stream)。作业结束后进程是否存在和执行模式有关。如果是 session 模式,那么作业跑完了,job manager 和 task
manager 的进程都还在;如果是 per job 模式,那么进程都会结束。

现在不是特别清楚真正的流批一体化体现在那个入口TableEnvironment 还是StreamTableEnvironment?
>

从 SQL 的角度来看,流批一体化体现在同一套 SQL 语句既可以跑流作业,也可以跑批作业,用户无需为了流批作业写两套 SQL,和具体哪个
environment 没关系。当然用流作业跑 bounded stream 也是流批一体的体现之一,不过如果已知是 bounded
stream,跑批作业可以获得更高的效率。

Flink SQL的分区表是建立在TableEnvironment 还是StreamTableEnvironment?


Flink 目前不自己存储数据,而是用于计算外部存储系统的数据。因此分区表是建立在外部存储系统里的(需要对应外部系统支持,例如 hive),与
Flink 中的哪个 environment 无关。


Fei Han  于2021年12月28日周二 19:13写道:

>
> @all
> 大家好:
> 关于Flink1.13中,TableEnvironment 和StreamTableEnvironment有一些疑惑:
> 1.TableEnvironment支持streaming和batch
> 模式,Flink1.13中TableEnvironment是否支持UDF和UDTF呢?
> 2.StreamTableEnvironment
> 目前只支持streaming模式?支持UDF和UDTF?能否在StreamTableEnvironment中以流的方式写批数据,批数据跑完进程就不存在了吧?
> 3.现在不是特别清楚真正的流批一体化体现在那个入口TableEnvironment 还是StreamTableEnvironment?
> 4.Flink SQL的分区表是建立在TableEnvironment 还是StreamTableEnvironment?


Re: Flink集成Hive问题

2021-12-29 文章 Caizhi Weng
Hi!

看起来你本机的 8081 端口已经有别的程序占用了。能否在浏览器访问 localhost:8081,确认一下打开的是 Flink web UI 吗?

如果跑的是 Flink standalone cluster,需要先启动 standalone cluster。进入 flink 目录,运行
bin/start-cluster.sh 即可启动。

wangbi...@longi.com  于2021年12月29日周三 09:48写道:

> Hi,Flink您好
>我在集成Hive时,通过sql-client连接Hive成功,但在查询hive表时,总是提示以下错误,请问我该怎么办?
>
>  我的环境是CDH6.3.1,Flink1.13.2和1.14.2均尝试,按照官方指导一步步配置,均报相同错误。您的指导对我至关重要,期待您的回复,谢谢
> 2021-12-28 20:33:42,999 WARN
> org.apache.flink.client.program.rest.RestClusterClient   [] - Attempt
> to submit job 'collect' (3ef21e6c1235316899b83d491a9c363a) to '
> http://localhost:8081' has failed.
> java.util.concurrent.CompletionException:
> org.apache.flink.runtime.rest.util.RestClientException: Response was not
> valid JSON, but plain-text: Login error. Need username and password
> at
> java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
> ~[?:1.8.0_221]
> at
> java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
> ~[?:1.8.0_221]
> at
> java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
> ~[?:1.8.0_221]
> at
> java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
> ~[?:1.8.0_221]
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> [?:1.8.0_221]
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> [?:1.8.0_221]
> at
> org.apache.flink.runtime.rest.RestClient$ClientHandler.readRawResponse(RestClient.java:680)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.runtime.rest.RestClient$ClientHandler.channelRead0(RestClient.java:613)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:383)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:354)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec$Decoder.channelInactive(HttpClientCodec.java:311)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelInactive(CombinedChannelDuplexHandler.java:221)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> 

Re: flink 1.15 编译 dump files (if any exist) [date].dump, [date]-jvmRun[N].dump and [date].dumpstream.

2021-12-29 文章 Caizhi Weng
Hi!

这是说单元测试有失败的 case,可以往上翻一翻找到具体是哪个 case 报给社区。

不过如果单纯只是为了编译,可以在 mvn 命令后添加 -DskipTests
选项来跳过测试。所有模块的测试总时长很长的(小时级别),如果只是编译的话只要十几分钟就编译好了。

Michael Ran  于2021年12月29日周三 14:16写道:

> dear all :
> 有朋友遇到过编译flink 1.15 master  出现这个异常吗?
>
>
>
>
> [ERROR] Failed to execute goal
> org.apache.maven.plugins:maven-surefire-plugin:3.0.0-M5:test (default-test)
> on project flink-runtime: There are test failures.
> [ERROR]
> [ERROR] Please refer to
> /Users/qqr/work/git/fork/flink/flink-runtime/target/surefire-reports for
> the individual test results.
> [ERROR] Please refer to dump files (if any exist) [date].dump,
> [date]-jvmRun[N].dump and [date].dumpstream.
> [ERROR] -> [Help 1]
> [ERROR]
> [ERROR] To see the full stack trace of the errors, re-run Maven with the
> -e switch.
> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
> [ERROR]
> [ERROR] For more information about the errors and possible solutions,
> please read the following articles:
> [ERROR] [Help 1]
> http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
> [ERROR]
> [ERROR] After correcting the problems, you can resume the build with the
> command
> [ERROR]   mvn  -rf :flink-runtime
>
>


Re: Flink on Native K8s 部署模式下Tm和Jm容器配置Hosts问题

2021-12-29 文章 JianWen Huang
明白了。感谢。
在实践中,Flink on Native K8s的部署方式需要一个机器同时拥有k8s和flink客户端才能很好的完成部署工作。
请问在工程实践上有什么比较好的持续集成提交方式。我目前想到两种。
1.在k8s 启动一个带flink客户端的容器。在容器内部进行命令行提交。
2.在k8s以带Flink客户端的镜像启动一个Job类型作业,然后在作业运行时进行命令提交。
第1种对于kubernetes.pod-template-file的提交需要把kubernetes.pod-template-file中的模板文件cp到容器中。
第2种需要提前把kubernetes.pod-template-file文件打到带Flink客户端的镜像中。
请问您有更好的方法吗。

Yang Wang  于2021年12月26日周日 16:39写道:
>
> 拿如下提交命令举例,pod-temlate.yaml是在和运行run-application这个命令相同的机器上面。Flink
> client会自动把这个文件存放到ConfigMap,然后挂载给JM的
> user jar(StateMachineExample.jar)是需要在镜像里面
>
> 注意:一般需要在镜像里面的都会使用local://这个schema,本地文件则不需要
>
> bin/flink run-application -t kubernetes-application \
> -Dkubernetes.cluster-id=my-flink-cluster \
> -Dkubernetes.pod-template-file=/path/of/pod-template.yaml \
> local:///opt/flink/examples/streaming/StateMachineExample.jar
>
>
>
> 如果还是不明白,看一下这个测试的实现就清楚了[1]
>
> [1].
> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/test-scripts/test_kubernetes_application_ha.sh
>
>
> Best,
> Yang
>
> 黄剑文  于2021年12月24日周五 17:57写道:
>
> > client-local的文件,不是镜像里面的。这句话该怎么理解?因为run-application
> >
> > 模式下是需要将用户jar包跟flink标准镜像打到一起形成自己镜像然后进行提交。那么这个文件该放在哪个地方?目前我指定路径发现读的是镜像包中的路径。如/opt/my-pod-template。读的是镜像中/opt/my-pod-template文件。
> >
> > 谢谢您的回复。
> >
> > Yang Wang  于2021年12月24日周五 11:18写道:
> > >
> > > 使用flink
> > > run-application来提交任务时,kubernetes.pod-template-file需要指定的是一个client-local的文件
> > > 不是镜像里面的
> > >
> > > Best,
> > > Yang
> > >
> > > hjw <1010445...@qq.com.invalid> 于2021年12月23日周四 22:21写道:
> > >
> > > > Flink版本:1.13Flink基于Native K8s
> > > >
> > 部署模式下,因为有场景需要,jobmanager和taskmanager需要配置一些特定的hosts,查阅官方文档后发现可以支持自己指定一些pod-Template来指定jm和tm的一些K8s部署行为,但这些pod-Template需要打在提交客户端镜像里。
> > > >
> > > >
> > 问题是jm和tm在不同环境下需要配置的Hosts并不相同。如开发环境,测试环境,生产环境。这意味着不同环境需维护不同的镜像。请问各位在使用上有什么好方法去解决呢。谢谢。
> >


Re:咨询个Flink SQL的问题,如何去除null的字段

2021-12-29 文章 Michael Ran
可以写两个insert 语句,后面用判断分开~。~
在 2021-12-29 16:40:39,"RS"  写道:
>Hi,
>使用Flink SQL消费Kafka写ES,有时候有的字段不存在,不存在的不想写入ES,这种情况怎么处理呢?
>
>
>比如:源数据有3个字段,a,b,c
>insert into table2
>select
>a,b,c
>from table1
>当b=null的时候,只希望写入a和c
>当c=null的时候,只希望写入a和b
>


咨询个Flink SQL的问题,如何去除null的字段

2021-12-29 文章 RS
Hi,
使用Flink SQL消费Kafka写ES,有时候有的字段不存在,不存在的不想写入ES,这种情况怎么处理呢?


比如:源数据有3个字段,a,b,c
insert into table2
select
a,b,c
from table1
当b=null的时候,只希望写入a和c
当c=null的时候,只希望写入a和b