Re: Error when remote-debugging Flink from IDEA in a local environment

2021-07-09 Thread r pp
Get the project to compile correctly first, then debug.
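For the remote-debugging setup itself, the usual approach (just a sketch, assuming
the default JDWP port 5005 and that you want to attach to a TaskManager) is to add
the debug agent to the Flink JVM options and attach IDEA to it:

# flink-conf.yaml on the cluster side (use env.java.opts.jobmanager to debug the JobManager instead)
env.java.opts.taskmanager: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"

Then create a "Remote JVM Debug" run configuration in IDEA pointing at that host and port 5005.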

tangzhi8...@gmail.com wrote on Mon, Jun 28, 2021 at 3:02 PM:

> Goal: remote-debug Flink from IDEA in my local environment
> Steps:
> 1. This is the Debug run configuration
> 2. Error stack trace:
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Failed to execute job 'Streaming WordCount'.
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:374)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:120)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:817)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:249)
> at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1060)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1148)
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1148)
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job
> 'Streaming WordCount'.
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1984)
> at
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
> at
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1845)
> at
> org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:97)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:357)
> ... 8 more
> Caused by: java.lang.RuntimeException: Error while waiting for job to be
> initialized
> at
> org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:166)
> at
> org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$2(AbstractSessionClusterExecutor.java:82)
> at
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73)
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> at
> java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> at
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> at
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
> complete the operation. Number of retries has been exhausted.
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at
> org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$null$0(AbstractSessionClusterExecutor.java:83)
> at
> org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:146)
> ... 9 more
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException:
> Could not complete the operation. Number of retries has been exhausted.
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
> at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.CompletionException:
> org.apache.flink.runtime.rest.util.RestClientException: Response was
> neither of the expected type([simple type, class
> org.apache.flink.runtime.rest.messages.job.JobDetailsInfo]) nor an error.

Re: Running Flink Dataset jobs Sequentially

2021-07-09 Thread Ken Krugler
FWIW I had to do something similar in the past. My solution was to…

1. Create a custom reader that added the source directory to the input data (so
I had a Tuple2 of source directory and line).
2. Create a job that reads from all source directories, using HadoopInputFormat
for text.
3. Constrain the parallelism of this initial part of the job, to avoid
overwhelming S3 with parallel downloads.
4. Do a partition on the source directory
5. Write a custom mapPartition function that opens/writes to output files that 
are created with names based on the source directory.
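
If it helps, below is a rough sketch of steps 4 and 5 (a sketch only — the output
naming scheme and the use of Flink's FileSystem API are placeholders, not my exact code):

import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Collector;

// deduped: DataSet<Tuple2<String, String>> of (source directory, line), deduplicated upstream
deduped
    // step 4: route all records of one source directory to the same partition
    .partitionByHash(0)
    // step 5: one writer per source directory, output path derived from it
    .mapPartition(new RichMapPartitionFunction<Tuple2<String, String>, Void>() {
        @Override
        public void mapPartition(Iterable<Tuple2<String, String>> records, Collector<Void> out) throws Exception {
            Map<String, BufferedWriter> writers = new HashMap<>();
            for (Tuple2<String, String> record : records) {
                String target = record.f0 + "/deduped/part-0"; // placeholder naming scheme
                BufferedWriter writer = writers.get(target);
                if (writer == null) {
                    FileSystem fs = FileSystem.get(URI.create(target));
                    writer = new BufferedWriter(new OutputStreamWriter(
                        fs.create(new Path(target), FileSystem.WriteMode.OVERWRITE),
                        StandardCharsets.UTF_8));
                    writers.put(target, writer);
                }
                writer.write(record.f1);
                writer.newLine();
            }
            for (BufferedWriter writer : writers.values()) {
                writer.close();
            }
        }
    })
    .output(new DiscardingOutputFormat<>());

The nice property is that after the hash partition on the source directory, each
parallel instance only sees a few distinct directories, so it only keeps a handful
of writers open at a time.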

— Ken

> On Jul 8, 2021, at 3:19 PM, Jason Liu  wrote:
> 
> Hi all,
> 
> We currently have a use case of running a given DataSet API job for a
> given S3 directory to dedup data and output to a new directory. We need to
> run this job for roughly 1,000 S3 folders. I attempted to set up the Flink
> executions so they run sequentially, like this:
> 
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> 
> Configuration parameters = new Configuration();
> parameters.setBoolean("recursive.file.enumeration", true);
> 
> for (final String inputDirectory : directoryList) {
>   String inputPath = inputDirectory;
>   String outputPath = getOutputPath(inputPath);
> 
>   log.warn("using input path [{}] and output path [{}]", inputPath, 
> outputPath);
> 
>   DataSet<String> lines =
> env.readTextFile(inputPath).withParameters(parameters);
>   DataSet<String> deduped = lines.distinct(new GetKey());
>   deduped.writeAsText(outputPath, FileSystem.WriteMode.OVERWRITE);
> }
> env.execute();
> However, when I submit this job to the cluster, it generates a graph like 
> this 
> 
> And it seems Flink is running them in parallel. Is there a way to tell Flink
> to run them sequentially? I tried moving the execution environment inside the
> loop, but it seems like it only runs the job on the first directory. I'm
> running this on AWS Kinesis Data Analytics, so it's a bit hard for me to
> submit new jobs.
> 
> Wondering if there's any way I can accomplish this?
> 
> Thanks,
> Jason
> 

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: Subpar performance of temporal joins with RocksDB backend

2021-07-09 Thread Maciej Bryński
Hi Adrian,
Could you share your state backend configuration?
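For reference, something along these lines is what I mean (a sketch only, assuming
the 1.13 unified state backend API and placeholder paths) — in particular whether
incremental checkpoints and managed memory are enabled:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// EmbeddedRocksDBStateBackend(true) enables incremental checkpoints
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
env.getCheckpointConfig().setCheckpointStorage("s3://bucket/checkpoints"); // placeholder path
env.enableCheckpointing(60_000);
// RocksDB uses Flink managed memory by default (state.backend.rocksdb.memory.managed: true)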

Regards,
Maciek

On Fri, Jul 9, 2021 at 19:09 Adrian Bednarz  wrote:
>
> Hello,
>
> We are experimenting with lookup joins in Flink 1.13.0. Unfortunately, we 
> unexpectedly hit significant performance degradation when changing the state 
> backend to RocksDB.
>
> We performed tests with two tables: fact table TXN and dimension table 
> CUSTOMER with the following schemas:
>
> TXN:
>  |-- PROD_ID: BIGINT
>  |-- CUST_ID: BIGINT
>  |-- TYPE: BIGINT
>  |-- AMOUNT: BIGINT
>  |-- ITEMS: BIGINT
>  |-- TS: TIMESTAMP(3) **rowtime**
>  |-- WATERMARK FOR TS: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS
>
> CUSTOMER:
>  |-- ID: BIGINT
>  |-- STATE: BIGINT
>  |-- AGE: BIGINT
>  |-- SCORE: DOUBLE
>  |-- PRIMARY KEY: ID
>
> And the following query:
> select state, sum(amount) from txn t JOIN customer FOR SYSTEM_TIME AS OF t.ts 
> ON t.cust_id = customer.id group by state, TUMBLE(t.ts, INTERVAL '1' SECOND)
>
> In our catalog, we reconfigured the customer table so that the watermark is
> set to infinity on that side of the join. We generate data in a round-robin
> fashion (except for the timestamp, which grows in steps of 1 ms).
>
> We performed our experiments on a single c5.4xlarge machine with heap and 
> managed memory size set to 12gb with a blackhole sink. With 2 000 000 fact 
> records and 100 000 dimension records, a job with heap backend finishes in 5 
> seconds whereas RocksDB executes in 1h 24m. For 400 000 dimension records it 
> doesn't grow significantly but goes up to 1h 36m (the job processes more 
> records after all).
>
> We also checked what would happen if we reduced the number of customer ids to
> 1. Our expectation was that RocksDB would not offload anything to disk anymore,
> so the performance should be comparable with the heap backend. It was executed
> in 10 minutes.
>
> Is this something anybody has experienced, or is it to be expected? Of course,
> we expected RocksDB to be slower, but 300 events/sec is below our expectations.
>
> Thanks,
> Adrian



-- 
Maciek Bryński


Re: Are checkpoints not generated in local run mode?

2021-07-09 Thread Yun Tang
Hi

As long as checkpointing is enabled, checkpoints will be generated; this has nothing to do with the run mode. You can check the logs to see whether the JobManager is triggering checkpoints normally.
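
For example, a minimal sketch to verify this locally (assuming a 10-second interval;
the Table/SQL equivalent is shown in the comment):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000); // trigger a checkpoint every 10 seconds
// for a Table/SQL job the equivalent setting is:
// tableEnv.getConfig().getConfiguration()
//         .setString("execution.checkpointing.interval", "10 s");

The JobManager log should then contain "Triggering checkpoint ..." lines.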

Best,
Yun Tang

From: casel.chen 
Sent: Tuesday, June 29, 2021 9:55
To: user-zh@flink.apache.org 
Subject: Are checkpoints not generated in local run mode?

I am running a Flink SQL job locally in local run mode, writing data from Kafka to MongoDB. The MongoDB
connector was developed by myself and implements the CheckpointedFunction interface. While debugging, I found that the invoke method is called when data comes in, but initializeState and snapshotState are never called, even though I have enabled checkpointing. When the same program is deployed on Kubernetes, snapshotState is called. My question is: are checkpoints not generated in local run mode?


Subpar performance of temporal joins with RocksDB backend

2021-07-09 Thread Adrian Bednarz
Hello,

We are experimenting with lookup joins in Flink 1.13.0. Unfortunately, we
unexpectedly hit significant performance degradation when changing the
state backend to RocksDB.

We performed tests with two tables: fact table TXN and dimension table
CUSTOMER with the following schemas:

TXN:
 |-- PROD_ID: BIGINT
 |-- CUST_ID: BIGINT
 |-- TYPE: BIGINT
 |-- AMOUNT: BIGINT
 |-- ITEMS: BIGINT
 |-- TS: TIMESTAMP(3) **rowtime**
 |-- WATERMARK FOR TS: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS

CUSTOMER:
 |-- ID: BIGINT
 |-- STATE: BIGINT
 |-- AGE: BIGINT
 |-- SCORE: DOUBLE
 |-- PRIMARY KEY: ID

And the following query:
select state, sum(amount) from txn t JOIN customer FOR SYSTEM_TIME AS OF
t.ts ON t.cust_id = customer.id group by state, TUMBLE(t.ts, INTERVAL '1'
SECOND)

In our catalog, we reconfigured the customer table so that the watermark is
set to infinity on that side of the join. We generate data in a round-robin
fashion (except for the timestamp, which grows in steps of 1 ms).

We performed our experiments on a single c5.4xlarge machine with heap and
managed memory size set to 12gb with a blackhole sink. With 2 000 000 fact
records and 100 000 dimension records, a job with heap backend finishes in
5 seconds whereas RocksDB executes in 1h 24m. For 400 000 dimension records
it doesn't grow significantly but goes up to 1h 36m (the job processes more
records after all).

We also checked what would happen if we reduced the number of customer ids
to 1. Our expectation was that RocksDB would not offload anything to disk
anymore, so the performance should be comparable with the heap backend. It was
executed in 10 minutes.

Is this something anybody has experienced, or is it to be expected? Of
course, we expected RocksDB to be slower, but 300 events/sec is below our
expectations.

Thanks,
Adrian


How can the logging configuration be changed dynamically for Flink on native Kubernetes?

2021-07-09 Thread casel.chen
Flink is running on native Kubernetes. I would like to change the root logger level at runtime, dynamically add Logger Name -> Logger
Level entries, and let users pass in a custom logging template. Is there currently any way to do this?

Re: How to convert flat JSON to nested complex JSON in Flink SQL?

2021-07-09 Thread Caizhi Weng
Hi!

You can define your sink with the following schema:

CREATE TABLE kafka_sink (
  employee ROW<id STRING, name STRING>
) WITH (
  'connector' = 'kafka',
  'format' = 'json'
  -- other properties...
);

You can also insert into this sink with the following SQL:

INSERT INTO kafka_sink SELECT ROW(id, name) FROM kafka_source;

1095193...@qq.com <1095193...@qq.com> wrote on Fri, Jul 9, 2021 at 7:08 PM:

> Hi community,
> I receive JSON messages from Kafka, convert the flat JSON to nested JSON, and
> send it back to Kafka.
> message received from Kafka: {"id":"001","name":"wang"}
> message sent back to Kafka:  {"employee":{"id":"001","name":"wang"}}
> How can I do this in Flink SQL?
>
> --
> 1095193...@qq.com
>


How to convert flat JSON to nested complex JSON in Flink SQL?

2021-07-09 Thread 1095193...@qq.com
Hi community,
I receive JSON messages from Kafka, convert the flat JSON to nested JSON, and send
it back to Kafka.
message received from Kafka: {"id":"001","name":"wang"}
message sent back to Kafka:  {"employee":{"id":"001","name":"wang"}}
How can I do this in Flink SQL?



1095193...@qq.com


Kafka Consumer Retries Failing

2021-07-09 Thread Rahul Patwari
Hi,

We have a Flink 1.11.1 streaming pipeline in production which reads
from Kafka.
Kafka Server version is 2.5.0 - confluent 5.5.0
Kafka Client Version is 2.4.1 -
{"component":"org.apache.kafka.common.utils.AppInfoParser$AppInfo","message":"Kafka
version: 2.4.1","method":""}

Occasionally (every 6 to 12 hours), we have observed that the Kafka
consumption rate went down (NOT to 0) and the logs below were observed.
Generally, the consumption rate across all consumers is 4k records/sec;
when this issue occurred, the consumption rate dropped to < 50 records/sec.

org.apache.kafka.common.errors.DisconnectException: null

{"time":"2021-07-07T22:13:37,385","severity":"INFO","component":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer
clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3,
groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Error sending
fetch request (sessionId=405798138, epoch=5808) to node 8:
{}.","method":"handleError"}

org.apache.kafka.common.errors.TimeoutException: Failed

{"time":"2021-07-07T22:26:41,379","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"[Consumer
clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3,
groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Group coordinator
100.98.40.16:9092 (id: 2147483623 rack: null) is unavailable or invalid,
will attempt rediscovery","method":"markCoordinatorUnknown"}

{"time":"2021-07-07T22:27:10,465","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler","message":"[Consumer
clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3,
groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Discovered group
coordinator 100.98.40.16:9092 (id: 2147483623 rack:
null)","method":"onSuccess"}

The consumers retried for more than an hour but the above logs were observed
again.
The consumers started pulling data after a manual restart.

No WARN or ERROR logs were observed in Kafka or Zookeeper during this
period.

Our observation from this incident is that Kafka Consumer retries could not
resolve the issue, but a manual restart (or a Flink internal restart via the
failure-rate restart policy) does.

Has anyone faced this issue before? Any pointers are appreciated.

Regards,
Rahul


User-defined function does not receive its parameters correctly

2021-07-09 Thread Chenzhiyuan(HR)

I defined a table sourced from Kafka and call a user-defined function in a SQL query, but I found that the parameters are not passed correctly to the UDF's eval method.
I am using Flink 1.10.0.


The DDL of the JSON-formatted table is as follows:
private static final String personKafkaTable = "CREATE TABLE hw_person_normal_t(\n"
+ " data ARRAY<ROW<...>>,\n"
+ " key STRING,\n"
+ " operation STRING\n"
+ ") with (\n"
+ "'connector.type' = 'kafka', \n"
+ "'connector.version' = 'universal',\n"
+ "'connector.topic' = 'HR_SALARY_FLINK_TEST',\n"
+ "'connector.properties.zookeeper.connect' = 'xxx',\n"
+ "'connector.properties.bootstrap.servers' = 'xxx',\n"
+ " 'connector.properties.group.id' = 'salaryGroup',\n"
+ " 'format.type' = 'json'\n"
+ ")";


The user-defined function is called in the SQL query as follows:

Table tempTable = tEnv.sqlQuery("select data from hw_person_normal_t")
    .joinLateral("ParserJsonFunc(data) as (personNormalId, uuId, lastOrgId, lastDepartmentCode, operationType)")
    .select("personNormalId, uuId, lastOrgId, lastDepartmentCode, operationType");


While debugging, I found that the value parameter passed into the UDF's eval method contains 7 records, but every one of them is empty.


The user-defined function is as follows:

public class ParserJsonPersonNormalFunc extends TableFunction<Row> {

    private static final Logger log =
        LoggerFactory.getLogger(ParserJsonPersonNormalFunc.class);

    public void eval(Row[] value) {
        try {
            log.info("eval start");
            collector.collect(Row.of(value));
        } catch (Exception e) {
            log.error("parser json failed :", e);
        }
    }

    @Override
    public TypeInformation<Row> getResultType() {
        return Types.ROW(Types.STRING, Types.STRING, Types.STRING,
            Types.STRING, Types.STRING);
    }
}







The function is registered in the code:

tEnv.sqlUpdate(personKafkaTable);
tEnv.registerFunction("ParserJsonFunc", new ParserJsonPersonNormalFunc());





The message body format is as follows:

{
    "beforeData": [],
    "byteSize": 272,
    "columnNumber": 32,
    "data": [{
        "byteSize": 8, "columnName": "APPLY_PERSON_ID", "rawData": 10017, "type": "LONG"
    }, {
        "byteSize": 12, "columnName": "UPDATE_SALARY", "rawData": "11000.00", "type": "DOUBLE"
    }, {
        "byteSize": 11, "columnName": "UP_AMOUNT", "rawData": "1000.00", "type": "DOUBLE"
    }, {
        "byteSize": 3, "columnName": "CURRENCY", "rawData": "CNY", "type": "STRING"
    }, {
        "byteSize": 32, "columnName": "EXCHANGE_RATE", "rawData": "1.00", "type": "DOUBLE"
    }, {
        "byteSize": 11, "columnName": "DEDUCTED_ACCOUNT", "rawData": "1000.00", "type": "DOUBLE"
    }, {
        "byteSize": 1, "columnName": "ENTER_AT_PROCESS", "rawData": "Y", "type": "STRING"
    }],
    "dataCount": 0,
    "dataMetaData": {
        "connector": "mysql",
        "pos": 1000368076,
        "row": 0,
        "ts_ms": 1625565737000,
        "snapshot": "false",
        "db": "testdb",
        "table": "flow_person_t"
    },
    "key": "APPLY_PERSON_ID",
    "memorySize": 1120,
    "operation": "insert",
    "rowIndex": -1,
    "timestamp": "1970-01-01 00:00:00"
}



Re: Job Recovery Time on TM Lost

2021-07-09 Thread Till Rohrmann
Gen is right with his explanation why the dead TM discovery can be faster
with Flink < 1.12.

Concerning flaky TaskManager connections:

2.1 I think the problem is that the receiving TM does not know the
container ID of the sending TM. It only knows its address. But this is
something one could improve by sending this information along with the
connection information. This could improve debugging.
2.3 The idea would be to establish a reconnection mechanism between the
TMs. This hasn't been done yet, though. The non-trivial part is probably
that we need to introduce an ack protocol and keep a window of sent events
on the sender side in order to resend events in case of lost packets.
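
Just to illustrate that last point (a toy sketch of the idea, not Flink code): the
sender would keep every event it has sent until the receiver acknowledges it, and
replay the unacknowledged tail after a reconnect.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Toy sketch: sender-side window of unacknowledged events.
class ResendWindow<E> {
    private final ArrayDeque<E> unacked = new ArrayDeque<>();
    private long nextSeqNo = 0;          // sequence number assigned to the next sent event
    private long firstBufferedSeqNo = 0; // sequence number of the oldest buffered event

    // Record an event that has just been sent; returns its sequence number.
    synchronized long onSend(E event) {
        unacked.addLast(event);
        return nextSeqNo++;
    }

    // The receiver acknowledged everything up to and including ackedSeqNo: drop it.
    synchronized void onAck(long ackedSeqNo) {
        while (firstBufferedSeqNo <= ackedSeqNo && !unacked.isEmpty()) {
            unacked.removeFirst();
            firstBufferedSeqNo++;
        }
    }

    // After a reconnect, everything still buffered must be sent again, in order.
    synchronized List<E> eventsToResend() {
        return new ArrayList<>(unacked);
    }
}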

Cheers,
Till

On Fri, Jul 9, 2021 at 4:38 AM Gen Luo  wrote:

> @刘建刚
> Welcome to the discussion and thanks for sharing your experience.
>
> I have a minor question. In my experience, network failures in a certain
> cluster usually take some time to recover from, which can be measured (e.g.
> as a p99) to guide the configuration. So I suppose it would be better to use
> time rather than an attempt count as the configuration for confirming TM
> liveness. What do you think about this? And is the premise right according
> to your experience?
>
> @Lu Niu 
> > Does that mean the akka timeout situation we talked above doesn't apply
> to flink 1.11?
>
> I suppose it's true. According to the reply from Till in FLINK-23216
> , it should be
> confirmed that the problem is introduced by declarative resource
> management, which is introduced to Flink in 1.12.
>
> In previous versions, although the JM still uses heartbeats to check TM
> status, the RM will tell the JM about a lost TM as soon as it is noticed by
> Yarn. This is much faster than the JM's heartbeat mechanism if one uses the
> default heartbeat configuration. However, since 1.12, with declarative
> resource management, the RM no longer tells this to the JM, since it doesn't
> have a related AllocationID. So the heartbeat mechanism becomes the only way
> the JM can learn about a lost TM.
>
> On Fri, Jul 9, 2021 at 6:34 AM Lu Niu  wrote:
>
>> Thanks everyone! This is a great discussion!
>>
>> 1. Restarting takes 30s when throwing exceptions from application code
>> because the restart delay is 30s in config. Previously lots of related configs
>> were 30s, which led to the confusion. I redid the test with this config:
>>
>> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647,
>> backoffTimeMS=1000)
>> heartbeat.timeout: 50
>> akka.ask.timeout 30 s
>> akka.lookup.timeout 30 s
>> akka.tcp.timeout 30 s
>> akka.watch.heartbeat.interval 30 s
>> akka.watch.heartbeat.pause 120 s
>>
>>    Now Phase 1 drops to 2s and Phase 2 takes 13s. The whole
>> restart takes 14s. Does that mean the akka timeout situation we talked
>> about above doesn't apply to Flink 1.11?
>>
>> 2. About flaky connections between TMs, we did sometimes notice exceptions
>> such as the following:
>> ```
>> TaskFoo switched from RUNNING to FAILED on
>> container_e02_1599158147594_156068_01_38 @
>> xenon-prod-001-20200316-data-slave-prod-0a0201a4.ec2.pin220.com
>> (dataPort=40957).
>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>> Connection unexpectedly closed by remote task manager '
>> xenon-prod-001-20200316-data-slave-prod-0a020f7a.ec2.pin220.com/10.2.15.122:33539'.
>> This might indicate that the remote task manager was lost.
>> at
>> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:144)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
>> at
>> org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:97)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1416)
>> at
>>