Flink CDC2.2.1 设置server id范围

2022-10-31 Thread Fei Han
大家好!
现在我在 Flink CDC2.2.1设置了server id。有5张表且server id的范围都不同,通过Flink CDC 
打宽表。但是在任务跑一段时间后,还是出现如下报错:
Caused by: com.github.shyiko.mysql.binlog.network.ServerException: A slave with 
the same server_uuid/server_id as this slave has connected to the master;
请教下各位,还有什么解决方案没有


Re: Flink CDC2.2.1 设置server id范围

2022-10-31 Thread Leonard Xu
Hi, 

你5张表对应的source并发是多少呀?如果是多并发需要把server-id设置成一个范围,范围和并发数匹配,比如4个并发,应该类似’1101-1104’.
另外 server-id 是全局唯一的,你需要确保下你使用的server-id 和其他作业、其他同步工具都不冲突才可以。


Best,
Leonard


> 2022年10月31日 下午4:00,Fei Han  写道:
> 
> 大家好!
> 现在我在 Flink CDC2.2.1设置了server id。有5张表且server id的范围都不同,通过Flink CDC 
> 打宽表。但是在任务跑一段时间后,还是出现如下报错:
> Caused by: com.github.shyiko.mysql.binlog.network.ServerException: A slave 
> with the same server_uuid/server_id as this slave has connected to the master;
> 请教下各位,还有什么解决方案没有



Re: Re: Re: upsert kafka作为source时,消费不到kafka中的数据

2022-10-31 Thread guozhi mang
图上看不出有什么异常,可以看一下 task manager 日志

Best regards


左岩 <13520871...@163.com> 于2022年10月31日周一 11:34写道:

>
>
> 还是没有消费到,麻烦查看附件中的图片
>
>
>
>
>
> 在 2022-10-31 10:03:05,"guozhi mang"  写道:
> >我想你的格式错了
> >下面我修改了一下
> >tenv.executeSql(
> >" create table t_upsert_kafka( "
> >+ "userid int ,"
> >+ "username string,  "
> >+ "age int, "
> >+ "`partition` int ,"
> >+ "  PRIMARY KEY (userid) NOT ENFORCED "
> >+ " ) with ("
> >+ "  'connector' = 'upsert-kafka',  "
> >+ "  'topic' = 'test02',"
> >+ "  'properties.bootstrap.servers' = '192.168.0.82:9092',  "
> >+ "  'key.format' = 'json', "
> >+ "  'value.format' = 'json'"
> >+ " )  "
> >);
> >
> >*下面是官方案例*
> >
> >CREATE TABLE pageviews_per_region (
> >  user_region STRING,
> >  pv BIGINT,
> >  uv BIGINT,
> >  PRIMARY KEY (user_region) NOT ENFORCED) WITH (
> >  'connector' = 'upsert-kafka',
> >  'topic' = 'pageviews_per_region',
> >  'properties.bootstrap.servers' = '...',
> >  'key.format' = 'avro',
> >  'value.format' = 'avro');
> >
> >
> >左岩 <13520871...@163.com> 于2022年10月31日周一 09:57写道:
> >
> >>
> >>
> >>
> >>
> >> public static void main(String[] args) throws Exception {
> >> Configuration conf = new Configuration();
> >> conf.setInteger("rest.port", 10041);
> >> StreamExecutionEnvironment env =
> >> StreamExecutionEnvironment.getExecutionEnvironment(conf);
> >> StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
> >> //env.setParallelism(1);
> >>
> >> env.enableCheckpointing(3000);
> >> env.setStateBackend(new HashMapStateBackend());
> >> env.getCheckpointConfig().setCheckpointStorage("file:///d:/zuoyanckpt");
> >>
> >> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> >> env.getCheckpointConfig().setCheckpointTimeout(20 * 1000);
> >> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> >> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
> >>
> >> env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> >>
> >> // 创建目标 kafka映射表
> >> tenv.executeSql(
> >> " create table t_upsert_kafka( "
> >> + "userid int primary key not enforced,"
> >> + "username string,  "
> >> + "age int, "
> >> + "`partition` int "
> >> + " ) with ("
> >> + "  'connector' = 'upsert-kafka',  "
> >> + "  'topic' = 'test02',"
> >> + "  'properties.bootstrap.servers' = '192.168.0.82:9092',  "
> >> + "  'key.format' = 'json', "
> >> + "  'value.format' = 'json'"
> >> + " )  "
> >> );
> >>
> >> tenv.executeSql("select * from t_upsert_kafka").print();
> >>
> >> tenv.executeSql(
> >> " CREATE TABLE t_kafka_connector (   "
> >> + "userid int ,"
> >> + "username string,  "
> >> + "age int, "
> >> + "`partition` int "
> >> + " ) WITH (   "
> >> + "  'connector' = 'kafka',"
> >> + "  'topic' = 'test02', "
> >> + "  'properties.bootstrap.servers' = '192.168.0.82:9092',  "
> >> + "  'properties.group.id' = 'testGroup1',  "
> >> + "  'scan.startup.mode' = 'earliest-offset',   "
> >> + "  'format'='json'   "
> >> + " )   "
> >>
> >> );
> >>
> >> tenv.executeSql("select * from t_kafka_connector").print();
> >>
> >> env.execute();
> >>
> >>
> >>
> >>
> >>
> >> t_upsert_kafka 消费不到   t_kafka_connector可以消费到
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2022-10-31 09:43:49,"Shengkai Fang"  写道:
> >> >hi,
> >> >
> >> >看不到的图片。能不能直接展示文字或者用图床工具?
> >> >
> >> >Best,
> >> >Shengkai
> >> >
> >> >左岩 <13520871...@163.com> 于2022年10月28日周五 18:34写道:
> >> >
> >> >> upsert kafka作为source时,消费不到kafka中的数据
> >> >> 通过flink sql 写了一个upsert kafka connector 的连接器,去读一个topic,读不到数据,但是普通kafka
> >> >> 连接器消费这个topic 就能读到,都是读的同一个topic,代码如下
> >> >>
> >>
>
>


Re:Re: Flink CDC2.2.1 设置server id范围

2022-10-31 Thread casel.chen



server-id配置范围对于后面修改并发度是不是不太友好?每改一次并发度就得重新调整server-id范围么?还是说先配置一个较大的server-id范围,在在这个较大的范围内调整并发度?











在 2022-10-31 16:04:32,"Leonard Xu"  写道:
>Hi, 
>
>你5张表对应的source并发是多少呀?如果是多并发需要把server-id设置成一个范围,范围和并发数匹配,比如4个并发,应该类似’1101-1104’.
>另外 server-id 是全局唯一的,你需要确保下你使用的server-id 和其他作业、其他同步工具都不冲突才可以。
>
>
>Best,
>Leonard
>
>
>> 2022年10月31日 下午4:00,Fei Han  写道:
>> 
>> 大家好!
>> 现在我在 Flink CDC2.2.1设置了server id。有5张表且server id的范围都不同,通过Flink CDC 
>> 打宽表。但是在任务跑一段时间后,还是出现如下报错:
>> Caused by: com.github.shyiko.mysql.binlog.network.ServerException: A slave 
>> with the same server_uuid/server_id as this slave has connected to the 
>> master;
>> 请教下各位,还有什么解决方案没有
>


flink sql client取消sql-clients-default.yaml后那些预置catalogs建议在哪里定义呢?

2022-10-31 Thread casel.chen
flink新版本已经找不到sql-clients-default.yaml文件了,那么之前配置的那些预置catalogs建议在哪里定义呢?通过初始化sql么?

Re: flink sql client取消sql-clients-default.yaml后那些预置catalogs建议在哪里定义呢?

2022-10-31 Thread Leonard Xu
Hi,

我记得有个-i 参数可以指定初始化sql文件,你贴你的初始化sql在文件里加进去就可以了。

祝好,
Leonard




> 2022年10月31日 下午4:52,casel.chen  写道:
> 
> flink新版本已经找不到sql-clients-default.yaml文件了,那么之前配置的那些预置catalogs建议在哪里定义呢?通过初始化sql么?



Re: Re: Flink CDC2.2.1 设置server id范围

2022-10-31 Thread 林影
Hi, Leonard.

我也有类似的疑惑。

有个线上的Flink Application之前配置的serverid 是
6416-6418,并行度之前是3,后来缩容的时候并行度改成2了,在这种场景下serverid的范围需要进行调整吗?


casel.chen  于2022年10月31日周一 16:50写道:

>
>
>
>
> server-id配置范围对于后面修改并发度是不是不太友好?每改一次并发度就得重新调整server-id范围么?还是说先配置一个较大的server-id范围,在在这个较大的范围内调整并发度?
>
>
>
>
>
>
>
>
>
>
>
> 在 2022-10-31 16:04:32,"Leonard Xu"  写道:
> >Hi,
> >
>
> >你5张表对应的source并发是多少呀?如果是多并发需要把server-id设置成一个范围,范围和并发数匹配,比如4个并发,应该类似’1101-1104’.
> >另外 server-id 是全局唯一的,你需要确保下你使用的server-id 和其他作业、其他同步工具都不冲突才可以。
> >
> >
> >Best,
> >Leonard
> >
> >
> >> 2022年10月31日 下午4:00,Fei Han  写道:
> >>
> >> 大家好!
> >> 现在我在 Flink CDC2.2.1设置了server id。有5张表且server id的范围都不同,通过Flink CDC
> 打宽表。但是在任务跑一段时间后,还是出现如下报错:
> >> Caused by: com.github.shyiko.mysql.binlog.network.ServerException: A
> slave with the same server_uuid/server_id as this slave has connected to
> the master;
> >> 请教下各位,还有什么解决方案没有
> >
>


Re: Flink CDC2.2.1 设置server id范围

2022-10-31 Thread Leonard Xu

> server-id配置范围对于后面修改并发度是不是不太友好?每改一次并发度就得重新调整server-id范围么?还是说先配置一个较大的server-id范围,在在这个较大的范围内调整并发度?

作业起来后修改并发是需要调整的,建议这块可以放到平台里去设计,这样可以让写sql的用户知道with参数里参数的作用。

祝好,
Leonard


> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2022-10-31 16:04:32,"Leonard Xu"  写道:
>> Hi, 
>> 
>> 你5张表对应的source并发是多少呀?如果是多并发需要把server-id设置成一个范围,范围和并发数匹配,比如4个并发,应该类似’1101-1104’.
>> 另外 server-id 是全局唯一的,你需要确保下你使用的server-id 和其他作业、其他同步工具都不冲突才可以。
>> 
>> 
>> Best,
>> Leonard
>> 
>> 
>>> 2022年10月31日 下午4:00,Fei Han  写道:
>>> 
>>> 大家好!
>>> 现在我在 Flink CDC2.2.1设置了server id。有5张表且server id的范围都不同,通过Flink CDC 
>>> 打宽表。但是在任务跑一段时间后,还是出现如下报错:
>>> Caused by: com.github.shyiko.mysql.binlog.network.ServerException: A slave 
>>> with the same server_uuid/server_id as this slave has connected to the 
>>> master;
>>> 请教下各位,还有什么解决方案没有
>> 



Re: Flink CDC2.2.1 设置server id范围

2022-10-31 Thread Leonard Xu

> 2022年10月31日 下午4:57,林影  写道:
> 
> Hi, Leonard.
> 
> 我也有类似的疑惑。
> 
> 有个线上的Flink Application之前配置的serverid 是
> 6416-6418,并行度之前是3,后来缩容的时候并行度改成2了,在这种场景下serverid的范围需要进行调整吗?

缩容并不需要的,你的case里只会用6416 和 6417这两个id,只有扩容需要考虑,并且扩容时如果没有夸大范围,目前是会报错提示的。

祝好,
Leonard




> 
> casel.chen  于2022年10月31日周一 16:50写道:
> 
>> 
>> 
>> 
>> 
>> server-id配置范围对于后面修改并发度是不是不太友好?每改一次并发度就得重新调整server-id范围么?还是说先配置一个较大的server-id范围,在在这个较大的范围内调整并发度?
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2022-10-31 16:04:32,"Leonard Xu"  写道:
>>> Hi,
>>> 
>> 
>>> 你5张表对应的source并发是多少呀?如果是多并发需要把server-id设置成一个范围,范围和并发数匹配,比如4个并发,应该类似’1101-1104’.
>>> 另外 server-id 是全局唯一的,你需要确保下你使用的server-id 和其他作业、其他同步工具都不冲突才可以。
>>> 
>>> 
>>> Best,
>>> Leonard
>>> 
>>> 
 2022年10月31日 下午4:00,Fei Han  写道:
 
 大家好!
 现在我在 Flink CDC2.2.1设置了server id。有5张表且server id的范围都不同,通过Flink CDC
>> 打宽表。但是在任务跑一段时间后,还是出现如下报错:
 Caused by: com.github.shyiko.mysql.binlog.network.ServerException: A
>> slave with the same server_uuid/server_id as this slave has connected to
>> the master;
 请教下各位,还有什么解决方案没有
>>> 
>> 



Re: Flink CDC2.2.1 设置server id范围

2022-10-31 Thread 林影
ok, thx!

Leonard Xu  于2022年10月31日周一 17:01写道:

>
> > 2022年10月31日 下午4:57,林影  写道:
> >
> > Hi, Leonard.
> >
> > 我也有类似的疑惑。
> >
> > 有个线上的Flink Application之前配置的serverid 是
> > 6416-6418,并行度之前是3,后来缩容的时候并行度改成2了,在这种场景下serverid的范围需要进行调整吗?
>
> 缩容并不需要的,你的case里只会用6416 和 6417这两个id,只有扩容需要考虑,并且扩容时如果没有夸大范围,目前是会报错提示的。
>
> 祝好,
> Leonard
>
>
>
>
> >
> > casel.chen  于2022年10月31日周一 16:50写道:
> >
> >>
> >>
> >>
> >>
> >>
> server-id配置范围对于后面修改并发度是不是不太友好?每改一次并发度就得重新调整server-id范围么?还是说先配置一个较大的server-id范围,在在这个较大的范围内调整并发度?
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2022-10-31 16:04:32,"Leonard Xu"  写道:
> >>> Hi,
> >>>
> >>
> >>>
> 你5张表对应的source并发是多少呀?如果是多并发需要把server-id设置成一个范围,范围和并发数匹配,比如4个并发,应该类似’1101-1104’.
> >>> 另外 server-id 是全局唯一的,你需要确保下你使用的server-id 和其他作业、其他同步工具都不冲突才可以。
> >>>
> >>>
> >>> Best,
> >>> Leonard
> >>>
> >>>
>  2022年10月31日 下午4:00,Fei Han  写道:
> 
>  大家好!
>  现在我在 Flink CDC2.2.1设置了server id。有5张表且server id的范围都不同,通过Flink CDC
> >> 打宽表。但是在任务跑一段时间后,还是出现如下报错:
>  Caused by: com.github.shyiko.mysql.binlog.network.ServerException: A
> >> slave with the same server_uuid/server_id as this slave has connected to
> >> the master;
>  请教下各位,还有什么解决方案没有
> >>>
> >>
>
>


Re: flink sql client取消sql-clients-default.yaml后那些预置catalogs建议在哪里定义呢?

2022-10-31 Thread yu zelin
Hi,
Leonard 提到的 -i 参数可以满足你的需求。在初始化SQL文件中可以SET/RESET属性,CREATE/DROP等。
更多信息请查看:  
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sqlclient/#sql-client-startup-options
 


祝好,
yuzelin

> 2022年10月31日 16:57,Leonard Xu  写道:
> 
> Hi,
> 
> 我记得有个-i 参数可以指定初始化sql文件,你贴你的初始化sql在文件里加进去就可以了。
> 
> 祝好,
> Leonard
> 
> 
> 
> 
>> 2022年10月31日 下午4:52,casel.chen  写道:
>> 
>> flink新版本已经找不到sql-clients-default.yaml文件了,那么之前配置的那些预置catalogs建议在哪里定义呢?通过初始化sql么?
> 



Re: 关于LocalTransportException的优化方向咨询

2022-10-31 Thread yidan zhao
嗯,问题1我主要是在想,这种复杂的连接关系,会不会增大Sending the partition request to '...'
failed;这种异常的概率。
问题2,你提到的下载jar是指任务jar还是flink的jar。flink的jar不需要,因为我是standalone集群。
任务jar的话,这出现另外一个问题,如果一个TM分配到120*10=1200个task,那么任务jar不会分发这么多次吧。

weijie guo  于2022年10月31日周一 12:54写道:
>
> 你好,请问使用的flink版本是多少?
> 1.15的话TM间是有connection reuse的,默认TM间建立一个物理TCP连接。
> 并发大了的话,你的TM只有一个slot,启动的TM会变多。task全变成running的状态变慢的因素也比较多:有些TM容器在的结点比较慢、下载jar包时间长、state
> restore慢等
>
> Best regards,
>
> Weijie
>
>
> yidan zhao  于2022年10月30日周日 11:36写道:
>
> > 如题,我生产集群频繁报 org.apache.flink.runtime.io
> > .network.netty.exception.LocalTransportException
> > 异常,具体异常cause有如下情况,按照出现频率从高到底列举。
> > (1)Sending the partition request to '...' failed;
> > org.apache.flink.runtime.io
> > .network.netty.exception.LocalTransportException:
> > Sending the partition request to '/10.35.215.18:2065 (#0)' failed.
> > at org.apache.flink.runtime.io
> > .network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:145)
> > at org.apache.flink.runtime.io
> > .network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:133)
> > at
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
> > at
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
> > at
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
> > at
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
> > at
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
> > at
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
> > at
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:1017)
> > at
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:878)
> > at
> > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
> > at
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
> > at
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
> > at
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
> > at
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
> > at
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
> > at
> > org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
> > at
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
> > at
> > org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> > at java.lang.Thread.run(Thread.java:748)
> > Caused by:
> > org.apache.flink.shaded.netty4.io.netty.channel.StacklessClosedChannelException
> > at
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object,
> > ChannelPromise)(Unknown Source)
> > Caused by:
> > org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
> > writeAddress(..) failed: Connection timed out
> >
> > (2)readAddress(..) failed: Connection timed out
> > org.apache.flink.runtime.io
> > .network.netty.exception.LocalTransportException:
> > readAddress(..) failed: Connection timed out (connection to
> > '10.35.109.149/10.35.109.149:2094')
> > at org.apache.flink.runtime.io
> > .network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:168)
> > at
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
> > at
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
> > at
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273)
> > at
> > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1377)
> > at
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
> > at
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(Abstr

Re:Re: Re: 关于如何得到管道中哪些源是有界和无界的问题

2022-10-31 Thread Guojun Li



或许可以考虑在设计平台时将流任务和批任务做成不同的入口。


Best,
Guojun





在 2022-10-28 18:14:33,"junjie.m...@goupwith.com"  写道:
>这就是写代码和平台化的需求不同了,对于平台化需要能判断出写的sql代码块最终生成的管道是有界还是无界,对于有界需要对外提供restful 
>api由外部调度系统定时调起,而无界管道可以直接运行长期保持。
>当然还有很多场景下需要知道管道的有界和无界,这里我不一一例举了。
>
> 
>发件人: weijie guo
>发送时间: 2022-10-28 18:01
>收件人: user-zh
>主题: Re: Re: 关于如何得到管道中哪些源是有界和无界的问题
>那应该没有办法在Table API中拿到了,我有一些不理解,为什么你需要自动判断执行模式,而不是根据你的任务的实际情况来设置。
>如果你期望以批的模式跑作业,然后有些Source是无界的,我理解这本身就是采用的source不合理,应该修改代码。
>另外流和批执行模式有很多不同之处,例如sum算子对于每个key是输出多条还是一条,这都是你选择执行模式的时候需要考量的。假设可以支持自动推断,让系统自动推断也可能出现很多预期之外的行为。
> 
>Best regards,
> 
>Weijie
> 
> 
>junjie.m...@goupwith.com  于2022年10月28日周五 17:51写道:
> 
>>
>> 我是flink1.14.5
>>
>> EnvironmentSettings.fromConfiguration(ReadableConfig configuration) {
>> final Builder builder = new Builder();
>> switch (configuration.get(RUNTIME_MODE)) {
>> case STREAMING:
>> builder.inStreamingMode();
>> break;
>> case BATCH:
>> builder.inBatchMode();
>> break;
>> case AUTOMATIC:
>> default:
>> throw new TableException(
>> String.format(
>> "Unsupported mode '%s' for '%s'. "
>> + "Only an explicit BATCH or STREAMING
>> mode is supported in Table API.",
>> configuration.get(RUNTIME_MODE),
>> RUNTIME_MODE.key()));
>> }限制了不支持AUTOMATIC
>>
>>
>> 发件人: TonyChen
>> 发送时间: 2022-10-28 17:13
>> 收件人: user-zh
>> 主题: Re: 关于如何得到管道中哪些源是有界和无界的问题
>> 升个小版本,1.14.3就有AUTOMATIC
>>
>>
>> Best,
>> TonyChen
>>
>> > 2022年10月28日 17:09,junjie.m...@goupwith.com 写道:
>> >
>> > hi,weijie:
>> > 我使用的是flink1.14里是不支持设置execution.runtime-mode=AUTOMATIC的,会报如下错误:
>> > org.apache.flink.table.api.TableException: Unsupported mode 'AUTOMATIC'
>> for 'execution.runtime-mode'. Only an explicit BATCH or STREAMING mode is
>> supported in Table API.
>> >
>> > 是后续版本已经支持execution.runtime-mode=AUTOMATIC了吗?
>> >
>> >
>> > 发件人: weijie guo
>> > 发送时间: 2022-10-28 16:38
>> > 收件人: user-zh
>> > 主题: Re: Re: 关于如何得到管道中哪些源是有界和无界的问题
>> > 这种需求可以吧execution.runtime-mode设置成AUTOMATIC,
>> > 这种模式就是你的需求,如果所有的Source都是有界的,则使用Batch执行模式,否则使用Streaming
>> >
>> > Best regards,
>> >
>> > Weijie
>> >
>> >
>> > junjie.m...@goupwith.com  于2022年10月28日周五
>> 15:56写道:
>> >
>> >> hi, Weijie:
>> >>
>> >>
>> 由于我没有发现如何判断管道是有界还是无界的方法,特别是对于Table&SQL需要手动指定execution.runtime-mode=BATCH或STREAMING。
>> >>
>> >>
>> 我想通过得到所有的source的有界/无界来判断整个管道是有界/无界的,如果所有scnSource都是有界的则管道必定是有界管道,否则管道就是无界管道。
>> >>
>> >>
>> >> 发件人: weijie guo
>> >> 发送时间: 2022-10-28 15:44
>> >> 收件人: user-zh
>> >> 主题: Re: Re: 关于如何得到管道中哪些源是有界和无界的问题
>> >> Hi, junjie:
>> >>
>> >> 我想先了解一下你的目的是什么,为什么需要在Table API中判断Source的Boundness,这些信息对你的场景的帮助是什么?
>> >>
>> >> Best regards,
>> >>
>> >> Weijie
>> >>
>> >>
>> >> junjie.m...@goupwith.com  于2022年10月28日周五
>> >> 15:36写道:
>> >>
>> >>> public static DynamicTableSource
>> FactoryUtil.createTableSource(@Nullable
>> >>> Catalog catalog,ObjectIdentifier objectIdentifier, ResolvedCatalogTable
>> >>> catalogTable, ReadableConfig configuration, ClassLoader classLoader,
>> >>> boolean isTemporary)可以得到但是需要传入的参数太多了,这些参数不知道如何获取
>> >>>
>> >>> 发件人: junjie.m...@goupwith.com
>> >>> 发送时间: 2022-10-28 15:33
>> >>> 收件人: user-zh
>> >>> 主题: Re: Re: 关于如何得到管道中哪些源是有界和无界的问题
>> >>> 问题是如何拿到ScanTableSource呢?目前我没发现哪里可以得到
>> >>>
>> >>>
>> >>> 发件人: TonyChen
>> >>> 发送时间: 2022-10-28 15:21
>> >>> 收件人: user-zh
>> >>> 主题: Re: 关于如何得到管道中哪些源是有界和无界的问题
>> >>> 是不是可以看下这个
>> >>>
>> >>>
>> >>
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/ScanTableSource.java#L94
>> >>> Best,
>> >>> TonyChen
>>  2022年10月28日 15:12,junjie.m...@goupwith.com 写道:
>> 
>>  大家好:
>>    有个问题判断Table是有界还是无界,或者通过TableEnv如何能得到source table信息和其有界还是无界。
>> 
>> >>>
>> >>
>>
>>


Re: 关于LocalTransportException的优化方向咨询

2022-10-31 Thread weijie guo
1.all to all的边的话,你这个例子把并发降下来肯定连接数要少很多的。 slot
sharing的话也只会把A和B相同并发的share在一起,连接其他的subtask A还是要建立连接。
2.指的是作业jar,每个TM只会下载一次

Best regards,

Weijie


yidan zhao  于2022年10月31日周一 19:54写道:

> 嗯,问题1我主要是在想,这种复杂的连接关系,会不会增大Sending the partition request to '...'
> failed;这种异常的概率。
> 问题2,你提到的下载jar是指任务jar还是flink的jar。flink的jar不需要,因为我是standalone集群。
> 任务jar的话,这出现另外一个问题,如果一个TM分配到120*10=1200个task,那么任务jar不会分发这么多次吧。
>
> weijie guo  于2022年10月31日周一 12:54写道:
> >
> > 你好,请问使用的flink版本是多少?
> > 1.15的话TM间是有connection reuse的,默认TM间建立一个物理TCP连接。
> >
> 并发大了的话,你的TM只有一个slot,启动的TM会变多。task全变成running的状态变慢的因素也比较多:有些TM容器在的结点比较慢、下载jar包时间长、state
> > restore慢等
> >
> > Best regards,
> >
> > Weijie
> >
> >
> > yidan zhao  于2022年10月30日周日 11:36写道:
> >
> > > 如题,我生产集群频繁报 org.apache.flink.runtime.io
> > > .network.netty.exception.LocalTransportException
> > > 异常,具体异常cause有如下情况,按照出现频率从高到底列举。
> > > (1)Sending the partition request to '...' failed;
> > > org.apache.flink.runtime.io
> > > .network.netty.exception.LocalTransportException:
> > > Sending the partition request to '/10.35.215.18:2065 (#0)' failed.
> > > at org.apache.flink.runtime.io
> > >
> .network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:145)
> > > at org.apache.flink.runtime.io
> > >
> .network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:133)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:1017)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:878)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> > > at java.lang.Thread.run(Thread.java:748)
> > > Caused by:
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.StacklessClosedChannelException
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object,
> > > ChannelPromise)(Unknown Source)
> > > Caused by:
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
> > > writeAddress(..) failed: Connection timed out
> > >
> > > (2)readAddress(..) failed: Connection timed out
> > > org.apache.flink.runtime.io
> > > .network.netty.exception.LocalTransportException:
> > > readAddress(..) failed: Connection timed out (connection to
> > > '10.35.109.149/10.35.109.149:2094')
> > > at org.apache.flink.runtime.io
> > >
> .network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:168)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273)
> 

Re: Flink k8s operator高可用部署Flink Session Cluster,提交job遇到异常。

2022-10-31 Thread 汪赟
退订

发自我的 iPhone

> 在 2022年10月28日,11:41,Weihua Hu  写道:
> 
> Hi, Young
> 
> 你的分析是正确的。Flink kubernetes operator 是通过 rest service 来跟 Flink cluster
> 通信的,Kubernetes 会随机将发往 service 的请求路由到后端的多个 JM Pod
> 上。任务提交流程分为了:uploadJar,runJob,deleteJar 三个 API,所以会在 opeartor 的日志里看到相关的错误。
> 
> 也许你可以创建一个 jira issue 来跟进这个问题
> 
> Best,
> Weihua
> 
> 
>> On Thu, Oct 27, 2022 at 6:51 PM Young Chen  wrote:
>> 
>> 【问题描述】
>> 
>> Flink k8s operator(v1.1.0)高可用部署了一个Flink Session Cluster(两个JobManager),
>> 然后用SessionJob 部署一个例子job,job有时可以部署,有时部署不了。
>> 
>> 可以看到容器中如下error日志。
>> 
>> 
>> 
>> 【操作步骤】
>> 
>> 部署Cluster
>> 
>> 
>> 
>> apiVersion: flink.apache.org/v1beta1
>> 
>> kind: FlinkDeployment
>> 
>> metadata:
>> 
>>  name: flink-cluster-1jm-checkpoint
>> 
>> spec:
>> 
>>  image: flink:1.15
>> 
>>  flinkVersion: v1_15
>> 
>>  flinkConfiguration:
>> 
>>taskmanager.numberOfTaskSlots: "1"
>> 
>>state.savepoints.dir:
>> file:///flink-data/savepoints
>> 
>>state.checkpoints.dir:
>> file:///flink-data/checkpoints
>> 
>>high-availability:
>> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>> 
>>high-availability.storageDir:
>> file:///flink-data/ha
>> 
>>state.checkpoints.num-retained: "10"
>> 
>>  serviceAccount: flink
>> 
>>  ingress:
>> 
>>template: "{{name}}.{{namespace}}.k8s.rf.io"
>> 
>>  jobManager:
>> 
>>replicas: 2
>> 
>>  podTemplate:
>> 
>>spec:
>> 
>>  nodeSelector:
>> 
>>kubernetes.io/hostname: k8s17
>> 
>>  containers:
>> 
>>- name: flink-main-container
>> 
>>  volumeMounts:
>> 
>>- mountPath: /flink-data
>> 
>>  name: flink-volume
>> 
>>  volumes:
>> 
>>- name: flink-volume
>> 
>>  hostPath:
>> 
>># directory location on host
>> 
>>path: /tmp/flink
>> 
>># this field is optional
>> 
>>type: Directory
>> 
>> 
>> 
>> 部署job:
>> 
>> 
>> 
>> apiVersion: flink.apache.org/v1beta1
>> 
>> kind: FlinkSessionJob
>> 
>> metadata:
>> 
>>  name: flink-job-1jm-checkpoint
>> 
>> spec:
>> 
>>  deploymentName: flink-cluster-1jm-checkpoint
>> 
>>  job:
>> 
>>jarURI:
>> file:///opt/flink/examples/streaming/StateMachineExample.jar
>> # 自己打的operator镜像包含了examples的jar
>> 
>>entryClass:
>> org.apache.flink.streaming.examples.statemachine.StateMachineExample
>> 
>>parallelism: 1
>> 
>>upgradeMode: savepoint
>> 
>> 
>> 
>> 
>> 
>> 【相关日志】
>> 
>>  1.  job部署成功可以运行的一次,operator日志:
>> 
>> 2022-10-27 03:38:07,952 o.a.f.k.o.s.FlinkService
>> [ERROR][flink/flink-job-1jm-checkpoint] Failed to delete the jar:
>> 06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar.
>> 
>> java.util.concurrent.ExecutionException:
>> org.apache.flink.runtime.rest.util.RestClientException:
>> [org.apache.flink.runtime.rest.handler.RestHandlerException: File
>> 06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar does not exist
>> in /tmp/flink-web-e78590cd-656d-4f6f-a16a-9e4e994b44a6/flink-web-upload.
>> 
>> at
>> org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler.lambda$handleRequest$0(JarDeleteHandler.java:80)
>> 
>> at
>> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown
>> Source)
>> 
>> at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown
>> Source)
>> 
>> at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
>> 
>> at
>> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
>> Source)
>> 
>> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
>> Source)
>> 
>> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
>> Source)
>> 
>> at java.base/java.lang.Thread.run(Unknown Source)
>> 
>> ]
>> 
>> at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown
>> Source
>> 
>> 一个JobManager
>> Pod中没有这个/tmp/flink-web-e78590cd-656d-4f6f-a16a-9e4e994b44a6/flink-web-upload/06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar文件,而在另一个JM的Pod中,但这个JM应该不是Leader,因为看到刷出的checkpoint相关的日志在第一个JM中。
>> 
>> 
>> 
>> 
>> 
>>  1.  job部署失败operator日志:
>> 
>> 2022-10-27 10:12:09,749 i.j.o.p.e.ReconciliationDispatcher
>> [ERROR][flink/flink-job-1jm-checkpoint] Error during event processing
>> ExecutionScope{ resource id: ResourceID{name='flink-job-1jm-checkpoint',
>> namespace='flink'}, version: 120505701} failed.
>> 
>> org.apache.flink.kubernetes.operator.exception.ReconciliationException:
>> org.apache.flink.util.FlinkRuntimeException:
>> java.util.concurrent.ExecutionException:
>> org.apache.flink.runtime.rest.util.RestClientException: [Internal server
>> error., > 
>> java.util.concurrent.CompletionException:
>> org.apache.flink.runtime.rest.handler.RestHandlerException: Jar file
>> /tmp/flink-web-69209c8b-6ed5-45f2-aa99-4bc41efb7983/flink-web-upload/d7df9d81-2cfb-4642-a450-e9080a30db12_StateMachineExample.jar
>> does not exist
>> 
>> at
>> org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toPackagedProgram(JarH

【SQL Gateway - HiveServer2】UnsupportedOperationException: Unrecognized TGetInfoType value: CLI_ODBC_KEYWORDS.

2022-10-31 Thread QiZhu Chan
Hi team,
  I had starting the SQL Gateway with the HiveServer2 Endpoint, and then I 
submit SQL with Apache Hive Beeline, but I get the following exception:


java.lang.UnsupportedOperationException: Unrecognized TGetInfoType value: 
CLI_ODBC_KEYWORDS.
at 
org.apache.flink.table.endpoint.hive.HiveServer2Endpoint.GetInfo(HiveServer2Endpoint.java:371)
 [flink-sql-connector-hive-3.1.2_2.12-1.16.0.jar:1.16.0]
at 
org.apache.hive.service.rpc.thrift.TCLIService$Processor$GetInfo.getResult(TCLIService.java:1537)
 [flink-sql-connector-hive-3.1.2_2.12-1.16.0.jar:1.16.0]
at 
org.apache.hive.service.rpc.thrift.TCLIService$Processor$GetInfo.getResult(TCLIService.java:1522)
 [flink-sql-connector-hive-3.1.2_2.12-1.16.0.jar:1.16.0]
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39) 
[flink-sql-connector-hive-3.1.2_2.12-1.16.0.jar:1.16.0]
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39) 
[flink-sql-connector-hive-3.1.2_2.12-1.16.0.jar:1.16.0]
at 
org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
 [flink-sql-connector-hive-3.1.2_2.12-1.16.0.jar:1.16.0]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
[?:?]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
[?:?]
at java.lang.Thread.run(Thread.java:834) [?:?]
2022-11-01 13:55:33,885 ERROR org.apache.thrift.server.TThreadPoolServer
   [] - Thrift error occurred during processing of message.
org.apache.thrift.protocol.TProtocolException: Required field 'infoValue' is 
unset! Struct:TGetInfoResp(status:TStatus(statusCode:ERROR_STATUS, 
infoMessages:[*java.lang.UnsupportedOperationException:Unrecognized 
TGetInfoType value: CLI_ODBC_KEYWORDS.:9:8, 
org.apache.flink.table.endpoint.hive.HiveServer2Endpoint:GetInfo:HiveServer2Endpoint.java:371,
 
org.apache.hive.service.rpc.thrift.TCLIService$Processor$GetInfo:getResult:TCLIService.java:1537,
 
org.apache.hive.service.rpc.thrift.TCLIService$Processor$GetInfo:getResult:TCLIService.java:1522,
 org.apache.thrift.ProcessFunction:process:ProcessFunction.java:39, 
org.apache.thrift.TBaseProcessor:process:TBaseProcessor.java:39, 
org.apache.thrift.server.TThreadPoolServer$WorkerProcess:run:TThreadPoolServer.java:286,
 
java.util.concurrent.ThreadPoolExecutor:runWorker:ThreadPoolExecutor.java:1128, 
java.util.concurrent.ThreadPoolExecutor$Worker:run:ThreadPoolExecutor.java:628, 
java.lang.Thread:run:Thread.java:834], errorMessage:Unrecognized TGetInfoType 
value: CLI_ODBC_KEYWORDS.), infoValue:null)
at 
org.apache.hive.service.rpc.thrift.TGetInfoResp.validate(TGetInfoResp.java:379) 
~[flink-sql-connector-hive-3.1.2_2.12-1.16.0.jar:1.16.0]
at 
org.apache.hive.service.rpc.thrift.TCLIService$GetInfo_result.validate(TCLIService.java:5228)
 ~[flink-sql-connector-hive-3.1.2_2.12-1.16.0.jar:1.16.0]
at 
org.apache.hive.service.rpc.thrift.TCLIService$GetInfo_result$GetInfo_resultStandardScheme.write(TCLIService.java:5285)
 ~[flink-sql-connector-hive-3.1.2_2.12-1.16.0.jar:1.16.0]
at 
org.apache.hive.service.rpc.thrift.TCLIService$GetInfo_result$GetInfo_resultStandardScheme.write(TCLIService.java:5254)
 ~[flink-sql-connector-hive-3.1.2_2.12-1.16.0.jar:1.16.0]
at 
org.apache.hive.service.rpc.thrift.TCLIService$GetInfo_result.write(TCLIService.java:5205)
 ~[flink-sql-connector-hive-3.1.2_2.12-1.16.0.jar:1.16.0]
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:53) 
~[flink-sql-connector-hive-3.1.2_2.12-1.16.0.jar:1.16.0]
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39) 
~[flink-sql-connector-hive-3.1.2_2.12-1.16.0.jar:1.16.0]
at 
org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
 [flink-sql-connector-hive-3.1.2_2.12-1.16.0.jar:1.16.0]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
[?:?]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
[?:?]
at java.lang.Thread.run(Thread.java:834) [?:?]
2022-11-01 13:55:33,886 WARN  org.apache.thrift.transport.TIOStreamTransport
   [] - Error closing output stream.
java.net.SocketException: Socket closed
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113) ~[?:?]
at java.net.SocketOutputStream.write(SocketOutputStream.java:150) ~[?:?]
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81) ~[?:?]
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:142) ~[?:?]
at java.io.FilterOutputStream.close(FilterOutputStream.java:182) ~[?:?]
at 
org.apache.thrift.transport.TIOStreamTransport.close(TIOStreamTransport.java:110)
 [flink-sql-connector-hive-3.1.2_2.12-1.16.0.jar:1.16.0]
at org.apache.thrift.transport.TSocket.close(TSocket.java:235) 
[flink-sql-connector-hive-3.1.2_2.12-1.16.0.jar:1.16.0]
at 
org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:303)
 [flink-sql-connector-hive-3.1.2_2.12-1.16.0.jar:1.16.0]
at 
java.util.conc