Re: For Flink data transfer, does the upstream operator push to the downstream one, or does the downstream operator pull from upstream? What were the design considerations?

2022-09-20 Thread Shammon FY
Hi
I personally feel that simply saying Flink data transfer is a pull model can be misleading. The two models are generally understood to work as follows:
1. Push model
After the upstream and downstream tasks initialize the network connection, the upstream task keeps "pushing" data to the downstream task directly over that connection.
2. Pull model
After the upstream and downstream tasks initialize the network connection, the downstream task, according to its own computation progress, polls the upstream with requests to "pull" data and then runs the next round of computation.

In Flink, the interaction between upstream and downstream tasks consists of the following steps:
1. The TaskManager running the upstream task creates a Netty server.
2. When the downstream task starts, it connects to the upstream through a Netty client.
3. The downstream task sends a partition request to the upstream; based on that request the upstream creates a data reader, keeps reading data through it, and sends the data over the connection.
4. The upstream and downstream tasks each have their own memory pools for flow control, roughly as follows:
  a) The downstream task, based on how much of its consumption memory pool is free, periodically sends credit updates to the upstream task.
  b) The upstream task updates its locally managed credit based on the credit messages it receives.
  c) The upstream task keeps sending data to the downstream task within the limit of its local credit.

With this approach, as long as resources are sufficient, data transfer is fully streaming. That is different from the traditional pull model; it is closer to a push model with credit-based flow control.
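A minimal sketch of the credit-based idea described above (illustrative only; the class and method names are invented and are not Flink's actual Netty classes):

    import java.util.ArrayDeque;
    import java.util.Queue;

    // Upstream side: only ship buffers while the receiver has announced free buffers (credit).
    class CreditBasedSender {
        private int credit = 0;                        // credit granted by the downstream task
        private final Queue<byte[]> pending = new ArrayDeque<>();

        /** Called when a credit announcement arrives from the downstream task. */
        synchronized void addCredit(int newCredit) {
            credit += newCredit;
            flush();
        }

        /** Called by the upstream task when a new buffer is ready to be shipped. */
        synchronized void enqueue(byte[] buffer) {
            pending.add(buffer);
            flush();
        }

        private void flush() {
            while (credit > 0 && !pending.isEmpty()) {
                send(pending.poll());
                credit--;
            }
        }

        private void send(byte[] buffer) {
            // hand the buffer to the network stack, e.g. a Netty channel
        }
    }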

On Wed, Sep 21, 2022 at 9:43 AM yh z  wrote:

> Hi. Flink uses a pull model. The advantages of the pull model are: 1.
> better scalability (downstream consumers can be added as needed; they only have to obtain the upstream consumption offset); 2. downstream consumers can adjust their consumption rate to their needs;
> 3. regarding network transfer: Flink also tried a push model before, and to save overhead TCP connections are shared between processes, so a performance bottleneck in one task thread would keep all
> task threads on that link from receiving data and slow down overall consumption. Advantage of the push model: lower overhead, no mechanism needed to keep polling the upstream node for data.
>
> Xuyang  于2022年9月9日周五 20:35写道:
>
> > Hi, it is mainly a pull model: the downstream actively pulls data from the upstream. When the downstream's consumption capacity reaches its limit, backpressure makes the upstream produce less data.
> >
> >
> >
> >
> >
> >
> >
> > --
> >
> > Best!
> > Xuyang
> >
> >
> >
> >
> >
> > 在 2022-09-09 19:04:27,"郑 致远"  写道:
> > >Hi all,
> > >A quick question:
> > >For Flink data transfer, does the upstream operator push to the downstream one, or does the downstream operator pull from upstream? What were the design considerations?
> >
>


Re: Question about Managed Memory

2022-09-22 Thread Shammon FY
Hi @haishui

The Managed Memory mentioned here is used for sorting, hash tables, and so on, and is mostly used in Flink batch jobs, for example HashJoin.
The join operator of a streaming job stores its state in the state backend, e.g. RocksDB. Batch join operators work differently: for example the batch HashJoinOperator creates a BinaryHashTable that partitions the data into buckets and builds a hash table, and the BinaryHashTable allocates and releases memory from Managed
Memory.


On Thu, Sep 22, 2022 at 2:42 PM haishui  wrote:

>
> The Configuration page of the official docs says, for taskmanager.memory.managed.size, that managed memory is used for sorting, hash tables, caching intermediate results, and the RocksDB state backend.
> The Memory Tuning Guide in the memory configuration docs says, when introducing the HashMap state backend,
> that if the job is stateless or uses the HashMap state backend, managed memory can be set to 0.
>
> My Flink job uses the HashMap state backend; the WebUI indeed shows managed memory staying at 0, and setting it to 0 does work. My question is when managed memory is actually used for sorting, hash tables, and caching intermediate results. So far I have only hit errors when setting it to 0 while using the RocksDB state backend, or when uploading a jar in session mode.


Re: How does StreamTableEnvironment.fromDataStream(dataStream) generate the schema for type T?

2022-09-22 Thread Shammon FY
Hi @frank

You did not post the code of your Event class; I suspect there is a problem with how Event is defined.
If you want Flink to recognize the fields inside Event, Event has to be defined as a POJO: for example make all fields public, or keep them private but add setXXX and getXXX methods.



On Thu, Sep 22, 2022 at 5:45 PM Frank  wrote:

> DataStreamSource<Event> datastream = env.fromElements(new Event(0, "张三", 1L), new Event(0, "孙小美", 1L));
> StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
> Table table = tenv.fromDataStream(datastream);
> table.printSchema();
> Why is the schema of the table generated by the code above the following, instead of the Event fields (t, user, event, timestamp)?
> (
>   `f0` RAW('utils.transfor.Event', '...')
> )
> How can I fix this?


Re: How does StreamTableEnvironment.fromDataStream(dataStream) generate the schema for type T?

2022-09-23 Thread Shammon FY
Hi

Your Event class definition was not posted in full, but one suspicion: you probably have a public Event(int, String, long,
long) constructor. You need to add an empty constructor to the Event definition, something like this:
public class Event implements Serializable {
    private static final long serialVersionUID = 4826873295740075360L;
    public int t = 0;
    public String user = "";
    public long event = 0L;
    // note: java.time.LocalDateTime has no nowTimeMillis(); System.currentTimeMillis() is used here so the snippet compiles
    public long timestamp = System.currentTimeMillis();

    public Event() { }
}

Flink needs the empty constructor to recognize this as a POJO class.
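A quick way to verify (a sketch; the element values are placeholders): once the no-arg constructor is added, fromDataStream should expose the POJO fields instead of a single RAW column.

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Event> events = env.fromElements(new Event(), new Event());
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    tEnv.fromDataStream(events).printSchema();
    // expected: columns `t`, `user`, `event`, `timestamp` rather than `f0` RAW(...)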

On Fri, Sep 23, 2022 at 5:37 PM Frank  wrote:

> Hi, Shammon,
>
>
> Right, I should have posted the Event class; here it is:
>
>
> public class Event implements Serializable {
>
>
> private static final long serialVersionUID = 4826873295740075360L;
>
>
> public int t = 0;
> public String user = "";
> public long event = 0L;
> public long timestamp = LocalDateTime.nowTimeMillis();
>
>
> }
>
>
> As you can see, all the fields are public.
>
>
> I just tried changing them to private and adding getters and setters, but the result is the same.
>
>
>
>
>
> -- 原始邮件 --
> 发件人:
>   "Shammon FY"
>   <
> zjur...@gmail.com>;
> 发送时间: 2022年9月23日(星期五) 中午11:20
> 收件人: "user-zh" >;
>
> 主题: Re:
> StreamTableEnvironment.fromDataStream(dataStream)如何生成T类型的Schema?
>
>
>
> Hi @frank
>
> 你没有贴你定义的Event类代码,我觉得你的Event类定义有点问题
>
> 如果需要flink识别Event的内部字段,需要将Event定义成pojo类,例如将字段都定义成public,或者可以是private,但是需要增加setXXX和getXXX函数
>
>
>
>
>
>
> On Thu, Sep 22, 2022 at 5:45 PM Frank 
> DataStreamSource                  new Event(0,
> "张三", 1L), new Event(0, "孙小美", 1L));
>  StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
>  Table table = tenv.fromDataStream(datastream)
>  table.printSchema();
>  为什么上面代码生成table的schema是下面这样而不是Event的字段(t, user, event, timestamp)?
>  (
>   `f0` RAW('utils.transfor.Event', '...')
>  )
>  怎么改?


Re: PartitionNotFoundException

2022-09-28 Thread Shammon FY
Hi

A task reports PartitionNotFoundException because it sent a partition request to the upstream TaskManager, and the Netty server on that TaskManager found that the requested upstream task had not been deployed successfully.
So judging from this exception, the Netty connection itself is fine; you should find the task that logs PartitionNotFoundException and check why its upstream task failed to deploy.

On Tue, Sep 27, 2022 at 10:20 PM yidan zhao  wrote:

> Additional info: Flink 1.15.2, standalone cluster, ZooKeeper-based HA.
> The environment is our company's in-house container platform. 3 containers run JM + HistoryServer; the remaining several hundred containers are all TMs, each providing 1 slot.
>
> yidan zhao  于2022年9月27日周二 22:07写道:
> >
> > Also, I tried something else today: it seems related to TMs that have not been restarted for a long time? After a restart the frequency drops a lot.
> > I reserve a lot of TMs, e.g. 500 TMs each providing 1 slot, while a job may only use 100 of them.
> > Could the connections of the remaining 400 TMs develop some problem after running for a long time?
> >
> > yidan zhao  于2022年9月27日周二 16:21写道:
> > >
> > > After enabling TM debug logging I see many logs like this:
> > > Responding with error: class
> > > org.apache.flink.runtime.io
> .network.partition.PartitionNotFoundException
> > >
> > > The visible symptom is: after a job is submitted, it keeps reporting LocalTransportException:
> > > org.apache.flink.runtime.io
> .network.netty.exception.LocalTransportException:
> > > Sending the partition request to '/10.216.187.171:8709 (#0)' failed.
> > > at org.apache.flink.runtime.io
> .network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:145)
> > > at org.apache.flink.runtime.io
> .network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:133)
> > > at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
> > > at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
> > > at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
> > > at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
> > > at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
> > > at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
> > > at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:1017)
> > > at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:878)
> > > at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
> > > at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
> > > at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
> > > at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
> > > at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
> > > at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
> > > at
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
> > > at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
> > > at
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> > > at java.lang.Thread.run(Thread.java:748)
> > > Caused by:
> org.apache.flink.shaded.netty4.io.netty.channel.StacklessClosedChannelException
> > > at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object,
> > > ChannelPromise)(Unknown Source)
> > >
> > > Not sure whether this is related to that debug-level log?
> > >
> > > And what usually causes this problem? I suspected the network before but never found the cause; only after enabling debug today did I see this error.
>


Re: PartitionNotFoundException

2022-09-29 Thread Shammon FY
Generally, if this message only appears briefly during a failover or a restart it is harmless and Flink will recover on its own. If it keeps appearing and the job cannot recover, check in the WebUI which tasks failed to deploy.

On Thu, Sep 29, 2022 at 10:23 AM yidan zhao  wrote:

> OK, thanks for the suggestion; I will try it when the problem shows up again. After restarting things are fine for now; my feeling is that it only appears on clusters that have been running for a long time.
>
> Lijie Wang  于2022年9月29日周四 10:17写道:
> >
> > Hi,
> >
> > You can try increasing the value of taskmanager.network.request-backoff.max. The default is 10000 ms, i.e. 10 s.
> > Upstream and downstream tasks may be deployed concurrently, so the downstream can request a partition before the upstream has finished deploying. Increasing
> taskmanager.network.request-backoff.max increases the downstream's wait time and number of retries, reducing the chance of
> PartitionNotFoundException.
> >
> > Best,
> > Lijie
> >
> > yidan zhao  于2022年9月28日周三 17:35写道:
> >>
> >> According to Flink's design, can the downstream really start requesting a partition before the upstream has been deployed? Also, is there usually a log entry when the upstream fails to deploy?
> >>
> >> I have restarted the cluster and it is OK for now; I will wait a while and see whether it comes back.
> >>
> >> Shammon FY  于2022年9月28日周三 15:45写道:
> >> >
> >> > Hi
> >> >
> >> > 计算任务输出PartitionNotFoundException,原因是它向上游TaskManager发送partition
> request请求,上游TaskManager的netty server接收到partition request后发现它请求的上游计算任务没有部署成功。
> >> >
> 所以从这个异常错误来看netty连接是通的,你可能需要根据输出PartitionNotFoundException信息的计算任务,查一下它的上游计算任务为什么没有部署成功
> >> >
> >> > On Tue, Sep 27, 2022 at 10:20 PM yidan zhao 
> wrote:
> >> >>
> >> >> 补充:flink1.15.2版本,standalone集群,基于zk的ha。
> >> >> 环境是公司自研容器环境。3个容器启JM+HistoryServer。剩下几百个容器都是TM。每个TM提供1个slot。
> >> >>
> >> >> yidan zhao  于2022年9月27日周二 22:07写道:
> >> >> >
> >> >> > 此外,今天还做了个尝试,貌似和长时间没重启TM有关?重启后频率低很多会。
> >> >> > 我预留的TM很多,比如500个TM,每个TM就提供1个slot,任务可能只用100个TM。
> >> >> > 会不会剩下400的TM的连接,时间厂了就会出现某种问题?
> >> >> >
> >> >> > yidan zhao  于2022年9月27日周二 16:21写道:
> >> >> > >
> >> >> > > 打开了TM的debug日志后发现很多这种日志:
> >> >> > > Responding with error: class
> >> >> > > org.apache.flink.runtime.io
> .network.partition.PartitionNotFoundException
> >> >> > >
> >> >> > > 目前问题的直观表现是:提交任务后,一直报 LocalTransportException:
> >> >> > > org.apache.flink.runtime.io
> .network.netty.exception.LocalTransportException:
> >> >> > > Sending the partition request to '/10.216.187.171:8709 (#0)'
> failed.
> >> >> > > at org.apache.flink.runtime.io
> .network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:145)
> >> >> > > at org.apache.flink.runtime.io
> .network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:133)
> >> >> > > at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
> >> >> > > at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
> >> >> > > at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
> >> >> > > at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
> >> >> > > at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
> >> >> > > at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
> >> >> > > at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:1017)
> >> >> > > at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:878)
> >> >> > > at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
> >> >> > > at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
> >> >> > > at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
> >> >> > > at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
> >> >> > > at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
> >> >> > > at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
> >> >> > > at
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
> >> >> > > at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
> >> >> > > at
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> >> >> > > at java.lang.Thread.run(Thread.java:748)
> >> >> > > Caused by:
> org.apache.flink.shaded.netty4.io.netty.channel.StacklessClosedChannelException
> >> >> > > at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object,
> >> >> > > ChannelPromise)(Unknown Source)
> >> >> > >
> >> >> > > 不清楚和debug的那个日志是否有关呢?
> >> >> > >
> >> >> > > 然后都是什么原因呢这个问题,之前一直怀疑是网络原因,一直也不知道啥原因。今天开了debug才发现有这么个debug报错。
>


Re: Does table store conflict with the connector-kafka jar?

2022-10-07 Thread Shammon FY
Hi RS
Could you share the specific error stack of the conflict?

On Sat, Oct 8, 2022 at 8:54 AM RS  wrote:

> Hi,
>
>
> Version: flink-1.15.1
> To use table
> store, flink-table-store-dist-0.2.0.jar has to be placed under lib. The cluster's lib directory already contained flink-connector-kafka-1.15.1.jar. When defining a Kafka source table in sql-client, the connectors conflicted.
>
> Does having flink-table-store-dist-0.2.0.jar under lib mean flink-connector-kafka-1.15.1.jar cannot be there as well?
>
>
> Thanks


Re: Can a Tumble Window cause backpressure?

2022-10-19 Thread Shammon FY
If the 10-minute window is required and the keys are fairly spread out, you could try adding resources and increasing the parallelism, so that each task emits less data.

On Thu, Oct 20, 2022 at 9:49 AM yidan zhao  wrote:

> This description contradicts itself: if backpressure is caused by the write-out not keeping up, then throttling the write-out only makes things worse. But you don't need to worry about that, because you cannot control the write-out speed, only the write-out timing.
>
> The write-out timing is determined by the window end time and the watermark, so if you really want to solve this, the windows should not all end on the same fixed 10-minute boundary.
>
> macia kk  于2022年10月20日周四 00:57写道:
> >
> > We aggregate for 10 minutes before emitting. At the 10-minute mark a lot of data has accumulated, the write-out cannot keep up, backpressure sets in, and upstream consumption slows down.
> >
> > Would it help to throttle the write-out and let it write more slowly?
>


Re: Re: Unsubscribe

2023-01-08 Thread Shammon FY
Hi

To unsubscribe from the mailing list, send an email to user-zh-unsubscr...@flink.apache.org. See
https://flink.apache.org/community.html for the different addresses.


On Mon, Jan 9, 2023 at 10:37 AM 中微子  wrote:

> Unsubscribe
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2023-01-09 10:29:18,"Yuxin Tan"  写道:
> >Hi, to unsubscribe please send any content to user-zh-unsubscr...@flink.apache.org
> >
> >Best,
> >Yuxin
> >
> >
> >张保淇  于2023年1月9日周一 00:52写道:
> >
>> I strongly request to unsubscribe
>


Re: Unknown class exception: org/apache/flink/table/connector/sink/abilities/SupportsSchemaEvolutionWriting

2023-01-15 Thread Shammon FY
Hi

If it runs locally, check which jar this class lives in, confirm whether the relevant jar and class are present on the K8s cluster, and whether some other jar was not packaged in.

Best,
Shammon

On Thu, Jan 12, 2023 at 5:09 PM highfei2011  wrote:

> Local tests are fine. The exception occurs when running on K8s.
>
>
> 在 2023年1月12日 15:39,highfei2011 写道:
>
>
> Hi everyone! When writing to Alibaba Cloud Hologres with the Apache Flink Table API, the exception below was thrown; I'd like to discuss it with you, thanks! Apache Flink version: 1.15.3. Hologres connector
> version: 1.15-vvr-6.0.2-3, link:
> https://search.maven.org/artifact/com.alibaba.ververica/ververica-connector-hologres/1.15-vvr-6.0.2-3/jar
> Exception: Caused by: java.lang.NoClassDefFoundError:
> org/apache/flink/table/connector/sink/abilities/SupportsSchemaEvolutionWriting
> at java.base/java.lang.ClassLoader.defineClass1(Native Method) at
> java.base/java.lang.ClassLoader.defineClass(Unknown Source) at
> java.base/java.security.SecureClassLoader.defineClass(Unknown Source) at
> java.base/jdk.internal.loader.BuiltinClassLoader.defineClass(Unknown
> Source) at
> java.base/jdk.internal.loader.BuiltinClassLoader.findClassOnClassPathOrNull(Unknown
> Source) at
> java.base/jdk.internal.loader.BuiltinClassLoader.loadClassOrNull(Unknown
> Source) at
> java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source)
> at
> java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown
> Source) at java.base/java.lang.ClassLoader.loadClass(Unknown Source) at
> com.alibaba.ververica.connectors.hologres.factory.HologresTableFactory.createDynamicTableSink(HologresTableFactory.java:49)
> at
> org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:259)


Re: Clearing the state of a running job

2023-02-14 Thread Shammon FY
Hi

This cannot be done; I'd recommend restarting the job.

On Mon, Feb 13, 2023 at 4:33 PM Jason_H  wrote:

> The problem: in Flink, TTL is already configured where state is used, but the upstream/downstream data was cleared and we wanted to re-test. We found that
> Flink's results do not start accumulating from 0 again, but keep accumulating on top of the previous results. Is there any way to handle this without restarting the job?
>
>
> Concretely: without stopping the Flink job, how can we clear the state data of the running job so that the results start accumulating from 0 again?
>
>
> Without restarting the job, would clearing the state data have the same effect as restarting it, i.e. prevent the accumulated state from affecting the results?
> | |
> Jason_H
> |
> |
> hyb_he...@163.com
> |


Re: Flink on YARN reports "TaskManager with id ... is no longer reachable" after running for a while

2023-02-16 Thread Shammon FY
Hi

The heartbeat "unreachable" error above usually means the TM exited; check why it exited.
For the checkpoint timeout below, check whether there is backpressure, and look at how long checkpoints take; consider increasing the checkpoint timeout.
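For example, the checkpoint timeout can be raised in code (a sketch; 30 minutes is just an illustrative value):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getCheckpointConfig().setCheckpointTimeout(30 * 60 * 1000L);  // e.g. 30 minutes instead of the 10-minute default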

Best,
Shammon


On Thu, Feb 16, 2023 at 10:34 AM lxk  wrote:

> Hi, you can dump the memory and analyze it
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2023-02-16 10:05:19,"Fei Han"  写道:
> >@all
>Hi all! My Flink version is 1.14.5 and the CDC version is 2.2.1. After running on YARN for a while, the following error appears:
> >org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager with
> id container_e506_1673750933366_49579_01_02(
> hdp-server-010.yigongpin.com:8041) is no longer reachable. at
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1359)
> ~[flink-dist_2.12-1.14.5.jar:1.14.5] at
> org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.reportHeartbeatRpcFailure(HeartbeatMonitorImpl.java:123)
> ~[flink-dist_2.12-1.14.5.jar:1.14.5] at
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275)
> ~[flink-dist_2.12-1.14.5.jar:1.14.5] at
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267)
> ~[flink-dist_2.12-1.14.5.jar:1.14.5] at
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262)
> ~[flink-dist_2.12-1.14.5.jar:1.14.5] at
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.lambda$handleHeartbeatRpc$0(HeartbeatManagerImpl.java:248)
> ~[flink-dist_2.12-1.14.5.jar:1.14.5] at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> ~[?:1.8.0_181] at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> ~[?:1.8.0_181] at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> ~[?:1.8.0_181] at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455)
> ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455)
> ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
> ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
> ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
> ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
> [flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
> [flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> [flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> [flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5]
>After the error above, the following checkpoint error also appears: org.apache.flink.runtime.checkpoint.CheckpointException:
> Checkpoint expired before completing. at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2000)
> [flink-dist_2.12-1.14.5.jar:1.14.5] at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [?:1.8.0_181] at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> [?:1.8.0_181] at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> [?:1.8.0_181] at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> [?:1.8.0_181] at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_181] at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_181] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]。
>Could anyone advise how these two issues can be optimized? Is there a good approach?
>


Re: A UDF that transforms array elements in Flink SQL

2023-02-16 Thread Shammon FY
Hi

You could consider building that function into the UDF jar and calling it directly inside your own UDF?
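A minimal sketch of that suggestion (the class name and the element transformation are made up): hard-code the transformation inside the UDF instead of passing it in as a function-typed argument, which is not a supported SQL data type.

    import org.apache.flink.table.functions.ScalarFunction;

    public class ArrayTimesTwoFunction extends ScalarFunction {
        public Long[] eval(Long[] input) {
            if (input == null) {
                return null;
            }
            Long[] out = new Long[input.length];
            for (int i = 0; i < input.length; i++) {
                out[i] = input[i] == null ? null : input[i] * 2;  // placeholder transformation
            }
            return out;
        }
    }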



On Wed, Feb 15, 2023 at 4:29 PM 723849736 <723849...@qq.com.invalid> wrote:

> Hi everyone,
>
> I have a use case with Flink SQL where I need to transform one column of an array, similar to the transform function in Spark SQL:
>
>
> https://spark.apache.org/docs/latest/api/sql/index.html#transform
>
>
> Flink SQL does not seem to support anything similar; can this be implemented with a UDF?
>
>
> Because this function needs a function as one of its inputs, and a function-typed parameter is not a Flink data type, an exception is thrown at the validate stage. Is there a way around this?
>
>
> class ArrayTransformFunction extends ScalarFunction {
>
>   def eval(a: Array[Long], function: Long => Long): Array[Long] = {
> a.map(e => function(e))
>   }}
> The exception is as follows:
>
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> SQL validation failed. An error occurred in the type inference logic of
> function 'transform'.
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
> at SQLTest$.main(SQLTest.scala:44)
> at SQLTest.main(SQLTest.scala)
> Caused by: org.apache.flink.table.api.ValidationException: An error
> occurred in the type inference logic of function 'transform'.
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163)
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146)
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
> at java.util.Optional.flatMap(Optional.java:241)
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98)
> at
> org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1260)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1275)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1245)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1009)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:147)
> ... 6 more
> Caused by: org.apache.flink.table.api.ValidationException: Could not
> extract a valid type inference for function class
> 'udf.ArrayTransformFunction'. Please check for implementation mistakes
> and/or provide a corresponding hint.
> at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
> at
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)
> at
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.forScalarFunction(TypeInferenceExtractor.java:83)
> at
> org.apache.flink.table.functions.ScalarFunction.getTypeInference(ScalarFunction.java:143)
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160)
> ... 17 more
> Caused by: org.apache.flink.table.api.ValidationException: Error in
> extracting a signature to output mapping.
> at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
> at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:117)
> at
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:161)
> at
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148)
> ... 20 more
> Caused by: org.apache.flink.table.api.ValidationException: Unable to
> extract a type inference from method:
> public long[] udf.ArrayTransformFunction.eval(long[],scala.Function1)
> at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
> at
> org.apache.flink.table.types.extra

Re: Disable the chain of the Sink operator

2023-02-16 Thread Shammon FY
Hi

Do you mean how to disable `chain` in your custom sink connector?  Can you
give an example of what you want?

On Wed, Feb 15, 2023 at 10:42 PM di wu <676366...@qq.com> wrote:

> Hello
>
> The current Sink operator will be split into two operations, Writer and
> Commiter. By default, they will be chained together and executed on the
> same thread.
> So sometimes when the commiter is very slow, it will block the data
> writer, causing back pressure.
>
> At present, FlinkSQL can be solved by disabling the chain globally, and
> DataStream can partially disable the chain through the disableChaining
> method, but both of them need to be set by the user.
>
> Can the strategy of the Chain be changed in the Custom Sink Connector to
> separate Writer and Commiter?
>
> Thanks && Regards,
> di.wu
>


Re: Disable the chain of the Sink operator

2023-02-16 Thread Shammon FY
Hi wudi

I'm afraid it cannot reduce backpressure. If the Writer and Committer are not
chained together and the Committer task runs slowly, it can still block the
upstream Writer tasks through backpressure.

On the other hand, you can try to increase the parallelism of the sink node to
speed up the Committer operation.
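For illustration, the two knobs discussed in this thread look roughly like this in a DataStream job (a sketch; `stream` and `sink` are placeholders):

    stream.sinkTo(sink)
          .setParallelism(8)       // run more sink subtasks so the committer keeps up
          .disableChaining();      // or break the chain for this sink only

Whether splitting the writer/committer chain actually helps depends on the job: as noted above, a slow committer will still back-pressure the writer once the buffers between them fill up.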

Best,
Shammon

On Thu, Feb 16, 2023 at 11:38 PM wudi <676366...@qq.com.invalid> wrote:

> thanks for your replies.
> I think that if Writer and Commiter are not chained together, data
> consistency can be guaranteed, right?
> Because when the Commiter does not block the Writer, at the next
> Checkpoint, if the Commit is not completed, the SinkWriter.precommit will
> not be triggered
>
> In addition, in this scenario (writer blocking caused by slow commit), may
> the performance of disabling Sink's chain be better? Because it can reduce
> a lot of back pressure.
>
> Thanks && Regards,
> di.wu
>
>
> > 2023年2月16日 下午10:05,Chesnay Schepler  写道:
> >
> > As far as I know that chain between committer and writer is also
> required for correctness.
> >
> > On 16/02/2023 14:53, weijie guo wrote:
> >> Hi wu,
> >>
> >> I don't think it is a good choice to directly change the strategy of
> chain. Operator chain usually has better performance and resource
> utilization. If we directly change the chain policy between them, users can
> no longer chain them together, which is not a good starting point.
> >>
> >> Best regards,
> >>
> >> Weijie
> >>
> >>
> >>
> >> wudi <676366...@qq.com> 于2023年2月16日周四 19:29写道:
> >>
> >>Thank you for your reply.
> >>
> >>Currently in the custom Sink Connector, the Flink task will
> >>combine Writer and Committer into one thread, and the thread name
> >>is similar: *[Sink: Writer -> Sink: Committer (1/1)#0]*.
> >>In this way, when the *Committer.commit()* method is very slow, it
> >>will block the*SinkWriter.write()* method to receive upstream data.
> >>
> >>The client can use the *env.disableOperatorChaining() *method to
> >>split the thread into two threads:*[Sink: Writer (1/1)#0] *and
> >>*[Sink: Committer (1/1)#0]*. This Committer. The commit method
> >>will not block the SinkWriter.write method.
> >>
> >>If the chain policy can be disabled in the custom Sink Connector,
> >>the client can be prevented from setting and disabling the chain.
> >>Or is there a better way to make*Committer.commit()* not block
> >>*SinkWriter.write()*?
> >>
> >>Looking forward for your reply.
> >>Thanks && Regards,
> >>di.wu
> >>
> >>>2023年2月16日 下午6:54,Shammon FY  写道:
> >>>
> >>>Hi
> >>>
> >>>Do you mean how to disable `chain` in your custom sink
> >>>connector?  Can you
> >>>give an example of what you want?
> >>>
> >>>On Wed, Feb 15, 2023 at 10:42 PM di wu <676366...@qq.com> wrote:
> >>>
> >>>>Hello
> >>>>
> >>>>The current Sink operator will be split into two operations,
> >>>>Writer and
> >>>>Commiter. By default, they will be chained together and executed
> >>>>on the
> >>>>same thread.
> >>>>So sometimes when the commiter is very slow, it will block the data
> >>>>writer, causing back pressure.
> >>>>
> >>>>At present, FlinkSQL can be solved by disabling the chain
> >>>>globally, and
> >>>>DataStream can partially disable the chain through the
> >>>>disableChaining
> >>>>method, but both of them need to be set by the user.
> >>>>
> >>>>Can the strategy of the Chain be changed in the Custom Sink
> >>>>Connector to
> >>>>separate Writer and Commiter?
> >>>>
> >>>>Thanks && Regards,
> >>>>di.wu
> >>>>
> >>>
> >>
> >
>
>


Re: [Urgent] When Flink SQL writes to a table with a primary key, is it guaranteed that records with the same key are processed by the same TaskManager?

2023-02-19 Thread Shammon FY
Hi

If the join keys do not contain the primary key, then when the join result is written directly into the sink table you need to add an extra shuffle yourself, as RS mentioned above.
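A sketch of that extra shuffle (table and column names are made up): group by the sink's primary key before the INSERT so all changes for the same key go through the same task.

    INSERT INTO wide_sink
    SELECT pk, LAST_VALUE(col_a) AS col_a, LAST_VALUE(col_b) AS col_b
    FROM joined_view
    GROUP BY pk;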

Best,
Shammon


On Sun, Feb 19, 2023 at 1:43 PM RS  wrote:

> Hi,
> The primary key configured in the connector controls how data is written to storage; some storage systems can automatically upsert/deduplicate by primary key on write.
> So I think what you want is a shuffle at computation time (before the write): you probably need to run a GROUP BY on the primary key first, and then do the INSERT INTO.
>
>
> Thanks
>
>
>
> 在 2023-02-17 15:56:51,"casel.chen"  写道:
> >The job scenario: a Kafka CDC source is joined with several Redis dimension tables and then with other streams via a regular inner
> join; the widened result is written to MongoDB. We use Flink 1.13.2 in SQL mode, with debug logging enabled.
> >In testing we found that records with the same primary key were logged on different TaskManagers (I log inside the Sink
> Function's invoke method), and this causes the final result table to be incorrect.
> >
> >
> >Questions:
> >When Flink SQL writes to a table with a primary key, is it guaranteed that records with the same primary key are processed by the same TaskManager?
> >Is this not supported because our Flink version is old? From which Flink version is it supported?
> >My understanding is that declaring a primary key on a Flink
> SQL result table is meant to shuffle by that key, so records with the same key are handled by the same TaskManager and the change order stays correct; I'm not sure whether that understanding is right.
> >
>


Re: Flink 1.16 writing to Kafka fails with: Cluster authorization failed.

2023-02-19 Thread Shammon FY
Hi

From `Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException:
Cluster authorization failed.` this looks like a permission error; please check whether there is a permission problem.

Best,
Shammon

On Fri, Feb 17, 2023 at 6:29 PM lxk  wrote:

> Flink version: 1.16
> We are upgrading Flink from 1.14 to 1.16 without any code changes, but writing to Kafka now fails with:
> 2023-02-17 15:03:19
> org.apache.kafka.common.KafkaException: Cannot execute transactional
> method because we are in an error state
> at
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1125)
> at
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartition(TransactionManager.java:442)
> at
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:998)
> at
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:912)
> at
> org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:197)
> at
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:160)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
> at org.apache.flink.streaming.runtime.io
> .AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
> at org.apache.flink.streaming.runtime.io
> .AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
> at org.apache.flink.streaming.runtime.io
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException:
> Cluster authorization failed.
>
>
> After reading the relevant source code, I understand that the new KafkaSink
> API implements exactly-once in two stages, a writer and a committer, and the writer maintains a producer pool, so permission to create producers is needed; that part I understand.
> But when using the old Kafka
> API, i.e. FlinkKafkaProducer, only one producer is maintained. I don't understand why the same error still occurs with the old API.
>
>
> Or maybe what I described is not the root cause of this error; I hope someone can help explain.


Re: How to achieve watermark alignment in Flink SQL

2023-02-22 Thread Shammon FY
Hi

SQL does not support watermark alignment yet; there is a FLIP under discussion:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240884405
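With the DataStream API (Flink 1.15+), alignment can be configured roughly like this (a sketch; `MyEvent` and `kafkaSource` are placeholders, and java.time.Duration is assumed to be imported):

    WatermarkStrategy<MyEvent> strategy =
        WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withWatermarkAlignment("source-group", Duration.ofSeconds(20), Duration.ofSeconds(1));
    env.fromSource(kafkaSource, strategy, "topic1");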

Best,
Shammon


On Wed, Feb 22, 2023 at 3:15 PM haishui  wrote:

> Hi, all
> We read Kafka topic1 and topic2 with parallelism 4 to form two streams and then do an IntervalJoin. With a large backlog in Kafka, I implemented this both with SQL and with the DataStream
> API.
>
>
> In the SQL job, the state of the IntervalJoin operator keeps growing until checkpoints fail; the reason is that the output watermarks of the 8 source partitions differ greatly.
> The DataStream API job, using the watermark alignment introduced in Flink 1.15, reads all the data in the topics correctly.
>
>
>
> I'd like to ask how to solve, in SQL, the problem of large watermark gaps at the sources and checkpoint failures caused by data backlog. Also, if only one topic has data, how can the job be kept from failing?


Re: Unsubscribe

2023-02-22 Thread Shammon FY
To unsubscribe, send an email to user-zh-unsubscr...@flink.apache.org

Best,
Shammon

On Thu, Feb 23, 2023 at 11:03 AM 宋品如  wrote:

> Unsubscribe


Re: How can a job transition smoothly when the Flink Avro schema changes

2023-02-24 Thread Shammon FY
Hi

Could you post the error so we can see the exact cause?

Best,
Shammon

On Fri, Feb 24, 2023 at 6:10 PM Peihui He  wrote:

> Hi, all
>
> Has anyone run into this situation: Flink consumes Kafka data with Avro,
> and later a new field is added to the schema. During the gradual rollout, old and new records are mixed together and the Flink consumer job crashes.
>
> How do people usually handle this?
>
> Best Wishes.
>


Re: Managed memory at 100%

2023-02-28 Thread Shammon FY
Hi

Based on the exception in your email I looked at the code: the RecordArea here allocates memory segments from managed memory. You can try adjusting the window size or the memory configuration according to the job's traffic.

Best,
Shammon


On Tue, Feb 28, 2023 at 6:47 PM Junrui Lee  wrote:

> Hi,
>
> The image did not come through; could you describe the configuration directly in text?
>
> Best,
> Junrui
>
> 生于八十年代 <623730...@qq.com.invalid> 于2023年2月28日周二 18:31写道:
>
> > A question for the community:
> > 1.
> > Background: we use Flink to consume messages containing HDFS paths from Kafka, read the files from HDFS, process them, and write the results to Hive; the whole pipeline runs as a streaming job, not a batch job.
> > The problem is that after the job runs for a while, the HDFS-path messages start piling up in Kafka, and we found that the managed
> > memory area uses a lot of memory, 100% of it. However, we use the hashmap + HDFS state backend, the SQL that writes to Hive is a 10-minute tumbling window + group
> > by + sum, and we enabled the minibatch optimization.
> >
> >
> > 2. Since we use neither RocksDB nor batch processing, according to the docs this memory should not be used, but when I try to set this area to 0 I get the NullPointerException below.
> > java.lang.NullPointerException: Initial Segment may not be null
> >  at
> >
> org.apache.flink.runtime.memory.AbstractPagedOutputView.(AbstractPagedOutputView.java:67)
> >  at
> > org.apache.flink.runtime.io
> .disk.SimpleCollectingOutputView.(SimpleCollectingOutputView.java:46)
> >  at
> >
> org.apache.flink.table.runtime.util.collections.binary.AbstractBytesMultiMap$RecordArea.(AbstractBytesMultiMap.java:226)
> >  at
> >
> org.apache.flink.table.runtime.util.collections.binary.AbstractBytesMultiMap.(AbstractBytesMultiMap.java:114)
> >  at
> >
> org.apache.flink.table.runtime.util.collections.binary.WindowBytesMultiMap.(WindowBytesMultiMap.java:40)
> >  at
> >
> org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer.(RecordsWindowBuffer.java:72)
> >  at
> >
> org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer$Factory.create(RecordsWindowBuffer.java:164)
> >  at
> >
> org.apache.flink.table.runtime.operators.aggregate.window.processors.AbstractWindowAggProcessor.open(AbstractWindowAggProcessor.java:118)
> >  at
> >
> org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowOperator.open(SlicingWindowOperator.java:152)
> >  at
> >
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
> >  at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
> >  at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> >  at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
> >  at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
> >  at
> >
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> >  at
> > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
> >  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> >  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> >  at java.lang.Thread.run(Thread.java:748)
> >
> > 3. Our memory configuration is as follows:
> >
> >
> > 4.
> >
> > So what exactly is stored in the managed area that takes up so much memory? Is our Kafka backlog related to the managed area being full? Hoping someone can clarify.
> >
>


Re: Flink memory issue

2023-03-02 Thread Shammon FY
Hi

If you collect metrics, use them to find out which part of memory is growing and causing the container to be killed; then dump the memory of a container whose usage grows quickly and see which objects take up the most memory.

Best,
Shammon


On Thu, Mar 2, 2023 at 7:14 PM 吴先生 <15951914...@163.com> wrote:

> Hi,
> Flink version: 1.12
> Deployment mode: on YARN, per-job
> API: DataStream API
> State backend: RocksDB
> The job logic is a 15-minute window computation. After running for a while the job exceeds its memory limit and the container is
> killed by YARN; quite a few of our jobs have a similar problem:
> Closing TaskExecutor connection container_e02_1654567136606_1034_01_12
> because: [2023-03-02 08:12:44.794]Container
> [pid=11455,containerID=container_e02_1654567136606_1034_01_12] is
> running 745472B beyond the 'PHYSICAL' memory limit. Current usage: 8.0 GB
> of 8 GB physical memory used; 10.0 GB of 40 GB virtual memory used. Killing
> container.
> Questions:
> How should we troubleshoot and optimize this?
>
>
> | |
> 吴先生
> |
> |
> 15951914...@163.com
> |


Re: Flink SQL cannot subtract two TIMESTAMP(3) values

2023-03-06 Thread Shammon FY
Hi

If there is no built-in function for it, you can implement it with a custom UDF:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/
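A minimal sketch of such a UDF (the class and function names are made up); TIMESTAMP(3) arguments map to java.time.LocalDateTime:

    import java.time.Duration;
    import java.time.LocalDateTime;
    import org.apache.flink.table.functions.ScalarFunction;

    public class TimestampDiffMillis extends ScalarFunction {
        // returns end - start in milliseconds, or null if either input is null
        public Long eval(LocalDateTime end, LocalDateTime start) {
            if (end == null || start == null) {
                return null;
            }
            return Duration.between(start, end).toMillis();
        }
    }

    // register: tableEnv.createTemporarySystemFunction("TS_DIFF_MS", TimestampDiffMillis.class);
    // use in SQL: SELECT TS_DIFF_MS(end_ts, start_ts) FROM t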

Best,
Shammon


On Mon, Mar 6, 2023 at 7:46 PM 唐世伟  wrote:

>
> We need to subtract two timestamps with millisecond precision, but both the TIMESTAMPDIFF function and converting with UNIX_TIMESTAMP first only support second precision. Is there another way?


Re: TaskManager "Connection timed out" exceptions in a Flink job

2023-03-06 Thread Shammon FY
Hi

Many things can cause connection failures, including machine faults, OS issues, or server load. If you suspect load, you could form a small cluster from a few servers plus the suspect one, submit some jobs while keeping that server's load low, and observe how the jobs behave.

Best,
Shammon

On Mon, Mar 6, 2023 at 8:49 PM crazy <2463829...@qq.com.invalid> wrote:

> The error log looks the same as the one below; is it the same issue?
> https://issues.apache.org/jira/browse/FLINK-19925
>
>
> It says that "high cpu usage or high network pressure" on the server can cause this; how high do CPU usage and
> network load have to be to count as high?
>
>
>
>
> crazy
> 2463829...@qq.com
>
>
>
>  
>
>
>
>
> -- 原始邮件 --
> 发件人:
>   "user-zh"
> <
> tanyuxinw...@gmail.com>;
> 发送时间: 2023年3月6日(星期一) 下午2:59
> 收件人: "user-zh"
> 主题: Re: Flink作业tm Connection timed out异常问题
>
>
>
> I would not recommend doing that, because it will mask the problem.
>
> But if you really want to configure "retry count" or "timeout" parameters, quite a few are involved, e.g. akka.tcp.timeout,
> taskmanager.network.netty.client.connectTimeoutSec,
> taskmanager.network.retries and so on; see [1] for details.
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/config/
>
> Best,
> Yuxin
>
>
> crazy <2463829...@qq.com.invalid> 于2023年3月6日周一 14:41写道:
>
> > Monitoring has not shown any machine problem so far. Could this be mitigated by increasing the "retry count" or the "timeout"? I'm not sure which parameters to set.
> >
> >
> >
> >
> > crazy
> > 2463829...@qq.com
> >
> >
> >
> >  
> >
> >
> >
> >
> > -- 原始邮件 --
> > 发件人:
> >  
> "user-zh"
> >
> <
> > tanyuxinw...@gmail.com>;
> > 发送时间: 2023年3月6日(星期一) 下午2:33
> > 收件人: "user-zh" >
> > 主题: Re: Flink作业tm Connection timed out异常问题
> >
> >
> >
> > "如果进程没被调度到这台机器上,任务正常",从给出的描述来看,确实很可能是 A 这台机器有问题。
> >
> > 可以检查机器 A 的网络、内存、CPU
> > 指标或者监控是否正常,与其他机器是否存在不同。比如网络参数的配置、机器内存是否存在损坏、机器是否存在异常进程或负载等等。
> >
> > 如果硬件问题,系统日志有可能有一些报错。也可以使用一些机器检查工具, dmesg/vmstat等。
> >
> > Best,
> > Yuxin
> >
> >
> > crazy <2463829...@qq.com.invalid> 于2023年3月6日周一 14:23写道:
> >
> > > Hi all, a production job is failing over frequently; the exception log is below:
> > >
> > > 2023-03-05 11:41:07,847 INFO 
> >
> org.apache.flink.runtime.executiongraph.ExecutionGraph      
> > [] - Process (287/300) (b3ef27fec49fe3777f830802ef3501e9) switched
> from
> > RUNNING to FAILED on container_e26_1646120234560_82135_01_97 @
> > xx.xx.xx.xx (dataPort=26882).
> > > org.apache.flink.runtime.io
> .network.netty.exception.LocalTransportException:
> > readAddress(..) failed: Connection timed out (connection to 'xxx/
> > 10.70.89.25:43923')
> > >    at org.apache.flink.runtime.io
> .network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:201)
> > ~[flink-dist_2.11-1.13.5.jar:1.13.5]
> > >    at
> >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
> > ~[flink-dist_2.11-1.13.5.jar:1.13.5]
> > >    at
> >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
> > ~[flink-dist_2.11-1.13.5.jar:1.13.5]
> > >    at
> >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273)
> > ~[flink-dist_2.11-1.13.5.jar:1.13.5]
> > >    at
> >
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1377)
> > ~[flink-dist_2.11-1.13.5.jar:1.13.5]
> > >    at
> >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
> > ~[flink-dist_2.11-1.13.5.jar:1.13.5]
> > >    at
> >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
> > ~[flink-dist_2.11-1.13.5.jar:1.13.5]
> > >    at
> >
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:907)
> > ~[flink-dist_2.11-1.13.5.jar:1.13.5]
> > >    at
> >
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.handleReadException(AbstractEpollStreamChannel.java:728)
> > ~[flink-dist_2.11-1.13.5.jar:1.13.5]
> > >    at
> >
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:818)
> > ~[flink-dist_2.11-1.13.5.jar:1.13.5]
> > >    at
> >
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
> > ~[flink-dist_2.11-1.13.5.jar:1.13.5]
> > >    at
> >
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
> > ~[flink-dist_2.11-1.13.5.jar:1.13.5]
> > >    at
> >
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
> > ~[flink-dist_2.11-1.13.5.jar:1.

Re: Re: How to get Accumulators for Flink on K8s (operator)

2023-03-07 Thread Shammon FY
Hi

As mentioned above, jobClient.get().getAccumulators() fetches job information from the Flink cluster; in application mode the Flink cluster also shuts down when the job finishes. You can use other approaches, such as running on a session cluster or starting a history
server, or export the values to another system via custom metrics.

Best,
Shammon


On Tue, Mar 7, 2023 at 11:27 PM 李银苗  wrote:

> Unsubscribe


Re: Flink question

2023-03-09 Thread Shammon FY
Hi

Personally I think you can split your current process computation into two parts, moving the delta computation that you trigger every 20s into the stream2 side, roughly like this:

stream1.keyBy().connect(stream2.keyBy().process(compute the increments, emit every 20 seconds)).process(update the ListState based on the increments)

This way you don't have to search the ListState for which entries were updated.

Best,
Shammon


On Thu, Mar 9, 2023 at 10:48 AM 陈隽尧  wrote:

> 您好,
>
>
>  I am a new Flink user. Recently I need Flink to implement a business feature in a project, but I've hit some difficulties and would like to ask whether there is a suitable solution. Looking forward to your reply.
>
>
>
>
>  
> 问题背景:我们需要基于股票交易流水和股票行情去计算股票账户层面的一些指标(为简化场景,假定账户指标只有持仓量,买入均价,市值),页面前端20s刷新一次,指标计算想基于flink的dataStream
> Api实现,但遇到一个问题,目前初步想法如下,请flink大神帮忙指导
>
>
>
> 初步方案设想:假定stream1: 股票交易流水, stream2:股票行情流水
> stream1.keyBy().connect(stream2.keyBy()).process(),
> key为股票代码,在processFunction里面
>
>
>
> Ø  open方法:加载日初预算的指标值到一个ListState中,listState里面对象包含四个字段: 账户,持仓量,买入均价,市值
> (均为日初的值)
>
>
>
> Ø  processElement1:基于每笔股票交易流去计算仅受交易流水影响的指标(如持仓量和买入均价),更新ListState,
>
>
>
> Ø  processElement2:
> 只把行情作为状态缓存(MapState,key为股票代码),保留每个标的最新的行情(由于系统只20s更新一次数据,行情的推送频率相对较高大约3s一次,没必要每来一次就算一次)
>
>
>
>
> Ø  设置一个定义器20s执行一次,在onTime()
> 去基于上面的ListState和当前最新行情的MapState计算市值然后更新去ListState中市值数据(市值的计算逻辑是最新持仓*行情最新价格,onTime里面只会发送20s内有变化的ListState到下一个算子继续处理(下个算子会做汇聚计算)
>
>
>
>
>
>
>
> 存在问题:就是我怎么知道每次onTime触发的20s内,哪些ListState发生了变化?因为processElement方法和onTimer方法我理解时在两个线程里面分别处理的,如果在processElement方法中通过给listState里面每个数据加修改状态,在onTimer获取标记的ListState然后要清除状态,但要保证正确必须做线程同步,感觉flink里面
> 做线程同步是不是不太合适
>
>
>
> ²
> 不用stream1.coGroup(stream2).where().equalTo().window(TumblingEventTimeWindows.of(Time.hours(1))).apply()的原因,是因为coGroupFunction只会在窗口关闭时触发计算,但是交易流可以来一条数据处理一次,不想做成这种微批处理
>
> ²
> 使用定时器不在processElement2的方法中计算的原因是因为只需要20s更新一次,行情更新频率较快,没必要浪费算力行情数据每次来一次算一次
>
>
>


Re: Question about abnormal power loss with Flink on YARN

2023-03-09 Thread Shammon FY
Hi

I think when a Flink
job fails to recover, the job itself can hardly tell whether the failure was caused by a corrupted checkpoint file block or something else. If the job has taken savepoints, you can try restoring it from a specified savepoint.

Best,
Shammon

On Thu, Mar 9, 2023 at 10:06 PM guanyq  wrote:

> Preconditions:
> 1. Flink is configured with high availability
> 2. Flink retains 10 checkpoints
> 3. The YARN cluster is configured for job recovery
> Question:
> After the YARN cluster is restarted following a power outage, when the Flink job is recovered and the latest checkpoint has corrupted blocks because of the outage, will Flink try to start from another checkpoint?
>
>
>
>


Re: Flink SQL watermark question

2023-03-13 Thread Shammon FY
Hi

At the moment SQL watermarks can only be declared in CREATE TABLE, but a new extension is being discussed in a FLIP you can follow:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-296%3A+Extend+watermark-related+features+for+SQL

Best,
Shammon.FY

On Mon, Mar 13, 2023 at 6:29 PM 吴先生 <15951914...@163.com> wrote:

> hi,
> When using Flink SQL 1.14, can I avoid specifying the watermark in CREATE TABLE? The source data needs some cleansing before the watermark is assigned.
>
>
> | |
> 吴先生
> |
> |
> 15951914...@163.com
> |


Re: Conversion between GenericRowData and BinaryRowData

2023-03-13 Thread Shammon FY
Hi

You could try reading the field values out of the BinaryRowData and converting them to a string yourself.
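A rough sketch (the field types below are assumptions for illustration): RowData, including BinaryRowData, exposes typed getters, so a string can be built if the row type is known.

    static String rowToString(org.apache.flink.table.data.RowData row) {
        StringBuilder sb = new StringBuilder(row.getRowKind().shortString()).append('(');
        sb.append(row.isNullAt(0) ? "null" : row.getInt(0));                   // INT field
        sb.append(", ").append(row.isNullAt(1) ? "null" : row.getString(1));   // STRING field
        sb.append(", ").append(row.isNullAt(2) ? "null" : row.getLong(2));     // BIGINT field
        return sb.append(')').toString();
    }

For arbitrary row types, RowData.createFieldGetter(fieldType, pos) can be used instead of hard-coded getters.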

Best,
Shammon.FY

On Mon, Mar 13, 2023 at 4:21 PM zilong xiao  wrote:

> hi, benchao, is there a way to convert BinaryRowData into GenericRowData? Our use case needs to call toString on RowData,
> and BinaryRowData does not implement that method QQAQ
>
> Benchao Li  于2021年4月9日周五 10:42写道:
>
> > GenericRowData and BinaryRowData are both concrete implementations of the RowData interface,
> > so you should program against RowData and not assume which implementation is used.
> >
> > As for your question, BinaryRowData is constructed in many places while data is computed and converted between operators;
> > a typical one is serialization, which always serializes rows as BinaryRowData.
> >
> > Luna Wong  于2021年4月8日周四 下午7:36写道:
> >
> > > I see that the Kafka connector source code produces GenericRowData, but by the JDBC
> > > sink the type has become BinaryRowData. What rule does the runtime use to convert GenericRowData into BinaryRowData?
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


Re: How can a job transition smoothly when the Flink Avro schema changes

2023-03-13 Thread Shammon FY
Hi

From the error this looks like a mismatch between the schema and the data; it seems Avro, as used here, cannot handle this kind of schema change with old and new records mixed together.

Best,
Shammon.FY


On Fri, Mar 10, 2023 at 2:29 PM Peihui He  wrote:

> java.io.IOException: Failed to deserialize Avro record.
> at
>
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
> at
>
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
> at
>
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
> at
>
> org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:130)
> at
>
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
> at
>
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
> at
>
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
> at
>
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
> at
>
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
> at
>
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
> Caused by: java.io.EOFException
> at
> org.apache.flink.avro.shaded.org.apache.avro.io
> .BinaryDecoder$InputStreamByteSource.readRaw(BinaryDecoder.java:851)
> at
> org.apache.flink.avro.shaded.org.apache.avro.io
> .BinaryDecoder.doReadBytes(BinaryDecoder.java:373)
> at
> org.apache.flink.avro.shaded.org.apache.avro.io
> .BinaryDecoder.readString(BinaryDecoder.java:290)
> at
> org.apache.flink.avro.shaded.org.apache.avro.io
> .ResolvingDecoder.readString(ResolvingDecoder.java:208)
> at
>
> org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469)
> at
>
> org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459)
> at
>
> org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
> at
>
> org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> at
>
> org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
> at
>
> org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
> at
>
> org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
> at
>
> org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
> at
>
> org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
> at
>
> org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> at
>
> org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
> at
>
> org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
> at
>
> org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
> at
>
> org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> at
>
> org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> at
>
> org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:142)
> at
>
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
> ... 9 more
>
>
> As above.
> For example,
> the previous schema was
> {
> a,
> b
> }
>
> and it was later changed to
> {
> a,
> b,
> c
> }
>
> After the program was upgraded, since Kafka contains both old and new records at the same time, the error occurs.
>
> Shammon FY  于2023年2月24日周五 18:56写道:
>
> > Hi
> >
> > 你可以贴一下错误看下具体原因
> >
> > Best,
> > Shammon
> >
> > On Fri, Feb 24, 2023 at 6:10 PM Peihui He  wrote:
> >
> > > Hi, all
> > >
> > > 请教大家有没有遇到这样的情况,flink 使用avro
> > > 消费kafka中数据,后来在schema结构中加入新的字段。在灰度过程中会混杂着新老数据,这样的flink 消费程序就会挂掉。
> > >
> > > 大家一般是怎么处理的呢
> > >
> > > Best Wishes.
> > >
> >
>


Re: How to update a jar on a running yarn-session cluster without restarting it

2023-03-14 Thread Shammon FY
Hi

If the custom connector is packaged and submitted together with the job, you only need to restart that job. If the connectors are loaded when the Flink
session cluster starts, it is usually best to restart the cluster to avoid conflicts between different connector versions.

Best,
Shammon FY

On Tue, Mar 14, 2023 at 5:59 PM wdmcode  wrote:

>
> hi all
> I started a Flink cluster on YARN in yarn-session mode. The cluster uses some custom connectors, whose
> jars are placed in the local lib directory.
> How can I update one connector without restarting the yarn-session cluster? Restarting the yarn
> session cluster restarts all jobs, whereas updating one connector only affects some jobs.
>


Re: Re: Cannot set the job name

2023-03-16 Thread Shammon FY
Hi

What exactly do you mean by the console? You could check whether the other logs look normal.

Best,
Shammon FY


On Wed, Mar 15, 2023 at 11:29 PM wei_yuze  wrote:

> I tried again and the web UI does show it; I probably missed it before. The job name I set is XXX_Statistics.
>
>
> But the console output does not contain the job name. Is the log4j2 level wrong? In log4j2.properties I set the output level to INFO:
>
>
> rootLogger.level = INFO
>
>
>
>
>
>
>
> 原始邮件
>
>
>
> 发件人:"Weihua Hu"< huweihua@gmail.com >;
>
> 发件时间:2023/3/15 21:03
>
> 收件人:"user-zh"< user-zh@flink.apache.org >;
>
> 主题:Re: 无法设置任务名
>
>
> Hi,
>
> What job name does the UI show?
>
> Best,
> Weihua
>
>
> On Wed, Mar 15, 2023 at 8:02 PM wei_yuze  wrote:
>
> > 您好!
> >
> >
> >
> >
> > I'm using Flink 1.16.0 and set the job name this way:
> > streamExecutionEnvironment.execute("jobName")
> > But the web UI does not show the name I set. Could someone explain? Thanks!


Re: Are there good tools and approaches for monitoring and verifying real-time data synchronization?

2023-03-17 Thread Shammon FY
Hi

What information exactly do you want to monitor? Different information calls for different tools and approaches, e.g. resource utilization, failover counts, synchronization latency, etc.

Best,
Shammon FY


On Fri, Mar 17, 2023 at 10:52 AM casel.chen  wrote:

> We use Flink jobs for real-time data synchronization. Are there good tools and approaches for monitoring and verifying real-time data synchronization?
> The sync pipeline is: mysql -> kafka canal -> flink -> doris
>
>
> Ideas welcome


Re: The job's checkpoint state directory on Aliyun OSS cannot be opened

2023-03-21 Thread Shammon FY
Hi

You can check the checkpoint setting `state.checkpoints.num-retained`: is the number of retained checkpoints too high?

Best,
Shammon FY


On Tue, Mar 21, 2023 at 11:55 AM casel.chen  wrote:

> There is a Flink CDC job that joins multiple tables into a wide table; its state is around 20 GB, and the remote state storage is Aliyun
> OSS. Today the job failed, and when I tried to restore from a checkpoint manually I found that the checkpoint directory holding the job state (the shared directory) could not be opened in the browser; listing it from the command line showed tens of thousands of files. The job uses the rocksdb
> state
> backend with incremental checkpoints enabled. Is there a way to solve this? Are all these files under the shared directory left over from incremental checkpoints?


Re: Prometheus monitoring of Flink jobs keeps going OOM

2023-03-21 Thread Shammon FY
Hi

You could use a Go memory-profiling tool to see what is using most of the memory in the Prometheus process.

Best,
Shammon FY

On Tue, Mar 21, 2023 at 10:16 AM casel.chen  wrote:

>
> In production we monitor several hundred Flink jobs with Prometheus via pushgateway, with a 30-second metrics scrape period. The Prometheus service itself has almost 50 GB of memory and still goes OOM frequently. Any tuning suggestions?


Re: How can a Flink platform that starts jobs in application mode on K8s sync job status back to the platform in real time?

2023-03-21 Thread Shammon FY
Hi

You can run a background task in your submission platform that periodically queries K8s for the job status. Flink is also designing support for reporting job status [1]; it is currently under discussion.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Status+Listener

Best,
Shammon FY


On Wed, Mar 22, 2023 at 8:54 AM casel.chen  wrote:

> How can a Flink real-time platform that starts jobs in application mode on K8s sync job status back to the platform in real time? Once a job crashes, K8s reclaims the pods and the web
> URL can no longer be used to fetch the status. Is there another way, e.g. via metrics? If so, which metric exactly?


Re: Re: The job's checkpoint state directory on Aliyun OSS cannot be opened

2023-03-21 Thread Shammon FY
Then you probably need to confirm, for these state files:
1. whether they really belong to this job;
2. whether they come from successful or failed checkpoints;
3. whether checkpoint cleanup is failing — check for related error logs.

Best,
Shammon FY

On Wed, Mar 22, 2023 at 8:51 AM casel.chen  wrote:

> I checked; `state.checkpoints.num-retained` is currently set to 3.
>
>
> 在 2023-03-21 20:05:35,"Shammon FY"  写道:
> >Hi
> >
> >你可以检查一下checkpoint配置`state.checkpoints.num-retained`,是否保存的checkpoint数量太多了?
> >
> >Best,
> >Shammon FY
> >
> >
> >On Tue, Mar 21, 2023 at 11:55 AM casel.chen  wrote:
> >
> >> 有一个flink cdc实现多表关联打宽的flink作业,作业状态达到20GB左右,远端状态存储用的是aliyun
> >>
> oss。今天作业运行失败打算手动从checkpoint恢复时发现保存作业状态的checkpoint目录(share目录)无法通过浏览器打开,后来使用命令行list了一下该目录下的文件有多达上万个文件。该flink作业用的是rocksdb
> >> state
> >>
> backend并开启了增量checkpoint。请问有什么办法可以解决这个问题吗?share目录下这么多文件是因为增量checkpoint遗留下来的吗?
>


Re: Unsubscribe

2023-03-22 Thread Shammon FY
退订请发送邮件到 user-zh-unsubscr...@flink.apache.org


On Wed, Mar 22, 2023 at 8:13 PM jianbo zhang  wrote:

> 退订
>


Re: flink写入mysql数据异常

2023-03-23 Thread Shammon FY
Hi

你可以将问题描述和sql放在一个外部文档,例如google文档,然后将文档链接发在邮件里

Best,
Shammon FY

On Fri, Mar 24, 2023 at 10:58 AM 孙冬燕 
wrote:

> 退订
> --
> 发件人:小昌同学 
> 发送时间:2023年3月24日(星期五) 10:57
> 收件人:user-zh 
> 抄 送:user-zh 
> 主 题:回复: flink写入mysql数据异常
> 您好, 可能是我这边上传附件的方式不对,我场景描述的不够准确;
> 您看是否方便加一个微信呢【15956076613】,我将文档和截图发您,帮忙看一下;
> 谢谢大佬的指导
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |
>  回复的原邮件 
> | 发件人 | Jane Chan |
> | 发送日期 | 2023年3月23日 20:40 |
> | 收件人 |  |
> | 主题 | Re: flink写入mysql数据异常 |
> 附件还是没有收到哦.
> Flink SQL 支持 INSERT INTO table_identifier (column_identifier1 [,
> column_identifier2, ...]) 插入指定列, 具体语法可以参考 [1]
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/insert/#insert-from-select-queries
> <
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/insert/#insert-from-select-queries
> >
> On Thu, Mar 23, 2023 at 5:35 PM 小昌同学  wrote:
> 您好,我刚刚重新上传了附件;是的,Flink
> SQL已经支持了Upsert模式,但是这种更新都是行级别的更新,我想要实现仅仅只是变动一行数据中的部分字段。还望大佬指导
> 小昌同学
> ccc0606fight...@163.com
> <
> https://dashi.163.com/projects/signature-manager/detail/index.html?ftlId=1&name=%E5%B0%8F%E6%98%8C%E5%90%8C%E5%AD%A6&uid=ccc0606fighting%40163.com&iconUrl=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsmed9b2013afa816e025ae07760d572391.jpg&items=%5B%22ccc0606fighting%40163.com%22%5D>
> <
> https://dashi.163.com/projects/signature-manager/detail/index.html?ftlId=1&name=%E5%B0%8F%E6%98%8C%E5%90%8C%E5%AD%A6&uid=ccc0606fighting%40163.com&iconUrl=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsmed9b2013afa816e025ae07760d572391.jpg&items=%5B%22ccc0606fighting%40163.com%22%5D>
> >
>  回复的原邮件 
> 发件人 Jane Chan 
> 发送日期 2023年3月23日 15:42
> 收件人  
> 主题 Re: flink写入mysql数据异常
> Hi,
> 没有看到附件哦. 回到你的问题, Flink SQL 目前支持以 Upsert 模式写入 MySQL, 前提是 Sink 表的 DDL 声明主键,
> 并且与数据库中物理表主键保持一致. 可以参考 [1].
> [1]
>
> https://github.com/apache/flink-connector-jdbc/blob/main/docs/content.zh/docs/connectors/table/jdbc.md#%E9%94%AE%E5%A4%84%E7%90%86
> <
> https://github.com/apache/flink-connector-jdbc/blob/main/docs/content.zh/docs/connectors/table/jdbc.md#%E9%94%AE%E5%A4%84%E7%90%86
> >
> On Thu, Mar 23, 2023 at 2:54 PM 小昌同学  wrote:
> 大佬,你好,代码上传在附件中了;
> 就是我想实现flink sql写MySQL时能支持update吗 类似ON DUPLICATE KEY UPDATE 的语法?
> 小昌同学
> ccc0606fight...@163.com
> <
>
> https://dashi.163.com/projects/signature-manager/detail/index.html?ftlId=1&name=%E5%B0%8F%E6%98%8C%E5%90%8C%E5%AD%A6&uid=ccc0606fighting%40163.com&iconUrl=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsmed9b2013afa816e025ae07760d572391.jpg&items=%5B%22ccc0606fighting%40163.com%22%5D
> <
> https://dashi.163.com/projects/signature-manager/detail/index.html?ftlId=1&name=%E5%B0%8F%E6%98%8C%E5%90%8C%E5%AD%A6&uid=ccc0606fighting%40163.com&iconUrl=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsmed9b2013afa816e025ae07760d572391.jpg&items=%5B%22ccc0606fighting%40163.com%22%5D
> >
>  回复的原邮件 
> 发件人 Jane Chan 
> 发送日期 2023年3月23日 14:23
> 收件人  
> 主题 Re: flink写入mysql数据异常
> 可以把完整 SQL 发出来看看
> 祝好!
> Jane
> On Thu, Mar 23, 2023 at 1:39 PM 小昌同学  wrote:
> 使用flink
> sql多表关联实时的将数据写入到mysql,mysql中定义了联合主键,查看日志发现为啥相同的数据插入到mysql表中,一条是insert
> ,另外一条是delete啊,我想实现的是upsert,这样该怎么操作啊
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |
>


Re: flink写入mysql数据异常

2023-03-24 Thread Shammon FY
Hi

退订发送邮件到 user-zh-unsubscr...@flink.apache.org


On Fri, Mar 24, 2023 at 1:23 PM 孙冬燕  wrote:

> 退订
> --
> 发件人:小昌同学 
> 发送时间:2023年3月24日(星期五) 13:22
> 收件人:user-zh 
> 抄 送:user-zh 
> 主 题:回复: flink写入mysql数据异常
> 好滴呀,谢谢您的建议;
>
> https://www.yuque.com/g/echochangtongxue/yxxdbg/iyfqa9fh34i5lssu/collaborator/join?token=KZCQVX5pqH3rmPNP#
> <
> https://www.yuque.com/g/echochangtongxue/yxxdbg/iyfqa9fh34i5lssu/collaborator/join?token=KZCQVX5pqH3rmPNP#
> > 邀请你共同编辑文档《Flink SQL写入到mysql的问题》
> 我创建了一个语雀,我将代码以及问题都写在文档里了,麻烦大佬们帮忙看一下问题呀
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |
>  回复的原邮件 
> | 发件人 | Shammon FY |
> | 发送日期 | 2023年3月24日 13:08 |
> | 收件人 |  |
> | 主题 | Re: flink写入mysql数据异常 |
> Hi
> 你可以将问题描述和sql放在一个外部文档,例如google文档,然后将文档连接发在邮件里
> Best,
> Shammon FY
> On Fri, Mar 24, 2023 at 10:58 AM 孙冬燕 
> wrote:
> 退订
> --
> 发件人:小昌同学 
> 发送时间:2023年3月24日(星期五) 10:57
> 收件人:user-zh 
> 抄 送:user-zh 
> 主 题:回复: flink写入mysql数据异常
> 您好, 可能是我这边上传附件的方式不对,我场景描述的不够准确;
> 您看是否方便加一个微信呢【15956076613】,我将文档和截图发您,帮忙看一下;
> 谢谢大佬的指导
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |
>  回复的原邮件 
> | 发件人 | Jane Chan |
> | 发送日期 | 2023年3月23日 20:40 |
> | 收件人 |  |
> | 主题 | Re: flink写入mysql数据异常 |
> 附件还是没有收到哦.
> Flink SQL 支持 INSERT INTO table_identifier (column_identifier1 [,
> column_identifier2, ...]) 插入指定列, 具体语法可以参考 [1]
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/insert/#insert-from-select-queries
> <
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/insert/#insert-from-select-queries
> >
> <
>
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/insert/#insert-from-select-queries
> <
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/insert/#insert-from-select-queries
> >
> On Thu, Mar 23, 2023 at 5:35 PM 小昌同学  wrote:
> 您好,我刚刚重新上传了附件;是的,Flink
> SQL已经支持了Upsert模式,但是这种更新都是行级别的更新,我想要实现仅仅只是变动一行数据中的部分字段。还望大佬指导
> 小昌同学
> ccc0606fight...@163.com
> <
>
> https://dashi.163.com/projects/signature-manager/detail/index.html?ftlId=1&name=%E5%B0%8F%E6%98%8C%E5%90%8C%E5%AD%A6&uid=ccc0606fighting%40163.com&iconUrl=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsmed9b2013afa816e025ae07760d572391.jpg&items=%5B%22ccc0606fighting%40163.com%22%5D>
> <
> https://dashi.163.com/projects/signature-manager/detail/index.html?ftlId=1&name=%E5%B0%8F%E6%98%8C%E5%90%8C%E5%AD%A6&uid=ccc0606fighting%40163.com&iconUrl=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsmed9b2013afa816e025ae07760d572391.jpg&items=%5B%22ccc0606fighting%40163.com%22%5D>
> >
> <
>
> https://dashi.163.com/projects/signature-manager/detail/index.html?ftlId=1&name=%E5%B0%8F%E6%98%8C%E5%90%8C%E5%AD%A6&uid=ccc0606fighting%40163.com&iconUrl=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsmed9b2013afa816e025ae07760d572391.jpg&items=%5B%22ccc0606fighting%40163.com%22%5D>
> <
> https://dashi.163.com/projects/signature-manager/detail/index.html?ftlId=1&name=%E5%B0%8F%E6%98%8C%E5%90%8C%E5%AD%A6&uid=ccc0606fighting%40163.com&iconUrl=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsmed9b2013afa816e025ae07760d572391.jpg&items=%5B%22ccc0606fighting%40163.com%22%5D>
> >
>  回复的原邮件 
> 发件人 Jane Chan 
> 发送日期 2023年3月23日 15:42
> 收件人  
> 主题 Re: flink写入mysql数据异常
> Hi,
> 没有看到附件哦. 回到你的问题, Flink SQL 目前支持以 Upsert 模式写入 MySQL, 前提是 Sink 表的 DDL 声明主键,
> 并且与数据库中物理表主键保持一致. 可以参考 [1].
> [1]
>
> https://github.com/apache/flink-connector-jdbc/blob/main/docs/content.zh/docs/connectors/table/jdbc.md#%E9%94%AE%E5%A4%84%E7%90%86
> <
> https://github.com/apache/flink-connector-jdbc/blob/main/docs/content.zh/docs/connectors/table/jdbc.md#%E9%94%AE%E5%A4%84%E7%90%86
> >
> <
>
> https://github.com/apache/flink-connector-jdbc/blob/main/docs/content.zh/docs/connectors/table/jdbc.md#%E9%94%AE%E5%A4%84%E7%90%86
> <
> https://github.com/apache/flink-connector-jdbc/blob/main/docs/content.zh/docs/connectors/table/jdbc.md#%E9%94%AE%E5%A4%84%E7%90%86
> >
> On Thu, Mar 23, 2023 at 2:54 PM 小昌同学  wrote:
> 大佬,你好,代码上传在附件中了;
> 就是我想实现flink sql写MySQL时能支持update吗 类似ON DUPLICATE KEY UPDATE 的语法?
> 小昌同学
> ccc0606fight...@163.com
> <
>
> https://dashi.163.com/projects/signature-manager/detail/index.html?ftlId=1&name=%E5%B0%8F%E6%98%8C%E5%90%8C%E5%AD%A6&uid=ccc0606fighting%40163.com&iconUrl=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsmed9b2013afa816e025ae07760d572391.jpg&items=%5B%22ccc0606fighting%40163.com%22%5D
> <
> https://dashi.163.com/project

Re: flink watermark 乱序数据问题

2023-03-26 Thread Shammon FY
Hi

assignTimestampsAndWatermarks只是定义了时间戳提取和watermark生成的策略,不会过滤数据流,晚于watermark到达的数据仍然会进入streamTS。这些迟到数据是否参与窗口计算,可以在定义window时通过allowedLateness设置允许的最大迟到时间,超出该时间的迟到数据默认会被窗口丢弃,也可以用sideOutputLateData收集到侧输出流。
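
下面是一个最小的示意(假设事件类型为 ClickEvent,按 user 字段分组做 10 秒滚动窗口;字段名、窗口大小和窗口函数都是假设的):

    OutputTag<ClickEvent> lateTag = new OutputTag<ClickEvent>("late-data") {};

    SingleOutputStreamOperator<Long> result = streamTS
            .keyBy(ClickEvent::getUser)
            .window(TumblingEventTimeWindows.of(Time.seconds(10)))
            .allowedLateness(Time.seconds(5))       // watermark 之后 5 秒内到达的迟到数据仍会触发窗口重新计算
            .sideOutputLateData(lateTag)            // 更晚到达的数据进入侧输出流,而不是被直接丢弃
            .process(new MyCountWindowFunction());  // 假设的自定义窗口函数

    DataStream<ClickEvent> lateStream = result.getSideOutput(lateTag);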

Best,
Shammon FY

On Sat, Mar 25, 2023 at 12:28 AM crazy <2463829...@qq.com.invalid> wrote:

> 大佬好,如下程序,flink在生成watermark策略中,forBoundedOutOfOrderness
> 这个乱序时长的指定会不会导致数据的丢失呢?比如有数据事件时间超过5ms,这条数据会进入到streamTS里吗?
>
>
> SingleOutputStreamOperator<ClickEvent> streamTS = mySource.assignTimestampsAndWatermarks(
>         WatermarkStrategy.<ClickEvent>forBoundedOutOfOrderness(Duration.ofMillis(5))
>                 .withTimestampAssigner(
>                         new SerializableTimestampAssigner<ClickEvent>() {
>                             @Override
>                             public long extractTimestamp(ClickEvent event, long recordTimestamp) {
>                                 return event.getDateTime();
>                             }
>                         }));


Re: flink1.16 sql gateway hive2

2023-03-26 Thread Shammon FY
Hi

如果要启动hiveserver2协议的gateway,需要将jar包flink-connector-hive_${scala.binary.version}放入到gateway的lib目录

Best,
Shammon FY


On Sun, Mar 26, 2023 at 12:07 PM guanyq  wrote:

> 本地启动了flink及hive在启动sql gateway时有以下异常,请问还需要其他什么操作么
> ./bin/sql-gateway.sh start-foreground
> -Dsql-gateway.endpoint.type=hiveserver2
> -Dsql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir=/usr/local/app/apache-hive-3.1.2-bin/conf
>
>
> 异常信息
>
> Available factory identifiers are:
> rest
> at
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:545)
> ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
> at
> org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(SqlGatewayEndpointFactoryUtils.java:65)
> ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
> at org.apache.flink.table.gateway.SqlGateway.start(SqlGateway.java:72)
> [flink-sql-gateway-1.16.0.jar:1.16.0]
> at
> org.apache.flink.table.gateway.SqlGateway.startSqlGateway(SqlGateway.java:118)
> [flink-sql-gateway-1.16.0.jar:1.16.0]
> at org.apache.flink.table.gateway.SqlGateway.main(SqlGateway.java:98)
> [flink-sql-gateway-1.16.0.jar:1.16.0]
> Exception in thread "main"
> org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to
> start the endpoints.
> at org.apache.flink.table.gateway.SqlGateway.start(SqlGateway.java:79)
> at
> org.apache.flink.table.gateway.SqlGateway.startSqlGateway(SqlGateway.java:118)
> at org.apache.flink.table.gateway.SqlGateway.main(SqlGateway.java:98)
> Caused by: org.apache.flink.table.api.ValidationException: Could not find
> any factory for identifier 'hiveserver2' that implements
> 'SqlGatewayEndpointFactory' in the classpath.
> Available factory identifiers are:
> rest
> at
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:545)
> at
> org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(SqlGatewayEndpointFactoryUtils.java:65)
> at org.apache.flink.table.gateway.SqlGateway.start(SqlGateway.java:72)
> ... 2 more
>
>


Re: flink sql的codegen导致metaspace OOM疑问

2023-03-29 Thread Shammon FY
Hi

自增id可以为同一个作业的多个codegen类生成唯一类名
一般metaspace可以通过fullgc释放,你可以查看你的集群metaspace大小,是否触发了fullgc

Best,
Shammon FY

On Wednesday, March 29, 2023, tanjialiang  wrote:

> Hi all,
>我有一个通过flink kubernetes operator定时提交到同一个session作业(底层是将flink
> sql转JobGraph的逻辑下推到了JobManager执行),当他跑了一段时间后,JobManager报了metaspace OOM.
>经过排查后发现是flink sql codegen生成的代码类有一个自增ID,这些类在使用完后不会释放。
>
>
> 疑问:
> 1. flink sql codegen做这样的一个自增ID有什么特殊意义吗?
> 2. java中通过类加载器加载的类有什么办法可以释放?
>
>
>
>
>


Re: PartitionNotFoundException

2023-04-02 Thread Shammon FY
Hi

出现PartitionNotFoundException通常是指定task的上游有subtask失败了,你可以查看一下上游subtask有没有错误日志,根据错误日志查看具体原因

Best,
Shammon FY

On Mon, Apr 3, 2023 at 10:08 AM zhan...@eastcom-sw.com <
zhan...@eastcom-sw.com> wrote:

>
> hi, 最近从1.14升级到1.16后,kafka消费不定时会出现 
> [org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
> Partition *** not found.]
> 然后不停自动重启job再继续抛出该异常后 不断重启,直到手动cancel任务后 再启动才恢复正常消费
>
> 在1.14集群中从未出现的问题,升到1.16后才出现,请问是否有配置可以优化或避免该异常?
>


Re: 退订

2023-04-04 Thread Shammon FY
Hi

发送任意邮件到 user-zh-unsubscr...@flink.apache.org
 退订

[1]
https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list


On Tue, Apr 4, 2023 at 1:20 PM 柳懿珊  wrote:

> 退订


Re: 退订

2023-04-06 Thread Shammon FY
Hi

退订请发送任意邮件到 user-zh-unsubscr...@flink.apache.org


[1] https://flink.apache.org/community/#mailing-lists


On Thu, Apr 6, 2023 at 2:00 PM Tony  wrote:

> 退订


Re: Re: PartitionNotFoundException

2023-04-09 Thread Shammon FY
像上面提到的,流式作业可以设置taskmanager.network.tcp-connection.enable-reuse-across-jobs:
false,一般对作业不会有影响

Best,
Shammon FY

On Mon, Apr 10, 2023 at 9:27 AM zhan...@eastcom-sw.com <
zhan...@eastcom-sw.com> wrote:

> hi, 上周调整这两参数后,正常运行了近一个星期后 又重现了[PartitionNotFoundException]...
>
> taskmanager.network.max-num-tcp-connections  只是调整为2,可能是太小了 今天我改为4 再看看
> 或者 将flink版本升级到 1.17 是否可修复该问题?
>
> From: yidan zhao
> Date: 2023-04-03 10:45
> To: user-zh
> Subject: Re: PartitionNotFoundException
> 设置 taskmanager.network.tcp-connection.enable-reuse-across-jobs 为
> false,设置 taskmanager.network.max-num-tcp-connections 大点。
> 之前有个bug导致这个问题我记得,不知道1.16修复没有。
>
> zhan...@eastcom-sw.com  于2023年4月3日周一 10:08写道:
> >
> >
> > hi, 最近从1.14升级到1.16后,kafka消费不定时会出现 
> > [org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
> Partition *** not found.]
> > 然后不停自动重启job再继续抛出该异常后 不断重启,直到手动cancel任务后 再启动才恢复正常消费
> >
> > 在1.14集群中从未出现的问题,升到1.16后才出现,请问是否有配置可以优化或避免该异常?
>


Re: 退订

2023-04-13 Thread Shammon FY
退订请发送任意邮件到 user-zh-unsubscr...@flink.apache.org
,可以参考[1]

[1]
https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list

On Thu, Apr 13, 2023 at 9:53 PM lei-tian  wrote:

> 退订
>
>
>
> | |
> totorobabyf...@163.com
> |
> |
> 邮箱:totorobabyf...@163.com
> |


Re: sink mysql id自增表数据会丢失

2023-04-17 Thread Shammon FY
Hi

如果想使用mysql的自增主键,应该是在插入的时候不要写自增主键的列吧,可以在insert的时候直接指定需要插入的列试试?

On Sun, Apr 16, 2023 at 7:58 PM Jeff  wrote:

> sink数据到mysql catalog内的表时,当表只一个自增主键id无其唯一索引时,同一批写入的数据只会保存一条,其它数据会丢失。
>
>
>  mysql内表ddl:
>
> create table test (id bigint primary key auto_increment , passport
> varchar);
>
>
> flink sql:
> insert into mysql_catalog.test select 0, passport from source_table;
>
> 之所以select 0是表示使用物理表的自增值。


Re: 流数据转化为json

2023-04-17 Thread Shammon FY
Hi

对于kafka的问题,使用print或者其他方式有数据输出吗?可以通过这种方式确认一下是作业本身的数据问题还是kafka的问题
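
如果确认流里本身有数据,常见的做法是先把 POJO 显式转成 JSON 字符串,再写入 Kafka。一个示意如下(假设使用 Jackson 和 1.14+ 的 KafkaSink,topic、broker 地址均为占位;注意 InPutInfo 需要是带 public 字段或 getter 的 POJO,否则 gson/fastjson/Jackson 序列化出来的内容可能是空的):

    ObjectMapper mapper = new ObjectMapper();

    DataStream<String> jsonStream = outPutInfoStream
            .map(info -> mapper.writeValueAsString(info))
            .returns(Types.STRING);

    KafkaSink<String> sink = KafkaSink.<String>builder()
            .setBootstrapServers("broker:9092")
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic("output-topic")
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build())
            .build();

    jsonStream.sinkTo(sink);

写入后可以先用 kafka-console-consumer 消费该 topic,验证数据是否真的到达。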

Best,
Shammon FY


On Fri, Apr 14, 2023 at 7:17 PM 小昌同学  wrote:

> 你好,请问一下上游的数据是
> SingleOutputStreamOperator outPutInfoStream =
> keyedStream.process(new KeyStreamFunc());
> 数据样式为:InPutInfo[phone='123456',workId='001']
> 我想直接将这个流输入到kafka中,直接使用addsink算子,但是查看kafka日志发现,数据内容没有插入进来,想请教一下有没有什么解决方案;
> 我现在自己想着将流中的数据转换为json,但是我使用了gson以及fastjson都不行,请各位大佬指点
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |


Re: 用Flink Table API和RocksDB不能正常升级状态数据结构

2023-04-17 Thread Shammon FY
Hi

目前增减列数据会导致状态无法兼容

Best,
Shammon FY


On Fri, Apr 14, 2023 at 9:09 PM Elvis Chen 
wrote:

> 我们正在使用flink-1.16.0的Table API和RocksDB作为后端,为我们的用户提供运行SQL
>
> queries的服务。表格是使用Avro模式创建的,当以兼容的方式更改模式,例如添加一个带默认值的field时,我们无法从savepoint恢复作业。这是在数据结构升级后的报错:
> Caused by: org.apache.flink.util.StateMigrationException: The new state
> serializer
> (org.apache.flink.table.runtime.typeutils.RowDataSerializer@aad5b03a) must
> not be incompatible with the old state serializer
> (org.apache.flink.table.runtime.typeutils.RowDataSerializer@9d089984)
> ...
>


Re: 退订

2023-04-17 Thread Shammon FY
退订请发送任意邮件到 user-zh-unsubscr...@flink.apache.org
,可以参考
https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list

On Fri, Apr 14, 2023 at 7:32 PM daniel sun  wrote:

> 退订
> zjw 于2023年4月14日 周五下午7:17写道:
>
> >
>


Re: flink api消费kafka(avro)数据出错

2023-04-18 Thread Shammon FY
Hi

看着是解析数据错误,可以检查一下是不是source的schema和数据不匹配
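
排查时可以先用和写入端一致的 schema 在消费侧做反序列化验证。Flink SQL 的 avro format 一般会把表字段映射成可空(union null)类型,下面是一个示意(schema 内容是假设的,需要和写入端实际使用的 schema 对齐;也可以用 AvroSchemaConverter 从表的类型推导出 schema 后做对比):

    String schemaJson =
            "{\"type\":\"record\",\"name\":\"record\",\"fields\":["
                    + "{\"name\":\"a\",\"type\":[\"null\",\"string\"],\"default\":null},"
                    + "{\"name\":\"b\",\"type\":[\"null\",\"string\"],\"default\":null},"
                    + "{\"name\":\"c\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
    Schema schema = new Schema.Parser().parse(schemaJson);

    KafkaSource<GenericRecord> source = KafkaSource.<GenericRecord>builder()
            .setBootstrapServers("localhost:9092")
            .setTopics("sink")
            .setGroupId("group")
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(AvroDeserializationSchema.forGeneric(schema))
            .build();

如果读侧用的是字段全部非空的 schema 去解析按可空字段写出的数据,就可能出现上面 "Malformed data. Length is negative" 这类错误。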

On Tue, Apr 18, 2023 at 2:46 PM kcz <573693...@qq.com.invalid> wrote:

> 版本:1.15.2
>1.首先是采用SQL方式,将json数据输入到kafka里面(avro格式)
>2.然后采用DS api方式去接收解析kafka里面的avro数据
> --报错如下--
> Caused by: java.io.IOException: Failed to deserialize consumer record due
> to
> at
> org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
> at
> org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
> at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
> at
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
> at org.apache.flink.streaming.runtime.io
> .StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
> at org.apache.flink.streaming.runtime.io
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is
> negative: -49
> at org.apache.avro.io
> .BinaryDecoder.readString(BinaryDecoder.java:308)
> at org.apache.avro.io
> .ResolvingDecoder.readString(ResolvingDecoder.java:208)
> at
> org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:470)
> at
> org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:460)
> at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:192)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
> at
> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
> at
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
> at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
> at
> org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:142)
> at
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
> at
> org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
> at
> org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
> ... 14 more
> Disconnected from the target VM, address: '127.0.0.1:60668', transport:
> 'socket'
>
>
> Process finished with exit code 1
>
>
>
> --第一步SQL代码如下--
>create table test (
>   a string,
>   b string,
>   c string
>   ) WITH (
>         'connector' = 'kafka',
>         'topic' = 'sink',
>         'properties.bootstrap.servers' =
> 'localhost:9092',
>         'format' = 'avro',
>  
> 'properties.allow.auto.create.topics' = 'true'
>         );
>
>
>   create table test_consumer (
>   a string,
>   b string,
>   c string
>   ) WITH (
>         'connector' = 'kafka',
>         'topic' = 'source',
>         'properties.bootstrap.servers' =
> 'localhost:9092',
>         'format' = 'json',
>  
> 'properties.allow.auto.create.topics' = 'true',
>   'properties.group.id' = 'group',
>
>   'scan.startup.mode' =
> 'latest-offset'
>         );
>   insert into test select * from test_consumer;
>
>
>
>
> -第二步API接收kafka
> avro代码如下--

Re: Re: sink mysql id自增表数据会丢失

2023-04-18 Thread Shammon FY
如果想让mysql生成自增主键,可以在flink ddl的table里不增加主键字段,然后flink作业直接写入数据到table就可以了
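
一个大致的示意(Flink 侧的 sink 表只声明 passport 一列,物理表仍然是 MySQL 里带 auto_increment 主键的 test 表,连接参数为占位):

    tableEnv.executeSql(
            "CREATE TABLE test_sink (" +
            "  passport STRING" +
            ") WITH (" +
            "  'connector' = 'jdbc'," +
            "  'url' = 'jdbc:mysql://localhost:3306/mydb'," +
            "  'table-name' = 'test'," +
            "  'username' = '...'," +
            "  'password' = '...'" +
            ")");

    tableEnv.executeSql("INSERT INTO test_sink SELECT passport FROM source_table");

这样 JDBC 连接器生成的 INSERT 语句里不会带 id 列,自增值交给 MySQL 生成;不过 Flink 侧没有声明主键,写入是 append 语义,不会按 id 去重。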

On Tue, Apr 18, 2023 at 5:38 PM Jeff  wrote:

> 在sink时指定字段不可以不包括自增主键的列。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2023-04-17 07:29:16,"Shammon FY"  写道:
> >Hi
> >
> >如果想使用mysql的自增主键,应该是在插入的时候不要写自增主键的列吧,可以在insert的时候直接指定需要插入的列试试?
> >
> >On Sun, Apr 16, 2023 at 7:58 PM Jeff  wrote:
> >
> >> sink数据到mysql catalog内的表时,当表只一个自增主键id无其唯一索引时,同一批写入的数据只会保存一条,其它数据会丢失。
> >>
> >>
> >>  mysql内表ddl:
> >>
> >> create table test (id bigint primary key auto_increment , passport
> >> varchar);
> >>
> >>
> >> flink sql:
> >> insert into mysql_catalog.test select 0, passport from source_table;
> >>
> >> 之所以select 0是表示使用物理表的自增值。
>


Re: Re: Re: sink mysql id自增表数据会丢失

2023-04-18 Thread Shammon FY
退订请发送任意邮件到 user-zh-unsubscr...@flink.apache.org
,可以参考
https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list

On Wed, Apr 19, 2023 at 9:31 AM 王国成  wrote:

> 退订
>
>
>
>
>
>
>
>
>
>
>
> 在 2023-04-19 09:15:09,"Shammon FY"  写道:
> >如果想让mysql生成自增主键,可以在flink ddl的table里不增加主键字段,然后flink作业直接写入数据到table就可以了
> >
> >On Tue, Apr 18, 2023 at 5:38 PM Jeff  wrote:
> >
> >> 在sink时指定字段不可以不包括自增主键的列。
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2023-04-17 07:29:16,"Shammon FY"  写道:
> >> >Hi
> >> >
> >> >如果想使用mysql的自增主键,应该是在插入的时候不要写自增主键的列吧,可以在insert的时候直接指定需要插入的列试试?
> >> >
> >> >On Sun, Apr 16, 2023 at 7:58 PM Jeff  wrote:
> >> >
> >> >> sink数据到mysql catalog内的表时,当表只一个自增主键id无其唯一索引时,同一批写入的数据只会保存一条,其它数据会丢失。
> >> >>
> >> >>
> >> >>  mysql内表ddl:
> >> >>
> >> >> create table test (id bigint primary key auto_increment , passport
> >> >> varchar);
> >> >>
> >> >>
> >> >> flink sql:
> >> >> insert into mysql_catalog.test select 0, passport from source_table;
> >> >>
> >> >> 之所以select 0是表示使用物理表的自增值。
> >>
>


Re: 不同的流程使用不同的并行度

2023-04-20 Thread Shammon FY
Hi

DataStream作业设置并发度有两种方式
1. 在ExecutionEnvironment通过setParallelism设置全局并发
2. 在DataStream中通过setParallelism为指定的datastream计算设置并发度
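
一个简单的示意(数字只是举例;source 的并发一般不超过 kafka 分区数,kafkaSource、kafkaSink、mysqlSink、EtlMapFunction、OutInfo 等对象和类型都是假设的):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);                                   // 全局默认并发

    DataStream<String> source = env
            .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source")
            .setParallelism(4);                              // source 并发和分区数对齐
    DataStream<OutInfo> etl = source
            .map(new EtlMapFunction())
            .setParallelism(8);                              // 计算算子单独设置并发

    etl.sinkTo(kafkaSink).setParallelism(4);                 // 不同 sink 可以用不同并发
    etl.addSink(mysqlSink).setParallelism(2);

并发度可以先参考上游分区数、单并发的处理吞吐和下游(MySQL)能承受的写入压力来估,再结合运行后的反压情况调整。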

Best,
Shammon FY

On Fri, Apr 21, 2023 at 8:58 AM 小昌同学  wrote:

>
>
> 各位老师好,请教一下关于flink的并行度的问题;
> 我现在数据上游是kafka(四个分区),经过Flink
> ETL处理后,实时落地到Kafka以及MYSQL,那我想在不同的阶段设置不同的并行度,这一块可以怎么使用,我使用的是DataStream API
> 还想请教一下就是关于并行度的这个设置,应该从哪些方面进行考虑啊,麻烦各位老师指教一下
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |


Re: flink rocksdb异常

2023-04-23 Thread Shammon FY
Hi

这是TM向JM发送消息超时了,可以看下JM是否有错误日志,或者对应的TM和JM是否有资源打满等情况,导致akka消息超时

Best,
Shammon FY


On Sun, Apr 23, 2023 at 2:28 PM crazy <2463829...@qq.com.invalid> wrote:

> Hi, 大佬好,
>       有个Flink on
> Yarn程序,Flink版本使用的是flink-1.13.5,statebackend使用的是rocksdb,任务跑一段时间,就会出现如下堆栈异常:
>
>
>      2023-04-20 22:32:08,127 INFO 
> org.apache.flink.runtime.taskmanager.Task         
>           [] - Attempting to fail task externally
> Source: slow_source_name_3 (15/100)#0 (39e2f191bef3484c9b629258eb5afb87).
> 2023-04-20 22:32:08,146 WARN 
> org.apache.flink.runtime.taskmanager.Task         
>           [] - Source: slow_source_name_3
> (15/100)#0 (39e2f191bef3484c9b629258eb5afb87) switched from RUNNING to
> FAILED with failure cause: java.util.concurrent.TimeoutException:
> Invocation of public abstract java.util.concurrent.CompletableFuture
> org.apache.flink.runtime.jobmaster.JobMasterOperatorEventGateway.sendOperatorEventToCoordinator(org.apache.flink.runtime.executiongraph.ExecutionAttemptID,org.apache.flink.runtime.jobgraph.OperatorID,org.apache.flink.util.SerializedValue)
> timed out.
>
>         at
> com.sun.proxy.$Proxy28.sendOperatorEventToCoordinator(Unknown Source)
>
>         at
> org.apache.flink.runtime.taskexecutor.rpc.RpcTaskOperatorEventGateway.sendOperatorEventToCoordinator(RpcTaskOperatorEventGateway.java:58)
>
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl$OperatorEventGatewayImpl.sendEventToCoordinator(OperatorEventDispatcherImpl.java:114)
>
>         at
> org.apache.flink.streaming.api.operators.SourceOperator.emitLatestWatermark(SourceOperator.java:393)
>
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1425)
>
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$16(StreamTask.java:1416)
>
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>
>         at
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>
>         at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
>
>         at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
>
>         at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
>
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
>
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
>
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
>
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
>
>         at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>
>         at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>
>         at java.lang.Thread.run(Thread.java:748)
>
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka.tcp://flink@xx:40381/user/rpc/jobmanager_2#-1762983006]]
> after [1 ms]. Message of type
> [org.apache.flink.runtime.rpc.messages.RemoteFencedMessage]. A typical
> reason for `AskTimeoutException` is that the recipient actor didn't send a
> reply.
>
>         at
> akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
>
>         at
> akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
>
>         at
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
>
>         at
> akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
>
>         at
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>
>         at
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>
>         at
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>
>         at
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
>
>         at
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
>
>         at
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
>
>         at
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
>
>         ... 1 more
>
>
>
>
>
> 查看对应进程日志,
>
> #
>
> # A fatal error has been detected by the Java Runtime Envi

Re: 关于Apache Flink源码贡献流程

2023-04-24 Thread Shammon FY
Hi tanjialiang

`EncodingFormat`和`DecodingFormat`是PublicEvolving接口,你可以在
https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
创建一个FLIP,然后在dev邮件组发起FLIP讨论;完成讨论后发起FLIP投票,投票通过后可以在对应的issue上提交PR

Best,
Shammon FY

On Mon, Apr 24, 2023 at 5:10 PM tanjialiang  wrote:

> Hi, Jing Ge
> 感谢你的回复。
> 目前我已经发起了一个英文的邮件讨论(大约两周前),但是目前回复邮件的开发者只有一个,针对这种情况我应该如何去安排后续的代码贡献工作?
>
>
> Best regrads,
> tanjialiang.
>  回复的原邮件 
> | 发件人 | Jing Ge |
> | 发送日期 | 2023年4月24日 16:30 |
> | 收件人 |  |
> | 主题 | Re: 关于Apache Flink源码贡献流程 |
> Hi,
>
> 如果是增加public API变更的话,建议先发起一个英文的邮件讨论,然后看是否需要创建FLIP,然后在基于FLIP发起更具体的技术讨论
>
> On Mon, Apr 24, 2023 at 10:06 AM tanjialiang  wrote:
>
> Hello,everyone.
> 我想向apache
>
> flink贡献源码,由于修复这个issue需要新增一些API,按照流程需要发起邮件讨论,但这个topic只得到一名开发者关注,这样的情况下我应该如何进行后面的流程?期待有熟悉flink源码贡献的开发者可以提供帮助
>
>
> issue: https://issues.apache.org/jira/browse/FLINK-31686
> discuss邮件标题: EncodingFormat and DecondingFormat provide copy API
>
>
> Best regrads
> tanjialiang.
>


Re: sql查询数据库不走索引

2023-04-28 Thread Shammon FY
Hi

你使用的是flink-connector-jdbc
3.0.0-1.16版本吧?需要使用3.0.0-1.17,不过目前应该还没有release,你可以关注下

On Wednesday, April 26, 2023, 杨扬  wrote:

> 各位大佬好!
> 目前升级到了flink1.17+jdbc-3.0,经过测试依然没有实现谓词下推,想请教下这是为什么?
>
>
>
>
> > 在 2022年12月5日,下午3:05,rovo98  写道:
> >
> > 你好,请留意您使用的 flink 版本。在 flink 1.17, jdbc-3.0.0 版本之前,jdbc connector 没有实现
> SupportsFilterPushDown 接口(谓词下推),所以发送至数据库的查询是 select xxx from table_name
> 的全表扫描形式。
> >
> >
> > 如有需要可参考 FLINK-16024 对您使用的 jdbc connector 版本进行修改。
> >
> >
> >
> >
> > https://issues.apache.org/jira/browse/FLINK-16024
> > https://github.com/apache/flink/pull/20140
> >
> >
> > -- 原始邮件 --
> > 发件人:
> "user-zh"
>   <
> yangya...@cupdata.com>;
> > 发送时间: 2022年12月5日(星期一) 下午2:43
> > 收件人: "user-zh" >
> > 主题: sql查询数据库不走索引
> >
> >
> >
> > 各位好!
> >   目前有一使用flink-sql编写的作业,其中存在通过jdbc查询mysql中某张表A需求,A表“
> b字段”为索引字段,但是flink-sql查询无法走到该表索引查询,为全表扫描查询。
> >   代码类似于
> > CREATE TABLE A (
> > b decimal(4, 0),
> > ...,
> > ...
> > ) WITH (
> > 'connector' = 'jdbc',
> > 'url' = 'jdbc:mysql://100.191.200.10:/lldb',
> > 'username' = 'test',
> > 'password' = 'Test!123',
> > 'table-name' = ‘A’
> > )
> > select * from A where b = 1234;
> > 此时发送至数据库的查询为select * from A去掉了后面的where筛选条件,从而无法使用b字段索引查询,变为全表扫描。
> >
> > 此问题是否有办法解决呢?难道flink-sql是先在数据库中全表扫描,再在flink中执行筛选?这样数据库的查询效率极低。
> > ===
> > 此邮件已由 Deep Discovery Email Inspector 进行了分析。
>
>


Re: checkpoint Kafka Offset commit failed

2023-05-04 Thread Shammon FY
Hi

看起来像是网络问题导致flink作业source节点连接kafka失败,可以检查一下kafka集群的网络或者flink作业source节点的网络是否有问题

Best,
Shammon FY

On Fri, May 5, 2023 at 9:41 AM Leonard Xu  wrote:

> 可以发送任意内容的邮件到  user-zh-unsubscr...@flink.apache.org   取消订阅来自
> user-zh@flink.apache.org  邮件列表的邮件,邮件列表的订阅管理,可以参考[1]
>
> 祝好,
> Leonard
> [1]
> https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
>
> > 2023年5月4日 下午9:00,wuzhongxiu  写道:
> >
> > 退订
> >
> >
> >
> > | |
> > go574...@163.com
> > |
> > |
> > 邮箱:go574...@163.com
> > |
> >
> >
> >
> >
> >  回复的原邮件 
> > | 发件人 | zhan...@eastcom-sw.com |
> > | 日期 | 2023年05月04日 14:54 |
> > | 收件人 | user-zh |
> > | 抄送至 | |
> > | 主题 | checkpoint Kafka Offset commit failed |
> > hi,请问在flink(1.14、1.16) checkpoint(10s)提交 kafka偏移量提示 The coordinator is
> not available
> >
> > 查看kafka集群日志都是正常的,手动也可以正确提交偏移量,重启flink
> job后也可以正常提交,运行一段时间后又会失败,请问有参数可以优化一下吗?
> >
> > flink 日志如下:
> > 2023-05-04 11:31:02,636 WARN
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] -
> Failed to commit consumer offsets for checkpoint 69153
> > org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset
> commit failed with a retriable exception. You should retry committing the
> latest consumed offsets.
> > Caused by:
> org.apache.kafka.common.errors.CoordinatorNotAvailableException: The
> coordinator is not available.
>
>


Re: 使用Flink SQL如何实现支付对帐超时告警?

2023-05-09 Thread Shammon FY
Hi

如果使用CEP,可以将两个流合并成一个流,然后通过subtype根据不同的事件类型来匹配,定义CEP的Pattern,例如以下这种
DataStream<Event> s1 = ...;
DataStream<Event> s2 = ...;
DataStream<Event> s = s1.union(s2)...;
Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
        .subtype(E1.class)
        .where(...)
        .followedBy("second")
        .subtype(E2.class)
        .where(...);

如果使用Flink SQL,可以直接使用双流Join+窗口实现
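
Flink SQL 方式的一个大致示意(表名、字段都是假设的,并假设两条流的 pay_time 都已声明为事件时间属性;利用 interval join 的左外连接,30 分钟内没有匹配到第三方流水的平台订单,右表字段为 NULL,即可作为告警输出):

    tableEnv.executeSql(
            "SELECT p.order_id, p.pay_time " +
            "FROM platform_payment AS p " +
            "LEFT JOIN third_party_payment AS t " +
            "  ON p.order_id = t.order_id " +
            "  AND t.pay_time BETWEEN p.pay_time AND p.pay_time + INTERVAL '30' MINUTE " +
            "WHERE t.order_id IS NULL");

如果走 CEP,也可以在 Pattern 上加 within(Time.minutes(30)),再通过超时侧输出流拿到超时仍未对上的记录。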

Best,
Shammon FY




On Wed, May 10, 2023 at 2:24 AM casel.chen  wrote:

> 需求:业务端实现支付功能,需要通过第三方支付平台的交易数据采用Flink
> SQL来做一个实时对账,对于超过30分钟内未到达的第三方支付平台交易数据进行告警。
> 请问这个双流实时对帐场景使用Flink CEP SQL要如何实现?
>
> 网上找的例子都是基于单条流实现的,而上述场景会用到两条流,一个是PlatformPaymentStream,另一个是ThirdPartyPaymentStream。


Re: flink 1.13 partition.time-extractor.timestamp-pattern 格式

2023-05-10 Thread Shammon FY
Hi,

就像上面文档描述的,如果是多个字段组合成partition,可以在DDL中通过partition.time-
extractor.timestamp-pattern将多个字段按照自己的partition格式需求进行组装。
CREATE TABLE fs_table (
  user_id STRING,
  order_amount DOUBLE,
  dt STRING,
  `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'connector'='filesystem',
  'path'='...',
  'format'='parquet',
  'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00'
);

如果只是一个timestamp字段,想要转换成其他的时间格式,可以参考文档[1]里的例子,新建一个自己的
PartitionTimeExtractor然后通过partition.time-extractor.class指定
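
针对 dt='20200520' 这种分区,自定义提取器大致可以这样写(类名、包路径以实际使用的 1.13 发行版为准,下面只是一个示意):

    import java.time.LocalDate;
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.List;
    import org.apache.flink.table.filesystem.PartitionTimeExtractor;

    public class YyyymmddPartTimeExtractor implements PartitionTimeExtractor {
        @Override
        public LocalDateTime extract(List<String> partitionKeys, List<String> partitionValues) {
            // 假设第一个分区字段就是 dt,取值形如 20200520
            return LocalDate.parse(partitionValues.get(0), DateTimeFormatter.ofPattern("yyyyMMdd"))
                    .atStartOfDay();
        }
    }

然后在 DDL 的 WITH 参数里指定 'partition.time-extractor.kind'='custom' 和
'partition.time-extractor.class'='<上面类的全限定名>'。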

在flink-1.15版本及以后[2],已经支持了partition.time-extractor.timestamp-formatter,对timestamp-pattern组装的partition时间戳进行格式转换

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/filesystem/#partition-time-extractor
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/table/filesystem/#%e5%88%86%e5%8c%ba%e6%97%b6%e9%97%b4%e6%8f%90%e5%8f%96%e5%99%a8

Best,
Shammon FY

On Wed, May 10, 2023 at 5:42 PM 莫失莫忘  wrote:

>
> 我hive的分区格式是 dt='20200520',格式是 flinkSQL 实时任务写hive 只支持 'yyyy-mm-dd
> hh:mm:ss' 格式,请问怎么指定  partition.time-extractor.timestamp-pattern 的格式为 'yyyymmdd
> hh:mm:ss' 。flink版本是1.13
>
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/filesystem/#partition-commit
>
>
>
>
>
>
> --
>
>
>


Re: flink 状态设置

2023-05-14 Thread Shammon FY
Hi,

"如果不对于状态进行管理,后续程序会出现问题"是指状态会变得太大?如果是这样,可以在group
by的字段里增加一个天级的时间戳,这样就不会由于key被更新导致的状态过期失效问题
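
按这个思路,大致的改动示意(字段类型和名称以实际为准,假设 a.outTime 可以格式化成天级字符串):

    // 内层子查询增加一列:DATE_FORMAT(a.outTime, 'yyyy-MM-dd') AS dt
    // 外层改为:GROUP BY dt, funcId, funcIdDesc, serverIp, pk
    tableEnv.getConfig().setIdleStateRetention(Duration.ofHours(36)); // TTL 略大于一天,避免当天的 key 被提前清理

这样每个自然日的数据都落在带 dt 的 key 上,前一天的 key 过了 TTL 之后即使没有新数据也会被清理,不会因为 key 持续更新而一直保留。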

Best,
Shammon FY


On Fri, May 12, 2023 at 1:59 PM 小昌同学  wrote:

> 各位老师好,我这边使用的flink sql是"
> select funcId,funcIdDesc,serverIp,cast(min(maxTime-minTime) as
> varchar(200)) as minTime,pk from
> (
>  select
>   a.funcId as funcId ,
>   a.funcIdDesc as funcIdDesc,
>   a.serverIp as serverIp,
>   b.outTime as maxTime,
>   a.outTime as minTime,
>   concat(a.funcId,a.serverIp) as pk
>  from tableRequest a
>  inner join tableAnswer b
>  on a.handleSerialNo=b.handleSerialNo
> )
> group by funcId,funcIdDesc,serverIp,pk‍‍‍"
>
> 考虑如果不对于状态进行管理,后续程序会出现问题,我这边想实现的状态管理是:我上述的这个sql计算的数据仅仅只是当天(24小时)的,等到第二天就把之前的全部状态全部清除掉,基于这样的场景我可以怎么设置什么参数管理状态,我自己设置参数为“tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1L));”,看官网的解释,感觉这样会有问题,idlestate是只要更新了就会重新设置过期时间,但是我想实现效果是不管是有咩有更新,只要不是属于今天的就全部清理掉。
>
>
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |


Re: 回复:报错显示为bug

2023-05-15 Thread Shammon FY
Hi,

从错误上看应该是你作业里某个字符串字段被作为时间戳处理,导致作业codegen失败了。你的作业逻辑比较复杂,你可以排查一下跟时间相关的字段,检查一下字段类型处理是否正确,比如eventTime字段
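
排查时可以先把字符串类型的时间字段显式转换成时间戳,避免让引擎做隐式 cast,一个示意(字段名和时间格式都是假设的):

    tableEnv.executeSql(
            "SELECT TO_TIMESTAMP(eventTime, 'yyyy-MM-dd HH:mm:ss') AS event_ts " +
            "FROM source_table");

如果原始数据里有不符合格式的值,TO_TIMESTAMP 会得到 NULL,也便于把脏数据过滤出来单独排查。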

Best,
Shammon FY

On Mon, May 15, 2023 at 7:29 PM lxk  wrote:

> 你好,从报错来看是类型不兼容导致的。
> Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column
> 103: Cannot cast "java.lang.String" to "java.time.LocalDateTime"
> 可以尝试对表结构进行优化,或者使用相关函数对字段类型进行转换
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> At 2023-05-15 18:29:15, "小昌同学"  wrote:
> >|
> >package job;
> >import bean.BaseInfo;
> >import bean.MidInfo;
> >import bean.OutInfo;
> >import bean.ResultInfo;
> >import com.alibaba.fastjson.JSON;
> >import com.alibaba.fastjson.JSONObject;
> >import config.FlinkConfig;
> >import function.MyProcessFunction;
> >import org.apache.flink.api.common.functions.MapFunction;
> >import org.apache.flink.api.common.serialization.SimpleStringSchema;
> >import org.apache.flink.api.java.tuple.Tuple2;
> >import org.apache.flink.streaming.api.TimeCharacteristic;
> >import org.apache.flink.streaming.api.datastream.DataStream;
> >import org.apache.flink.streaming.api.datastream.DataStreamSource;
> >import
> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> >import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> >import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> >import org.apache.flink.table.api.DataTypes;
> >import org.apache.flink.table.api.Schema;
> >import org.apache.flink.table.api.Table;
> >import org.apache.flink.table.api.TableSchema;
> >import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> >import org.apache.flink.table.types.DataType;
> >import org.apache.flink.util.OutputTag;
> >import sink.Sink2Mysql;
> >import utils.DateUtil;
> >import utils.DateUtils;
> >import utils.JdbcUtil;
> >
> >import java.sql.Connection;
> >import java.sql.PreparedStatement;
> >import java.sql.ResultSet;
> >import java.time.*;
> >import java.util.Date;
> >import java.util.HashMap;
> >import java.util.Properties;
> >
> >public class RytLogAnly4 {
> >public static void main(String[] args) throws Exception {
> >StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> >env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >
> >//使用侧输出流
> >OutputTag requestStream = new
> OutputTag("requestStream") {
> >};
> >OutputTag answerStream = new
> OutputTag("answerStream") {
> >};
> >
> >//1、连接测试环境kafka的数据
> >String servers =
> FlinkConfig.config.getProperty("dev_bootstrap.servers");
> >String topicName =
> FlinkConfig.config.getProperty("dev_topicName");
> >String groupId = FlinkConfig.config.getProperty("dev_groupId");
> >String devMode = FlinkConfig.config.getProperty("dev_mode");
> >Properties prop = new Properties();
> >prop.setProperty("bootstrap.servers", servers);
> >prop.setProperty("group.id", groupId);
> >prop.setProperty("auto.offset.reset", devMode);
> >DataStreamSource sourceStream = env.addSource(new
> FlinkKafkaConsumer(topicName, new SimpleStringSchema(), prop));
> >//{"ip":"10.125.8.141",{"data":"请求: -- 14:28:05.395 --
> <315.22604>1D971BEEF23AE63\nAction=686\nMobileCode=18243533656\nReqno=10797698288=22=2=0.2.4596628816=703492175447.712\nCellIndex=0102\nIphoneKey=10797944048\nForward=2\nCfrom=dbzq.iphone\nTFrom=iphone\nGateWayIp=2409:893c:5212:4943:acf2:43c1:3904:253c\nHandleSerialNo=TmuAbK5TAAC9CttSU/3lQGAHAABrBwACABYAAACuAgCuAgAATQFIAAFSMDEwNzk3Njk4Mjg4PTIyPTI9MC4yLjQ1OTY2Mjg4MTY9NzAzNDkyMTc1NDQ3LjcxMgFJCwAAADEwNzk3OTQ0MDQ4AA==\nGateWayPort=60696\nnewindex=1\nlinksession=4\nuniqueid=8488717B-B476-4A82-BFD0-9DDBB5151D0A\ntztsno=5DFB64E9E8BF7C67A158C3022E970E0F\nClientVersion=1.01.096\ntztreqfrom=ios.webview\nReqlinkType=2\ntztSDKType=0\n"}
> >
> >//2、对源数据进行处理,生成baseInfo基类的数据
> >SingleOutputStreamOperator baseInfoStream =
> sourceStream.map(new MapFunction() {
> >@Override
> >public BaseInfo map(String value) throws Exception {
> >JSONObject jsonObject = JSON.parseObject(value);
> >//获取到不同的服务器IP
> >String serverIp = jsonObject.getString("ip");
> >//获取到不同的data的

Re: 使用flink sql创建版本视图无法正常使用

2023-05-17 Thread Shammon FY
Hi,

你邮件里的图片无法显示,也没办法看到具体的错误信息
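
另外可以对照官网版本视图的标准写法,检查一下去重子查询、事件时间属性和逻辑主键的声明,大致结构如下(表名、字段沿用官网示例风格,仅作参考):

    tableEnv.executeSql(
            "CREATE VIEW versioned_rates AS " +
            "SELECT currency, rate, currency_time " +
            "FROM ( " +
            "  SELECT *, ROW_NUMBER() OVER (PARTITION BY currency ORDER BY currency_time DESC) AS rownum " +
            "  FROM currency_rates) " +
            "WHERE rownum = 1");

其中 currency_rates 的 currency_time 需要是事件时间属性(带 watermark),Temporal Join 时用 FOR SYSTEM_TIME AS OF 关联;如果视图推导不出主键或事件时间,join 就会因为缺少主键或时间属性而报错。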

Best,
Shammon FY


On Thu, May 18, 2023 at 10:15 AM arkey w  wrote:

> flink版本:1.14.5
> 在项目使用版本表时,准备使用版本视图,但创建后无法正常使用。后根据官网提供的示例(  Versioned Tables | Apache Flink
> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/concepts/versioned_tables/>
> )进行验证也同样无法使用,创建sql如下:
> 创建事实表:
> [image: image.png]
>
> 创建版本视图:
> [image: image.png]
> [image: image.png]
>
>
> Temporal Join的结果出现了报错:
> [image: image.png]
>
> 在desc视图的时候发现视图并没有主键以及事件时间字段,而join的时候也因此报了错。
> 是我操作哪里有问题吗,要如何才能正确使用版本视图?
>
>


Re: 用flink sql如何实现累计当天交易数量每来一笔交易都实时更新状态并输出到下游?

2023-05-26 Thread Shammon FY
Hi

可以将天级时间和其他需要聚合的字段组成key,使用聚合算子,默认会每条数据完成计算后实时输出结果
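
一个最简单的示意(表名、字段为假设;普通的非窗口 group by 聚合默认就是每来一条数据就向下游发送一次最新结果,下游需要能处理回撤或 upsert 消息):

    tableEnv.executeSql(
            "SELECT DATE_FORMAT(trade_time, 'yyyy-MM-dd') AS dt, " +
            "       COUNT(*) AS trade_cnt, SUM(amount) AS trade_amt " +
            "FROM trades " +
            "GROUP BY DATE_FORMAT(trade_time, 'yyyy-MM-dd')");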

Best,
Shammon FY

On Fri, May 26, 2023 at 3:44 PM casel.chen  wrote:

> 用flink sql如何实现累计当天交易数量每来一笔交易都实时更新状态并输出到下游?


Re: FlinkSQL大窗口小步长的滑动窗口解决方案

2023-05-28 Thread Shammon FY
Hi,

这是窗口触发后发送的数据量过大吗?调大资源,加大窗口计算的并发度是否可以缓解这个问题?

Best,
Shammon FY

On Fri, May 26, 2023 at 2:03 PM tanjialiang  wrote:

> Hi, all.
> 我在使用FlinkSQL的window tvf滑动窗口时遇到一些问题。
> 滑动步长为5分钟,窗口为24小时,group by
> user_id的滑动窗口,当任务挂掉了或者从kafka的earliest-offset消费,checkpoint很难成功。
> 因为从earliest开始消费,数据很快就会堆满缓冲区产生背压,这时这一批数据可能会触发N次窗口计算往下游发,每次触发的操作成本是(用户基数 *
> 24 * 60 / 5),checkpoint barrier可能会一直卡住。
> 这时候有什么办法可以破局吗?
>
>
> best,
> tanjialiang.


Re: flink web ui显示问题

2023-05-30 Thread Shammon FY
Hi,

好像没有收到附件或者文档,你可以检查确认一下

Best,
Shammon FY

On Wed, May 31, 2023 at 9:52 AM 小昌同学  wrote:

> 各位老师好,请教一个关于flink web ui的显示问题;
> 具体的显示异常截图的我以附件的形式放在文档中,我的疑惑是web
> ui上面已经显示watermark,但是看detail的时候显示不是watermark;
> 感谢各位老师指导
>
> 小昌同学
> ccc0606fight...@163.com
>
> <https://dashi.163.com/projects/signature-manager/detail/index.html?ftlId=1&name=%E5%B0%8F%E6%98%8C%E5%90%8C%E5%AD%A6&uid=ccc0606fighting%40163.com&iconUrl=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsmed9b2013afa816e025ae07760d572391.jpg&items=%5B%22ccc0606fighting%40163.com%22%5D>
>


Re: flink 输出异常数据

2023-05-31 Thread Shammon FY
Hi

可以看一下报空指针的具体异常栈,如果是你的业务代码,可以在你的处理逻辑里加上一些判断信息并打印到日志文件;如果不是你的业务代码,可以贴一下具体的异常栈信息。
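
一个捕获脏数据的示意(Event 类型、parse() 解析逻辑和 LOG 都是假设的业务代码,思路是把解析失败的原始记录打到日志并放进侧输出流):

    OutputTag<String> dirtyTag = new OutputTag<String>("dirty-data") {};

    SingleOutputStreamOperator<Event> parsed = source
            .process(new ProcessFunction<String, Event>() {
                @Override
                public void processElement(String raw, Context ctx, Collector<Event> out) {
                    try {
                        out.collect(parse(raw));             // parse() 为假设的业务解析方法
                    } catch (Exception e) {
                        LOG.warn("bad record: {}", raw, e);  // 打印原始记录,方便定位是哪条脏数据
                        ctx.output(dirtyTag, raw);           // 脏数据进入侧输出流,可另行落地或告警
                    }
                }
            });

    DataStream<String> dirtyStream = parsed.getSideOutput(dirtyTag);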

On Wed, May 31, 2023 at 12:31 PM yidan zhao  wrote:

> 这个得靠你自己打日志吧,在可能出NPE的地方 try catch 到,然后打印原始记录。
>
> 小昌同学  于2023年5月29日周一 18:30写道:
> >
> > 你好,数据源是kafka,使用的是stream api
> >
> >
> > | |
> > 小昌同学
> > |
> > |
> > ccc0606fight...@163.com
> > |
> >  回复的原邮件 
> > | 发件人 | Weihua Hu |
> > | 发送日期 | 2023年5月29日 15:29 |
> > | 收件人 |  |
> > | 主题 | Re: flink 输出异常数据 |
> > Hi,
> >
> > 你使用的数据源是什么呢?Kafka 吗?用的是 FlinkSQL 还是 DataStream API 呢?
> >
> > 方便把异常栈贴一下吗
> >
> > Best,
> > Weihua
> >
> >
> > On Mon, May 29, 2023 at 1:36 PM 小昌同学  wrote:
> >
> >
> >
> 各位老师,我有一个作业运行很久了,但是最近源系统有一些脏数据导致作业运行失败,看yarn的日志报错是空指针,但是我现在想把那一条脏数据捕获到,请问一下有啥办法吗?谢谢各位老师的指导
> >
> >
> > | |
> > 小昌同学
> > |
> > |
> > ccc0606fight...@163.com
> > |
>


Re: 退订

2023-06-04 Thread Shammon FY
退订请发送任意邮件到 user-zh-unsubscr...@flink.apache.org,可以参考
https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list

On Sun, Jun 4, 2023 at 11:13 PM 张保淇  wrote:

> 退订
>
>
> 麻烦尽快帮忙处理
>
>
> Best wishes
> 张保淇
> 电话:+8618878478770
> 邮件:hzuzhangba...@163.com


Re: pyflink1.17 中文乱码

2023-06-07 Thread Shammon FY
Hi,

你是怎么运行的?是不是中文的文件编码格式不对?

Best,
Shammon FY


On Thu, Jun 8, 2023 at 10:07 AM yidan zhao  wrote:

> 可以描述再详细点
>
> 1  于2023年6月7日周三 19:55写道:
> >
> > 老师们好,pyflink运行官网例子 wordcount 。把单词改成中文 乱码
> >
> >
> >
> >
> >
>


Re: Flink1.14 需求超大内存

2023-06-19 Thread Shammon FY
Hi,

这个doris的sink是你自己实现的还是flink或者doris官方提供的?从错误来看,像是sink节点申请了超大的内存资源,你可以确认一下是否有问题,或者是否有配置项可以配置

Best,
Shammon FY

On Mon, Jun 19, 2023 at 4:19 PM 郭欣瑞  wrote:

> 我在ide里测试一个任务的时候,任务一直处于created状态,过了很久之后报了以下的错
>
> DeclarativeSlotPoolBridge.java:351  - Could not acquire the minimum
> required resources, failing slot requests. Acquired:
> [ResourceRequirement{resourceProfile=ResourceProfile{taskHeapMemory=1024.000gb
> (1099511627776 bytes), taskOffHeapMemory=1024.000gb (1099511627776 bytes),
> managedMemory=128.000mb (134217728 bytes), networkMemory=64.000mb (67108864
> bytes)}, numberOfRequiredSlots=1}]. Current slot pool status: Registered
> TMs: 1, registered slots: 1 free slots: 0
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not acquire the minimum required resources.
>
> 我排查了一下发现最可疑的是用了一个doris的rowdata streamload sink,将其注释换了一个写入本地文件的sink就可以正常运行了
> 这是我doris sink的代码,flink doris connector版本是1.1.1
> DorisSink.Builder builder = DorisSink.builder();
> DorisOptions.Builder dorisBuilder = DorisOptions.builder();
> dorisBuilder.setFenodes(parameterTool.get("doris.FE_IP"))
>
> .setTableIdentifier(parameterTool.get("doris.sfinx_database")+"."+parameterTool.get("doris.table.asset_tag_data","asset_tag_data"))
> .setUsername(parameterTool.get("doris.user"))
> .setPassword(parameterTool.get("doris.password"));
>
> Properties pro = new Properties();
> pro.setProperty("format", "json");
> pro.setProperty("read_json_by_line", "true");
>
> Date date = new Date();
> DorisExecutionOptions.Builder executionBuilder =
> DorisExecutionOptions.builder();
>
> executionBuilder.setLabelPrefix("FundCategoryFilter-doris"+date.getTime()).setStreamLoadProp(pro);
>
> String[] fields =
> {"uid","subject","trade_date","update_time","value"};
> DataType[] types =
> {DataTypes.VARCHAR(36),DataTypes.VARCHAR(20),DataTypes.DATE(),DataTypes.TIMESTAMP(),DataTypes.DOUBLE()};
>
> builder.setDorisReadOptions(DorisReadOptions.builder().build())
> .setDorisExecutionOptions(executionBuilder.build())
>
> .setSerializer(RowDataSerializer.builder().setFieldNames(fields).setType("json").setFieldType(types).build())
> .setDorisOptions(dorisBuilder.build());
> fundCategoryDataStream.sinkTo(builder.build())
>
> .slotSharingGroup(parameterTool.get("fund_category_data_sink_group",
> "fund_category_sink"))
>
> .setParallelism(parameterTool.getInt("base_data_sink_parallelism", 1))
>
> .uid(parameterTool.get("fundCategroyDataSinkID","fundCategroyDataSinkID_1"))
> .name("fundCategorySinkName”);
>
>
>


Re: 关于flink批计算

2023-06-29 Thread Shammon FY
Hi

可以的,DataStream有很多内置的数据类型,也支持自定义数据类型和数据的序列化反序列化,然后在DataStream的计算内对数据执行计算,可以参考DataStream的官方文档[1]和数据类型文档[2]
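
一个简单的示意(把矩阵封装成 POJO 在算子间传递,并用 DataStream 的 BATCH 执行模式跑批;Matrix 的结构和具体计算都是假设的):

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class MatrixBatchJob {
        public static class Matrix {
            public double[][] data;                // public 字段 + 无参构造,Flink 可按 POJO 处理
            public Matrix() {}
            public Matrix(double[][] data) { this.data = data; }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);   // 有界数据用批执行模式

            env.fromElements(new Matrix(new double[][]{{1, 2}, {3, 4}}))
                    .map(m -> {
                        for (double[] row : m.data) {
                            for (int i = 0; i < row.length; i++) {
                                row[i] *= 2;                  // 示例:逐元素乘 2
                            }
                        }
                        return m;
                    })
                    .returns(Matrix.class)
                    .print();

            env.execute("matrix-batch-demo");
        }
    }

矩阵较大时建议关注序列化开销,必要时可以为自定义类型注册专门的 TypeInformation/Serializer。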

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/overview/
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/fault-tolerance/serialization/types_serialization/

Best,
Shammon FY

On Fri, Jun 30, 2023 at 10:34 AM Liu Join  wrote:

> 请教下,如果用flink进行批计算,使用DataStream API有没有什么优化的地方,是否可以直接将数据作为矩阵在算子之间传递进行计算
>


Re: PartitionNotFoundException循环重启

2023-07-03 Thread Shammon FY
Hi,

PartitionNotFoundException异常原因通常是下游task向上游task发送partition
request请求,但是上游task还没有部署成功。一般情况下,下游task会重试,超时后会报出异常。你可以查看下有没有其他的异常日志,查一下上游task为什么没有部署成功。

Best,
Shammon FY

On Tue, Jul 4, 2023 at 9:30 AM zhan...@eastcom-sw.com <
zhan...@eastcom-sw.com> wrote:

>
> 异常日志内容
>
> 2023-07-03 20:30:15,164 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Sink:
> Sink 3 (2/45)
> (79a20a2489a31465de9524eaf6b5ebf7_8fb6014c2df1d028b4c9ec6b86c8738f_
> 1_3093) switched from RUNNING to FAILED on 10.252.210.63:2359-420157 @
> nbiot-core-mpp-dcos-b-2.novalocal (dataPort=32769).
> org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
> Partition
> 65e701af2579c0381a2c3e53bd66fed0#24@79a20a2489a31465de9524eaf6b5ebf7_d952d2a6aebfb900c453884c57f96b82_24_
> 3093 not found.
> at 
> org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:70)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:136)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel$1.run(LocalInputChannel.java:186)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at java.util.TimerThread.mainLoop(Timer.java:555) ~[?:1.8.0_77]
> at java.util.TimerThread.run(Timer.java:505) ~[?:1.8.0_77]
>
>
>
> 发件人: zhan...@eastcom-sw.com
> 发送时间: 2023-07-04 09:25
> 收件人: user-zh
> 主题: PartitionNotFoundException循环重启
> hi,我这有两个流量比较大的job(一天3亿/6亿),在启动正常运行了5、6天左右就会出现
> PartitionNotFoundException 的异常,然后不断的循环重启
>
> 在flink-conf.yaml中添加以下参数后,也是同样在6天后会 循环报 PartitionNotFoundException
> 的异常后,不断的重启
> taskmanager.network.tcp-connection.enable-reuse-across-jobs: false
> taskmanager.network.max-num-tcp-connections: 16
>
> 当前版本 1.17.1,同样的job跟数据在1.14.4中一直没问题,请问这个有什么办法解决么?
>
>


Re: Re: PartitionNotFoundException循环重启

2023-07-05 Thread Shammon FY
Hi,

如果要增加request
partition的重试时间,可以调整配置项`taskmanager.network.request-backoff.max`,默认是10秒,具体配置可以参阅[1]

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#full-taskmanageroptions

Best,
Shammon FY

On Tue, Jul 4, 2023 at 11:38 AM zhan...@eastcom-sw.com <
zhan...@eastcom-sw.com> wrote:

> 从前面日志看是重启后从hdfs加载checkpoint数据处理(100M左右)这过程好像有点久,还有连kafka消费
> 下游的超时重试  可以设置次数或者时长吗?
>
> 发件人: Shammon FY
> 发送时间: 2023-07-04 10:12
> 收件人: user-zh
> 主题: Re: PartitionNotFoundException循环重启
> Hi,
>
> PartitionNotFoundException异常原因通常是下游task向上游task发送partition
>
> request请求,但是上游task还没有部署成功。一般情况下,下游task会重试,超时后会报出异常。你可以查看下有没有其他的异常日志,查一下上游task为什么没有部署成功。
>
> Best,
> Shammon FY
>
> On Tue, Jul 4, 2023 at 9:30 AM zhan...@eastcom-sw.com <
> zhan...@eastcom-sw.com> wrote:
>
> >
> > 异常日志内容
> >
> > 2023-07-03 20:30:15,164 INFO
> > org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Sink:
> > Sink 3 (2/45)
> > (79a20a2489a31465de9524eaf6b5ebf7_8fb6014c2df1d028b4c9ec6b86c8738f_
> > 1_3093) switched from RUNNING to FAILED on 10.252.210.63:2359-420157 @
> > nbiot-core-mpp-dcos-b-2.novalocal (dataPort=32769).
> > org.apache.flink.runtime.io
> .network.partition.PartitionNotFoundException:
> > Partition
> >
> 65e701af2579c0381a2c3e53bd66fed0#24@79a20a2489a31465de9524eaf6b5ebf7_d952d2a6aebfb900c453884c57f96b82_24_
> > 3093 not found.
> > at org.apache.flink.runtime.io
> .network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:70)
> > ~[flink-dist-1.17.1.jar:1.17.1]
> > at org.apache.flink.runtime.io
> .network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:136)
> > ~[flink-dist-1.17.1.jar:1.17.1]
> > at org.apache.flink.runtime.io
> .network.partition.consumer.LocalInputChannel$1.run(LocalInputChannel.java:186)
> > ~[flink-dist-1.17.1.jar:1.17.1]
> > at java.util.TimerThread.mainLoop(Timer.java:555) ~[?:1.8.0_77]
> > at java.util.TimerThread.run(Timer.java:505) ~[?:1.8.0_77]
> >
> >
> >
> > 发件人: zhan...@eastcom-sw.com
> > 发送时间: 2023-07-04 09:25
> > 收件人: user-zh
> > 主题: PartitionNotFoundException循环重启
> > hi,我这有两个流量比较大的job(一天3亿/6亿),在启动正常运行了5、6天左右就会出现
> > PartitionNotFoundException 的异常,然后不断的循环重启
> >
> > 在flink-conf.yaml中添加以下参数后,也是同样在6天后会 循环报 PartitionNotFoundException
> > 的异常后,不断的重启
> > taskmanager.network.tcp-connection.enable-reuse-across-jobs: false
> > taskmanager.network.max-num-tcp-connections: 16
> >
> > 当前版本 1.17.1,同样的job跟数据在1.14.4中一直没问题,请问这个有什么办法解决么?
> >
> >
>


Re: flink on native k8s里如何使用flink sql gateway

2023-07-05 Thread Shammon FY
Hi,

我们的做法是启动Flink集群后,在其他节点(pod或者独立启动)启动Sql-Gateway,通过Flink的地址远程连接Flink集群,这样Sql-Gateway的部署和Flink集群完全分开

Best,
Shammon FY


On Tue, Jul 4, 2023 at 10:52 AM chaojianok  wrote:

> 大家好,请教个问题。
>
> 用native kubernetes方式在k8s集群上部署好了flink,现在需要在这个flink集群里使用flink sql
> gateway,大家有什么好的方案吗?
> 目前的做法是,进入pod里启动sql gateway,然后在k8s创建flink-sql-gateway
> service,这样就可以通过这个service来访问sql
> gateway了,但是这个方法有个问题,部署过程中必需进入pod启服务,这是不利于自动化部署的,具体的操作命令如下,大家帮忙看看有没有好的解决方案来避免这个问题。
>
> 1、创建flink集群
> ./bin/kubernetes-session.sh \
> -Dkubernetes.cluster-id=flink-cluster \
> -Dkubernetes.namespace=flink \
> -Dkubernetes.service-account=flink-service-account \
> -Dkubernetes.rest-service.exposed.type=NodePort
>
> 2、进入pod通过 ./bin/sql-gateway.sh start
> -Dsql-gateway.endpoint.rest.address=localhost 启动sql gateway服务,退出pod
>
> 3、创建flink-sql-gateway service
> kubectl expose deployment flink-cluster --type=NodePort --port=8083
> --name=flink-sql-gateway -n flink
>


Re: flink1.14.5 sql-client 运行在yarn-session模式提交任务报错

2023-07-09 Thread Shammon FY
Hi,

邮件里的图片看不到

Best,
Shammon FY

On Sun, Jul 9, 2023 at 7:30 PM 杨东树  wrote:

> 各位好,
>目前我在使用flink1.14.5版本的sql-client on
> yarn-session模式时,发现无法正常执行sql任务,日志报如下错误,希望能得到指导,谢谢:
>背景信息:
>1、当flink配置execution.target:
> yarn-per-job时,随后进入sql-client执行sql任务,可正常执行。
>2、当flink配置execution.target: yarn-session,并启动flink
> yarn-session集群,随后进入sql-client执行同样的sql任务,报上图中的错误。
>


Re: 从kafka中读取数据到hdfs,过段时间报错

2023-07-11 Thread Shammon FY
Hi

你可以贴一下完整的异常栈信息,这可以帮助定位具体问题

Best,
Shammon FY


On Wed, Jul 12, 2023 at 10:52 AM chenyu_opensource <
chenyu_opensou...@163.com> wrote:

> 目前是用flink1.12版本,从kafka中读取数据到hdfs,前期运行正常,过段时间报错:
> Caused by: org.apache.flink.streaming.runtime.tasks.TimerException:
> org.apache.hadoop.ipc.RemoteException(java.io.IOException):
> BP-504689274-10.204.4.58-1507792652938:blk_3265799450_2192171234 does not
> exist or is not under Constructionnull
> flink taskmanager报错,会和hdfs连接中断。
> datanode日志报错DataXceiver error processing WRITE_BLOCK operation
>
>
>
> 背景:读取kafka数据,sink是多个的,为了处理不同的逻辑,保存到不同的hdfs目录,同时数据量上存在数据倾斜,已使用不同的并行度去处理,但还是出现这种问题。查询到的dfs.datanode.max.transfer.threads=16384。同时当前有下游业务读取hdfs目录,是否有所影响。
>
>
> 请指教,谢谢


Re: Re: PartitionNotFoundException循环重启

2023-07-13 Thread Shammon FY
Hi,

我觉得增加到3分钟可能不是一个合适的方法,这会增加作业恢复时间。建议还是追查一下为什么上游task这么长时间没有部署启动成功比较好。

Best,
Shammon FY


On Fri, Jul 14, 2023 at 2:25 PM zhan...@eastcom-sw.com <
zhan...@eastcom-sw.com> wrote:

> hi, 上次将`taskmanager.network.request-backoff.max` 从默认的10s增加到30s后 跑了5天还是出现
> PartitionNotFoundException循环重启
> 从日志看是连续三次checkpoint超时失败后自动重启job (Checkpointed Data
> Size一直在增长,即便当前无数据处理,也有几十上百M),某个算子会一直失败重启任务
>
> 以下是整个过程的失败日志,是否将 `taskmanager.network.request-backoff.max` 再增加到3分钟可以避免
> PartitionNotFoundException ?
>
> 2023-07-12 11:07:49,490 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> Checkpoint 19177 of job 3b800d54fb6a002be7feadb1a8b6894e expired before
> completing.
> 2023-07-12 11:07:49,490 WARN
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to
> trigger or complete checkpoint 19177 for job
> 3b800d54fb6a002be7feadb1a8b6894e. (3 consecutive failed attempts so far)
> org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint
> expired before completing.
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2216)
> [flink-dist-1.17.1.jar:1.17.1]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [?:1.8.0_77]
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_77]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> [?:1.8.0_77]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> [?:1.8.0_77]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> [?:1.8.0_77]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> [?:1.8.0_77]
> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_77]
> 2023-07-12 11:07:49,490 INFO
> org.apache.flink.runtime.checkpoint.CheckpointRequestDecider [] -
> checkpoint request time in queue: 2280006
> 2023-07-12 11:07:49,492 INFO
> org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to
> recover from a global failure.
> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
> failure threshold. The latest checkpoint failed due to Checkpoint expired
> before completing., view the Checkpoint History tab or the Job Manager log
> to find out why continuous checkpoints failed.
> at
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:212)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:169)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:122)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2155)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2134)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$700(CheckpointCoordinator.java:101)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2216)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> ~[?:1.8.0_77]
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_77]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> ~[?:1.8.0_77]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> ~[?:1.8.0_77]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> ~[?:1.8.0_77]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> ~[?:1.8.0_77]
> at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_77]
> 2023-07-12 11:07:49,492 INFO
> org.apache.flink.runtime.jobmaster.JobMaster [] - 51 tasks
> will be restarted to recover from a global failure.
> 2023-07-12 11:07:49,492 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job Phase
> 2 Data Warehouse Processing (3b800d54fb6a002be7feadb1a8b6894e) switched
> from state RUNNING to RESTARTING.
>
> 2023-07-12 11:07:50,007 INFO
> org.apache.flink.runtime.executiongraph.Executi

Re: Re: flink1.14.5 sql-client 运行在yarn-session模式提交任务报错

2023-07-16 Thread Shammon FY
Hi,

根据上面的异常栈信息,你可以检查一下是否配置了cluster id,在yarn里配置项是`yarn.application.id`

Best,
Shammon FY


On Sat, Jul 15, 2023 at 6:50 PM 杨东树  wrote:

> 您好,
>针对sql-client运行在yarn-session模式报错,现补充相关日志报错信息:
> 2023-07-15 18:43:21,503 INFO  org.apache.flink.table.client.cli.CliClient
> [] - Command history file path: /root/.flink-sql-history
> 2023-07-15 18:43:28,225 INFO
> org.apache.flink.table.catalog.CatalogManager[] - Set the
> current default database as [flink] in the current default catalog [myhive].
> 2023-07-15 18:43:38,410 WARN
> org.apache.flink.connector.kafka.source.KafkaSourceBuilder   [] - Offset
> commit on checkpoint is disabled because group.id is not specified
> 2023-07-15 18:43:39,986 WARN
> org.apache.flink.connector.kafka.source.KafkaSourceBuilder   [] - Offset
> commit on checkpoint is disabled because group.id is not specified
> 2023-07-15 18:43:40,605 WARN
> org.apache.flink.yarn.configuration.YarnLogConfigUtil[] - The
> configuration directory ('/usr/local/flink-1.14.5/conf') already contains a
> LOG4J config file.If you want to use logback, then please delete or rename
> the log configuration file.
> 2023-07-15 18:43:40,676 INFO  org.apache.hadoop.yarn.client.RMProxy
> [] - Connecting to ResourceManager at /0.0.0.0:8032
> 2023-07-15 18:43:40,788 INFO  org.apache.flink.yarn.YarnClusterDescriptor
> [] - No path for the flink jar passed. Using the location
> of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2023-07-15 18:43:40,791 WARN  org.apache.flink.table.client.cli.CliClient
> [] - Could not execute SQL statement.
> org.apache.flink.table.client.gateway.SqlExecutionException: Could not
> execute SQL statement.
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeModifyOperations(LocalExecutor.java:224)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.callInserts(CliClient.java:571)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:560)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:420)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$1(CliClient.java:332)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_231]
> at
> org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:325)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.SqlClient.start(SqlClient.java:95)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:791)
> ~[flink-table_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:754)
> ~[flink-table_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeModifyOperations$4(LocalExecutor.java:222)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:88)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeModifyOperations(LocalExecutor.java:222)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> ... 12 more
> Caused by: java.lang.IllegalStateException
> at
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:177)
> ~[flink-dist_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execut

Re: flink1.17.1版本 MiniCluster is not yet running or has already been shut down.

2023-07-23 Thread Shammon FY
Hi,

Which example are you running? From the error, the MiniCluster was shut down while the results were being fetched from it.
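
A common cause when running in the IDE is that the collect iterator is still being consumed after the local environment has already shut down. A minimal sketch of the safer pattern (the query is only a placeholder, this is not taken from your job):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.TableResult;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.CloseableIterator;

    public class CollectExample {
        public static void main(String[] args) throws Exception {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            TableResult result = tEnv.executeSql("SELECT 1");  // placeholder query
            // Consume and close the iterator before main() returns, so the MiniCluster
            // backing the local execution is still alive while results are fetched.
            try (CloseableIterator<Row> it = result.collect()) {
                while (it.hasNext()) {
                    System.out.println(it.next());
                }
            }
        }
    }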

Best,
Shammon FY

On Sat, Jul 22, 2023 at 3:25 PM guanyq  wrote:

> Running locally in IDEA: MiniCluster is not yet running or has already been shut down.
> What is the cause and how should it be handled?
>
>
>
>
> 15:19:27,511 INFO
> org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl [] -
> Stopping resource manager service.
>
> 15:19:27,503 WARN
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] -
> Failed to get job status so we assume that the job has terminated. Some
> data might be lost.
>
> java.lang.IllegalStateException: MiniCluster is not yet running or has
> already been shut down.
>
> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> ~[flink-core-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:1060)
> ~[flink-runtime-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:933)
> ~[flink-runtime-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:857)
> ~[flink-runtime-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:91)
> ~[flink-runtime-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.isJobTerminated(CollectResultFetcher.java:210)
> [flink-streaming-java-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:118)
> [flink-streaming-java-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
> [flink-streaming-java-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
> [flink-streaming-java-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)
> [flink-table-planner_25e35ab8-6377-4c6a-a928-a9fe1ff9e7f4.jar:1.17.1]
>
> at
> org.apache.flink.table.utils.print.TableauStyle.print(TableauStyle.java:120)
> [flink-table-common-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:153)
> [flink-table-api-java-1.17.1.jar:1.17.1]


Re: 关于DataStream API计算批数据的聚合值

2023-07-25 Thread Shammon FY
Hi,

It works the same way as the DataStream API in a regular streaming job; you only need to set the runtime mode to BATCH. In batch mode Flink emits only the final result and no intermediate results. For details you can refer to the WordCount example in Flink
[1]

[1]
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
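
A minimal sketch of the idea (a global sum instead of an average, for brevity; the same applies to an average):

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class BatchAggregateExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // BATCH mode on a bounded input: only the final aggregate is emitted,
            // no incremental update per record.
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);

            env.fromElements(1, 2, 3, 4)
                    .keyBy(v -> "all")        // single key, i.e. one global result
                    .reduce(Integer::sum)
                    .print();                 // prints 10 once, not 1, 3, 6, 10

            env.execute("bounded aggregate");
        }
    }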


On Wed, Jul 26, 2023 at 9:10 AM Liu Join  wrote:

> For example: I am using the DataStream API to compute the average over batch data, i.e. a bounded stream. How can I make it output only the final average value instead of the intermediate values?
>


Re: JdbcSink引发的IO过高

2023-07-25 Thread Shammon FY
Hi,

Currently JdbcSink creates a PreparedStatement for each sink. When batching, it first calls the PreparedStatement addBatch() method to buffer records, and once the flush condition is reached it calls executeBatch() to send the whole batch to the JDBC server, which saves network IO.

On the database side, my understanding is that executing `insert into error_md5_info (action, serverIp, handleSerialno, md5Num, insertTime, dateTime) values (1,2,3,4,5,6),(1,2,3,4,9,10);` versus inserting the two rows separately probably makes little difference to the underlying storage, since the amount of data written does not shrink; you can verify this by observing the behavior yourself.
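
One additional, hedged note: whether the buffered batch really goes out as a single multi-value INSERT depends on the JDBC driver. For MySQL Connector/J you usually have to enable rewriteBatchedStatements on the URL, otherwise executeBatch() still sends one statement per row. A sketch of the URL (host and the other options are placeholders, keep your existing ones):

    jdbc:mysql://<host>:3306/data_ret_log?rewriteBatchedStatements=true&useSSL=false&characterEncoding=utf-8&serverTimezone=Asia/Shanghai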

Best,
Shammon FY


On Tue, Jul 25, 2023 at 4:02 PM 小昌同学  wrote:

> Hi all, I have a question about using Flink's JdbcSink:
>
> My code is below. It is configured to flush every 1000 records, or every 200 ms. Because our MySQL resources are limited, the monitoring page shows that the insert IO is too high. With an insert statement like this, once 1000 records have accumulated, what does the statement actually look like:
> is it
> insert  into error_md5_info (action, serverIp,
> handleSerialno,md5Num,insertTime,dateTime) values
> (1,2,3,4,5,6),(1,2,3,4,9,10);
> or is it
> insert  into error_md5_info (action, serverIp,
> handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6)
> insert  into error_md5_info (action, serverIp,
> handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,9,10)
> If it is the second case, is there a way to convert it into the first case? That would greatly reduce the IO.
>
>
>
> |
> errorStream.addSink(JdbcSink.sink(
> "insert  into error_md5_info (action, serverIp,
> handleSerialno,md5Num,insertTime,dateTime) values (?,?,?,?,?,?)",
> (statement, result) -> {
> statement.setString(1,result.getAction());
> statement.setString(2,result.getServerIp());
> statement.setString(3,result.getHandleSerialno());
> statement.setString(4,result.getMd5Num());
> statement.setString(5,result.getInsertTime());
> statement.setString(6, result.getDateTime());
> },
> JdbcExecutionOptions.builder()
> .withBatchSize(1000)
> .withBatchIntervalMs(200)
> .withMaxRetries(5)
> .build(),
> new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
>
> .withUrl("jdbc:mysql://111/data_ret_log?useSSL=false&useUnicode=false&failOverReadOnly=false&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&autoReconnectForPools=true&serverTimezone=Asia/Shanghai&autoReconnect=true")
> .withDriverName("com.mysql.jdbc.Driver")
> .withUsername("111")
> .withPassword("222")
> .build()
> )).name("sink-error-mysql");
> |
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |


Re: flink-job-history 任务太多页面卡死

2023-07-27 Thread Shammon FY
Hi,

You can control how many finished jobs are kept by configuring `jobstore.max-capacity` and `jobstore.expiration-time`; see [1] for the detailed options.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#full-jobmanager-options
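
For example (the values are only illustrative):

    # flink-conf.yaml
    jobstore.max-capacity: 1000       # keep at most 1000 finished jobs in the store
    jobstore.expiration-time: 86400   # expire entries after one day (in seconds)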

Best,
Shammon FY

On Fri, Jul 28, 2023 at 10:17 AM 阿华田  wrote:

> Our flink-job-history currently holds 5000+ jobs. When clicking to view all jobs, the job history page freezes and becomes inaccessible. Does anyone have a good way to solve this?
> | |
> 阿华田
> |
> |
> a15733178...@163.com
> |
> 签名由网易邮箱大师定制
>
>


Re: 作业full gc 很严重

2023-08-03 Thread Shammon FY
Hi,

You usually need to first determine what is triggering the full GC, for example metaspace or an oversized heap. If it is caused by a large heap, dump the memory and inspect it with an analysis tool such as MAT or VisualVM to see which objects take up the most space and whether there is a memory leak.
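
A rough sketch of the usual tooling (<pid> is the TaskManager or JobManager process id, paths are examples):

    jstat -gcutil <pid> 1000                                  # watch heap/metaspace occupancy and GC counts
    jmap -dump:live,format=b,file=/tmp/tm-heap.hprof <pid>    # heap dump to open in MAT or VisualVM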

Best,
Shammon FY

On Fri, Aug 4, 2023 at 10:00 AM yidan zhao  wrote:

> Check the GC logs for the cause of the GC.
>
> 2278179732 <2278179...@qq.com.invalid> 于2023年8月3日周四 13:59写道:
> >
> > Hi all, after running for a while my job occasionally shows backpressure and full GC looks severe. Is there a good tool or any experience/doc for troubleshooting this? Thanks!
>


Re: flink1.14.5 sql-client无法查询hbase1.4.3数据

2023-08-07 Thread Shammon FY
Hi,

This looks like a version conflict. Did you put HBase jars into your Flink session cluster directory? Check whether their HBase version matches the one shipped in the flink hbase shaded connector.
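
A quick way to check (paths are examples) is to list what is on the session cluster's classpath:

    ls $FLINK_HOME/lib | grep -i hbase    # look for stray hbase-*.jar files next to flink-sql-connector-hbase-1.4*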

Best,
Shammon FY

On Sat, Aug 5, 2023 at 9:33 PM 杨东树  wrote:

> Hi all,
> Querying HBase data through sql-client currently fails; please advise, thanks.
> Steps to reproduce:
> 1. HBase operations:
> hbase(main):005:0> create 'flink_to_hbase','cf1'
> 0 row(s) in 2.2900 seconds
> hbase(main):006:0> put 'flink_to_hbase', 'rk0001', 'cf1:username',
> 'zhangsan'
> 0 row(s) in 0.0510 seconds
>
>
> 2. Flink operations:
> ./start-cluster.sh
> ./sql-client.sh
> CREATE TABLE flink_to_hbase(
> rowkey STRING,
> cf1 ROW,
> PRIMARY KEY (rowkey) NOT ENFORCED
> )WITH(
> 'connector'='hbase-1.4',
> 'table-name'='flink_to_hbase',
> 'zookeeper.quorum'='192.168.21.128:2181',
> 'zookeeper.znode.parent'='/hbase'
> );
>
>
> 3. Flink error log:
> 2023-08-05 21:00:35,081 INFO  org.apache.flink.table.client.cli.CliClient
> [] - Command history file path: /root/.flink-sql-history
> 2023-08-05 21:00:52,011 INFO
> org.apache.flink.configuration.Configuration [] - Config
> uses fallback configuration key 'jobmanager.rpc.address' instead of key
> 'rest.address'
> 2023-08-05 21:00:52,026 INFO
> org.apache.flink.client.program.rest.RestClusterClient   [] -
> Submitting job 'collect' (0c147bc0da5a43a5a382f2ec20740b45).
> 2023-08-05 21:00:52,480 INFO
> org.apache.flink.client.program.rest.RestClusterClient   [] -
> Successfully submitted job 'collect' (0c147bc0da5a43a5a382f2ec20740b45) to '
> http://localhost:8081'.
> 2023-08-05 21:00:55,809 INFO
> org.apache.flink.configuration.Configuration [] - Config
> uses fallback configuration key 'jobmanager.rpc.address' instead of key
> 'rest.address'
> 2023-08-05 21:00:55,830 INFO
> org.apache.flink.configuration.Configuration [] - Config
> uses fallback configuration key 'jobmanager.rpc.address' instead of key
> 'rest.address'
> 2023-08-05 21:07:52,481 INFO
> org.apache.flink.configuration.Configuration [] - Config
> uses fallback configuration key 'jobmanager.rpc.address' instead of key
> 'rest.address'
> 2023-08-05 21:07:52,484 INFO
> org.apache.flink.client.program.rest.RestClusterClient   [] -
> Submitting job 'collect' (d29904103fa3c83e3089c09f093372c9).
> 2023-08-05 21:07:52,728 INFO
> org.apache.flink.client.program.rest.RestClusterClient   [] -
> Successfully submitted job 'collect' (d29904103fa3c83e3089c09f093372c9) to '
> http://localhost:8081'.
> 2023-08-05 21:07:55,972 WARN  org.apache.flink.table.client.cli.CliClient
> [] - Could not execute SQL statement.
> org.apache.flink.table.client.gateway.SqlExecutionException: Could not
> execute SQL statement.
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:211)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:231)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:532)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:423)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$1(CliClient.java:332)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_231]
> at
> org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:325)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
> at
> org.apache.flink.table.api.internal.Tabl

Re: Flink消费MySQL

2023-08-07 Thread Shammon FY
Hi,

In your code, the ResultSet needs to be closed once it has been fully read, to avoid a resource leak.
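
A sketch of the run() method only, reusing the ps field from your class, with try-with-resources so the ResultSet is always closed:

    @Override
    public void run(SourceContext<ActionType> ctx) throws Exception {
        try (ResultSet resultSet = ps.executeQuery()) {
            while (resultSet.next()) {
                ctx.collect(new ActionType(
                        resultSet.getString("action"),
                        resultSet.getString("action_name")));
            }
        }  // resultSet is closed here even if an exception was thrown
    }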

As for the program finishing once the MySQL data has been read: which part exactly do you mean? MySQL here is a bounded source, so once the data has been consumed and the whole job has finished its computation, the job ends; that is the expected behavior.

Best,
Shammon FY

On Mon, Aug 7, 2023 at 5:04 PM 小昌同学  wrote:

> Hi all,
> I read data from MySQL locally by extending the RichSourceFunction class, but why does the program stop automatically once it has fetched all of the data from MySQL?
> My code is below:
> |
> public class MysqlSource2 extends RichSourceFunction {
> PreparedStatement ps;
> private Connection connection;
>
> @Override
> public void open(Configuration parameters) throws Exception {
> super.open(parameters);
> connection = getConnection();
> String sql="select * from actiontype;";
> ps = connection.prepareStatement(sql);
> }
>
> private static Connection getConnection(){
> Connection con=null;
> String driverClass= FlinkConfig.config.getProperty("driverClass");
> String url=FlinkConfig.config.getProperty("jdbcUrl");
> String user=FlinkConfig.config.getProperty("jdbcUser");
> String passWord=FlinkConfig.config.getProperty("passWord");
>
> try {
> Class.forName(driverClass);
> con= DriverManager.getConnection(url,user,passWord);
> } catch (Exception e) {
> throw new RuntimeException(e);
> }
> return con;
> }
>
> @Override
> public void run(SourceContext ctx) throws Exception {
> ResultSet resultSet = ps.executeQuery();
> while (resultSet.next()){
> ActionType actionType = new ActionType(
> resultSet.getString("action"),
> resultSet.getString("action_name")
> );
> ctx.collect(actionType);
> }
> }
>
> @Override
> public void close() throws Exception {
> super.close();
> if (null!=connection){
> connection.close();
> }
> if (null!=ps){
> ps.close();
> }
> }
>
> @Override
> public void cancel() {
> }
> };
>
>
> |
>
>
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |


Re: Flink消费MySQL

2023-08-07 Thread Shammon FY
As mentioned above, directly using CDC is probably the better option at the moment. Reading the data yourself raises many problems, such as how to read updates, how to read incremental data, and how to handle failover, so using CDC directly is the most convenient approach.
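
Purely as an illustration (all connection values are placeholders and flink-cdc-connectors has to be added as a dependency), a CDC source for one of the tables could be declared roughly like this and then joined with the other table using a regular join:

    CREATE TABLE actiontype_cdc (
        action STRING,
        action_name STRING,
        PRIMARY KEY (action) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = '<host>',
        'port' = '3306',
        'username' = '<user>',
        'password' = '<password>',
        'database-name' = '<database>',
        'table-name' = 'actiontype'
    );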

Best,
Shammon FY

On Tue, Aug 8, 2023 at 11:30 AM Jiabao Sun 
wrote:

> Hi,
>
> You can try flink-cdc-connectors for the real-time join.
> A regular join has to keep the complete state of both tables; if the tables are large, the rocksdb backend is recommended.
> If the joined table barely changes, a lookup join is worth considering.
>
> Best,
> Jiabao
>
>
> > 2023年8月8日 上午11:10,小昌同学  写道:
> >
> > Thanks for the guidance;
> >
> > My requirement is to read two MySQL tables and join them in real time. The only options I can think of are either reading them in real time with CDC, or writing a loop that keeps polling the data in MySQL.
> > Do you have a better suggestion for this?
> >
> >
> > | |
> > 小昌同学
> > |
> > |
> > ccc0606fight...@163.com
> > |
> >  回复的原邮件 
> > | 发件人 | Shammon FY |
> > | 发送日期 | 2023年8月8日 10:37 |
> > | 收件人 |  |
> > | 主题 | Re: Flink消费MySQL |
> > Hi,
> >
> > 你代码里的ResultSet读取完成后需要将resultSet关闭掉,避免资源泄漏
> >
> > 至于你提到的Mysql数据读完程序就结束具体是指哪块?mysql是bounded
> > source,数据消费完成并且整个作业计算完成后,就会结束,这是正常情况
> >
> > Best,
> > Shammon FY
> >
> > On Mon, Aug 7, 2023 at 5:04 PM 小昌同学  wrote:
> >
> > 各位老师好
> >
> ,我这边在本地使用通过继承RichSourceFunction类,实现从MySQL中读取数据,但是为啥程序将MySQL中的全部数据获取出来后,程序就自动停止了啊;
> > 以下是我的代码:
> > |
> > public class MysqlSource2 extends RichSourceFunction {
> > PreparedStatement ps;
> > private Connection connection;
> >
> > @Override
> > public void open(Configuration parameters) throws Exception {
> > super.open(parameters);
> > connection = getConnection();
> > String sql="select * from actiontype;";
> > ps = connection.prepareStatement(sql);
> > }
> >
> > private static Connection getConnection(){
> > Connection con=null;
> > String driverClass= FlinkConfig.config.getProperty("driverClass");
> > String url=FlinkConfig.config.getProperty("jdbcUrl");
> > String user=FlinkConfig.config.getProperty("jdbcUser");
> > String passWord=FlinkConfig.config.getProperty("passWord");
> >
> > try {
> > Class.forName(driverClass);
> > con= DriverManager.getConnection(url,user,passWord);
> > } catch (Exception e) {
> > throw new RuntimeException(e);
> > }
> > return con;
> > }
> >
> > @Override
> > public void run(SourceContext ctx) throws Exception {
> > ResultSet resultSet = ps.executeQuery();
> > while (resultSet.next()){
> > ActionType actionType = new ActionType(
> > resultSet.getString("action"),
> > resultSet.getString("action_name")
> > );
> > ctx.collect(actionType);
> > }
> > }
> >
> > @Override
> > public void close() throws Exception {
> > super.close();
> > if (null!=connection){
> > connection.close();
> > }
> > if (null!=ps){
> > ps.close();
> > }
> > }
> >
> > @Override
> > public void cancel() {
> > }
> > };
> >
> >
> > |
> >
> >
> > | |
> > 小昌同学
> > |
> > |
> > ccc0606fight...@163.com
> > |
>
>


Re: 关于RichFlatMapFunction的状态输出

2023-08-13 Thread Shammon FY
Hi,

Do you want to output some information before the RichFlatMapFunction finishes its computation? AbstractRichFunction has a close() method; you can implement it to run some custom logic, and it will be executed as the function is being closed.
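
A minimal sketch of that idea, assuming an Integer input and an average as the final value (whether records can still be emitted downstream from close() depends on the Flink version, so this sketch only performs a side action there instead of calling the Collector):

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.util.Collector;

    public class AvgFlatMap extends RichFlatMapFunction<Integer, Double> {
        private long sum;
        private long count;

        @Override
        public void flatMap(Integer value, Collector<Double> out) {
            sum += value;   // accumulate only, no intermediate output
            count++;
        }

        @Override
        public void close() throws Exception {
            if (count > 0) {
                // runs when the function is shut down, e.g. log or persist the final value
                System.out.println("final avg = " + ((double) sum / count));
            }
            super.close();
        }
    }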

Best,
Shammon FY

On Fri, Aug 11, 2023 at 10:59 AM Liu Join  wrote:

> When using a RichFlatMapFunction for batch computation in Flink 1.17, how can the state be written into the output data stream once the input data ends?
> Thanks
>


Re: flink sql语句转成底层处理函数

2023-08-27 Thread Shammon FY
If you want to see which concrete execution steps a SQL statement is translated into, you can inspect the execution plan via the EXPLAIN syntax [1].

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/explain/
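
For example (table and column names are placeholders):

    -- prints the abstract syntax tree, the optimized plan and the execution plan
    EXPLAIN PLAN FOR
    SELECT user_id, COUNT(*) FROM orders GROUP BY user_id;

Recent versions also accept extra detail keywords such as CHANGELOG_MODE in front of the query; note that the concrete state layout an operator uses remains an internal planner detail and is not exposed by EXPLAIN.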

On Sun, Aug 27, 2023 at 5:23 PM 海风 <18751805...@163.com> wrote:

> May I ask: after a Flink SQL statement is submitted and running, is it possible to inspect what the underlying processing functions Flink translates it into actually look like, and, if stateful computation is involved, which state variables Flink defines for this SQL?
>
>
>


Re: Could not retrieve JobResults of globally-terminated jobs from JobResultStore

2023-08-31 Thread Shammon FY
Hi,

Is it a streaming job or a batch job that cannot be recovered? From the error, the job is already in a terminated state; please check whether there are any other error logs to find out why the job failed and exited.
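
If it helps, a hedged hint for narrowing it down: the exception is thrown while parsing a JobResult file from the job result store, so an empty or truncated file under that path (by default inside the HA storage directory, configurable via job-result-store.storage-path) would produce exactly this "No content to map due to end-of-input" error. The paths below are placeholders:

    hdfs dfs -ls -R <high-availability.storageDir>/job-result-store/    # look for zero-length .json files
    hdfs dfs -cat <path-to-suspicious-file>.json                        # inspect the content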

Best,
Shammon FY

On Thu, Aug 31, 2023 at 7:47 PM denghaibin  wrote:

> After flink-1.16.0 jobs had been running for a while, a large number of them failed. The error log is below. Could someone take a look at what the problem is?
> java.util.concurrent.CompletionException:
> org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults
> of globally-terminated jobs from JobResultStore
>  at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
> ~[?:1.8.0_382]
>  at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
> ~[?:1.8.0_382]
>  at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
> ~[?:1.8.0_382]
>  at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> ~[?:1.8.0_382]
>  at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ~[?:1.8.0_382]
>  at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
> Caused by: org.apache.flink.util.FlinkRuntimeException: Could not retrieve
> JobResults of globally-terminated jobs from JobResultStore
>  at
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:196)
> ~[flink-dist-1.16.0.jar:1.16.0]
>  at
> org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198)
> ~[flink-dist-1.16.0.jar:1.16.0]
>  at
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188)
> ~[flink-dist-1.16.0.jar:1.16.0]
>  at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
> ~[?:1.8.0_382]
>  ... 3 more
> Caused by:
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException:
> No content to map due to end-of-input
>  at [Source: (org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream);
> line: 1, column: 0]
>  at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
> ~[flink-dist-1.16.0.jar:1.16.0]
>  at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4688)
> ~[flink-dist-1.16.0.jar:1.16.0]