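Re: flink 1.12.0 k8s session deployment exception

2021-03-24 Thread Yang Wang
The root cause of this problem is that the cloud LoadBalancer keeps sending RST packets to the service created by Flink.
See this JIRA [1] for more information.

A temporary workaround is to set the log level of the class org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint to ERROR in the log4j2 configuration.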
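
A minimal sketch of that workaround, assuming the properties-style log4j2 configuration shipped with the Flink distribution (for example conf/log4j-console.properties; which file applies depends on the deployment). The logger id "restendpoint" is an arbitrary name chosen here, not something Flink requires:

```properties
# Only log ERROR and above for the REST endpoint, so the WARN-level
# "Unhandled exception ... Connection reset by peer" entries are dropped.
logger.restendpoint.name = org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint
logger.restendpoint.level = ERROR
```

This only hides the log entries; the RST packets from the load balancer are still sent.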

[1]. https://issues.apache.org/jira/browse/FLINK-18129

Best,
Yang

18756225...@163.com <18756225...@163.com> wrote on Wednesday, March 24, 2021 at 5:45 PM:

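> I am running into this problem too. Jobs can be submitted to the cluster normally, but the jobmanager log keeps showing this error. Is there a way to resolve it?
>
>
> From: casel.chen
> Sent: 2021-02-07 16:33
> To: user-zh@flink.apache.org
> Subject: flink 1.12.0 k8s session deployment exception
> When deploying a session-mode Flink cluster on k8s, the jobmanager reports the following error. What causes it, and how can it be fixed?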
>
>
> 2021-02-07 08:21:41,873 INFO
> org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting
> RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher
> at akka://flink/user/rpc/dispatcher_1 .
> 2021-02-07 08:21:43,506 WARN
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint   [] - Unhandled
> exception
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> ~[?:1.8.0_275]
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> ~[?:1.8.0_275]
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> ~[?:1.8.0_275]
> at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_275]
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> ~[?:1.8.0_275]
> at
> org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
> 2021-02-07 08:21:43,940 WARN
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint   [] - Unhandled
> exception
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> ~[?:1.8.0_275]
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> ~[?:1.8.0_275]
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> ~[?:1.8.0_275]
> at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_275]
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> ~[?:1.8.0_275]
> at
> org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> 

Re: Unsubscribe

2021-03-24 Thread Kezhu Wang
You need to send an email to 

Best,
Kezhu Wang

On March 25, 2021 at 10:15:56, drewfranklin (drewfrank...@163.com) wrote:

Unsubscribe


Unsubscribe

2021-03-24 Thread drewfranklin
Unsubscribe



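Re: flink sql count distinct optimization

2021-03-24 Thread Robin Zhang
Hi, guomuhua
  With local aggregation enabled, you do not need to scatter the keys and do a second aggregation yourself. I suggest reading the official documentation on this.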
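
One point that may explain a missing two-phase operator in the web UI: according to the Flink documentation on streaming aggregation tuning, local-global aggregation depends on mini-batch being enabled. A sketch of the settings, in the same `set` form used in the question quoted below; the latency and size values are placeholders for illustration, not recommendations:

```sql
-- Mini-batch has to be enabled for local-global aggregation to take effect.
set table.exec.mini-batch.enabled=true;
set table.exec.mini-batch.allow-latency=5s;
set table.exec.mini-batch.size=5000;
-- Local-global (two-phase) aggregation.
set table.optimizer.agg-phase-strategy=TWO_PHASE;
-- Partial-final split for COUNT(DISTINCT ...).
set table.optimizer.distinct-agg.split.enabled=true;
```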

Best,
Robin


guomuhua wrote
> In SQL, if the local-global option is enabled: set
> table.optimizer.agg-phase-strategy=TWO_PHASE;
> or the Partial-Final option is enabled: set table.optimizer.distinct-agg.split.enabled=true;
>  set
> table.optimizer.distinct-agg.split.bucket-num=1024;
> do I still need to rewrite the SQL into the corresponding two-stage form?
> For example:
> Original SQL:
> SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day
> 
> SQL after scattering the DISTINCT field buy_id modulo 1024:
> SELECT day, SUM(cnt) total
> FROM (
> SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
> FROM T GROUP BY day, MOD(buy_id, 1024))
> GROUP BY day
> 
> Or will Flink rewrite the SQL for me automatically, so that I do not need to care?
> 
> Also, when I only set the options above without rewriting the SQL, there seems to be no optimization, and I do not see any two-phase operators in the Flink web UI either:
> http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png;
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/







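The application's core/hdfs/yarn site files do not take effect in YARN mode

2021-03-24 Thread 崔博
hi all
When a job is submitted in yarn-cluster mode, the application's own hdfs-site, core-site, and yarn-site files are not uploaded as dependencies; the YARN cluster's default configuration is used instead. If the cluster's default configuration differs from the application's configuration, the only option at the moment is -yt.
Why are these files not uploaded, while flink-conf.yaml is?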

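Re: FileSystemTableSink support for writing with a custom delimiter

2021-03-24 Thread easonliu30624700
The delimiter can be specified by setting the property csv.field-delimiter. However, only a single character can be specified; multi-character delimiters are not supported.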
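
A minimal sketch of a filesystem sink table that sets this option, assuming Flink SQL DDL with the filesystem connector and the csv format; the table name, columns, and path are hypothetical:

```sql
CREATE TABLE fs_sink (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///tmp/fs_sink',   -- hypothetical output location
  'format' = 'csv',
  'csv.field-delimiter' = '|'       -- single character only
);
```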




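Re: flink 1.12.0 k8s session deployment exception

2021-03-24 Thread 18756225...@163.com
I am running into this problem too. Jobs can be submitted to the cluster normally, but the jobmanager log keeps showing this error. Is there a way to resolve it?

 
From: casel.chen
Sent: 2021-02-07 16:33
To: user-zh@flink.apache.org
Subject: flink 1.12.0 k8s session deployment exception
When deploying a session-mode Flink cluster on k8s, the jobmanager reports the following error. What causes it, and how can it be fixed?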
 
 
2021-02-07 08:21:41,873 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Starting RPC endpoint for 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher at 
akka://flink/user/rpc/dispatcher_1 .
2021-02-07 08:21:43,506 WARN  
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint   [] - Unhandled 
exception
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:1.8.0_275]
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) 
~[?:1.8.0_275]
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) 
~[?:1.8.0_275]
at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_275]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) 
~[?:1.8.0_275]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
2021-02-07 08:21:43,940 WARN  
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint   [] - Unhandled 
exception
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:1.8.0_275]
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) 
~[?:1.8.0_275]
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) 
~[?:1.8.0_275]
at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_275]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) 
~[?:1.8.0_275]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]


Re: Flink consuming from Kafka and writing ORC files

2021-03-24 Thread Jacob
Thanks for the reply.

I wrote a simple BucketAssigner implementation, and it meets the requirement.


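@Override
public String getBucketId(Map element, Context context)
{
    // Use the record timestamp as the bucket only when it is earlier than the current processing time.
    if (context.timestamp() - context.currentProcessingTime() < 0) {
        return "dt=" + context.timestamp();
    }
    // All other records get a null bucket id, which the sink may not accept.
    return null;
}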
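
If a calendar-day partition is wanted instead of the raw epoch value, the timestamp can be formatted before it is used as the bucket id. The following is a self-contained sketch of only that formatting step, with no Flink dependency; the class name, the method name, and the choice of UTC are assumptions made for illustration:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class BucketIds {
    // Day-granularity pattern evaluated in UTC (an assumption; use the zone your partitions follow).
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    /** Turns an epoch-millisecond timestamp into a bucket id of the form "dt=yyyy-MM-dd". */
    public static String bucketIdFor(long epochMillis) {
        return "dt=" + DAY.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        // Prints the bucket id for the current wall-clock time.
        System.out.println(bucketIdFor(System.currentTimeMillis()));
    }
}
```

Inside getBucketId, the return value could then be built with a call such as bucketIdFor(context.timestamp()) once the timestamp is known to be non-null.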



-
Thanks!
Jacob


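Unsubscribe

2021-03-24 Thread hongton122
Unsubscribe from the Chinese mailing list

FileSystemTableSink support for writing with a custom delimiter

2021-03-24 Thread 刘医生
Hi, I have a question:
FileSystemTableSink currently seems to support plain-text output in csv and json.
Could it later support configuring a "field delimiter" when writing to the file system?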


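Re: How to use process time with interval join

2021-03-24 Thread Smile
Hello, Interval Join in the DataStream API does not support process time yet; see [1].
However, if you do not need a strictly accurate process time, could you carry the process time in a field before the join and use it as the event
time?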

[1].
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/joining.html#interval-join
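
The idea of stamping each record with its processing time and then joining on that stamp can be illustrated without Flink. The sketch below is plain Java, not the DataStream API: the record type, method names, and bounds are hypothetical, and it only mirrors the interval condition left.stamp + lowerBound <= right.stamp <= left.stamp + upperBound on the stamped values.

```java
import java.util.ArrayList;
import java.util.List;

public class StampedIntervalJoin {
    /** A record that carries the wall-clock time at which it was received. */
    public static final class Stamped {
        public final String key;
        public final String value;
        public final long stampMillis;

        public Stamped(String key, String value, long stampMillis) {
            this.key = key;
            this.value = value;
            this.stampMillis = stampMillis;
        }
    }

    /** Pairs records with equal keys whose stamps differ by a value within [lowerBound, upperBound]. */
    public static List<String> join(List<Stamped> left, List<Stamped> right,
                                    long lowerBound, long upperBound) {
        List<String> out = new ArrayList<>();
        for (Stamped l : left) {
            for (Stamped r : right) {
                long diff = r.stampMillis - l.stampMillis;
                if (l.key.equals(r.key) && diff >= lowerBound && diff <= upperBound) {
                    out.add(l.value + "|" + r.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Stamped> left = List.of(new Stamped("k", "a", 1000L));
        List<Stamped> right = List.of(
                new Stamped("k", "x", 1500L),   // 500 ms after the left record: inside the interval
                new Stamped("k", "y", 9000L));  // 8000 ms after the left record: outside the interval
        System.out.println(join(left, right, -2000L, 2000L));
    }
}
```

In a real job the stamp would be assigned once, when the record enters the pipeline, so the result reflects arrival time at that point rather than the time at which the join operator sees the record.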


