Re: flink dataStream多次sink DAG重复驱动执行?
个人理解是不会重复驱动执行的,具体你可以测试一下,但是从底层原理上讲,我也讲不了。 发件人: lp 发送时间: 2021-03-05 17:31 收件人: user-zh 主题: flink dataStream多次sink DAG重复驱动执行? 有个疑问, 如下程序片段: -- Properties properties = new Properties(); properties.setProperty("bootstrap.servers",kafkaAddr); properties.setProperty("group.id",kafkaOdsGroup); properties.setProperty("auto.offset.reset",kafkaOdsAutoOffsetReset); properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,kafkaOdsPartitionDiscoverInterval); properties.setProperty("transaction.timeout.ms",KafkaOdsTransactionTimeout);//kafka事务超时时间 FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer<>(kafkaOdsTopic,new SimpleStringSchema(),properties); DataStreamSource dataStreamSource = env.addSource(flinkKafkaConsumer); dataStreamSource.printToErr("1"); dataStreamSource.printToErr("2"); dataStreamSource.printToErr("3"); 我对一个datastream进行多次相同操作的sink,请问是否会导致上游整个DAG重复驱动执行,基于spark的惯性思维,我认为上游DAG是会重复驱动执行的? -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 退订
Hi, 退订请发邮件到 user-zh-unsubscr...@flink.apache.org 详细的可以参考 [1] [1] https://flink.apache.org/zh/community.html#section-1 发件人: zenglong chen 发送时间: 2021-03-08 10:00 收件人: user-zh 主题: 退订 退订
Re: Re: 社区有人实现过Flink的MongodbSource吗?
好的,十分感谢,我调研一下,之前网上搜了一些资料,实现的只能批量读取,读完程序就停止了,不能一直实时的增量读取 发件人: Paul Lam 发送时间: 2021-02-24 17:03 收件人: user-zh 主题: Re: 社区有人实现过Flink的MongodbSource吗? Hi, Debezium 支持 MongoDB CDC[1],可以了解下。 [1] https://debezium.io/documentation/reference/connectors/mongodb.html Best, Paul Lam > 2021年2月24日 16:23,Evan 写道: > > > 有人完整的实现Flink的MongodbSource吗 > 如题,现在有一个需求,需要Flink能实时读取MongoDB中数据变化 > >
社区有人实现过Flink的MongodbSource吗?
有人完整的实现Flink的MongodbSource吗 如题,现在有一个需求,需要Flink能实时读取MongoDB中数据变化
????: Pyflink????kerberos??????Kafka??????????
?? ?? ?? ?? 2021-01-30 17:53 user-zh ?? Pyflinkkerberos??Kafka?? ?? ??pyflinkkafka?? flink-conf.yamlkerberos jaas.conf?? ??
Re: Flink SQL kafka connector有办法获取到partition、offset信息嘛?
你好,可以获取 CREATE TABLE KafkaTable ( `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', `partition` BIGINT METADATA VIRTUAL, `offset` BIGINT METADATA VIRTUAL, `user_id` BIGINT, `item_id` BIGINT, `behavior` STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ); 可以查阅官网得到你想要的信息: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kafka.html#available-metadata 希望能帮助到你。 发件人: gimlee 发送时间: 2021-01-21 11:20 收件人: user-zh 主题: Flink SQL kafka connector有办法获取到partition、offset信息嘛? 如题,需要获取到kafka的partition、offset进行处理 -- Sent from: http://apache-flink.147419.n8.nabble.com/
????: flink sql hop????????udaf????????????merge??????????????????????
?? ?? bigdata ?? 2021-01-18 14:52 user-zh ?? flink sql hopudafmerge?? ?? flink1.10.1 sql??hop??udaf??merge??merge?? org.apache.flink.table.planner.codegen.CodeGenException: No matching merge method found for AggregateFunction com.autoai.cns.udaf.PercentileUDAF' merge ?? 1??ImperativeAggCodeGencheckNeededMethods??if (needMerge) getUserDefinedMethod 2??UserDefinedFunctionUtilsgetUserDefinedMethod??merge??mergefalse,??flink??bug parameterClassEquals(methodSignature(i), clazz) || parameterDataTypeEquals(internalTypes(i), dataTypes(i)) ?? def merge(accumulator: ListBuffer[Float], its: Iterable[ListBuffer[Float]]): Unit = { its.foreach(i => accumulator ++ i) }
????: flink sql hop????udaf????
merge??marge Evan Cheng 2021??1??18??09:00:07 bigdata ?? 2021-01-17 22:31 user-zh ?? flink sql hopudaf ?? flink1.10.1sql hop??udafmarge?? org.apache.flink.table.api.ValidationException: Function class 'com.autoai.cns.udaf.PercentileUDAF' does not implement at least one method named 'merge' which is public, not abstract and (in case of table functions) not static UDAF?? class PercentileUDAF extends AggregateFunction[String, ListBuffer[Float]]{ // val percentile1 = 0.5 val percentile2 = 0.75 val percentile3 = 0.98 val percentile4 = 0.99 /** * * @param accumulator ?? * @return ?? */ override def getValue(accumulator: ListBuffer[Float]): String = { // val length = accumulator.size var i1 = Math.round(length*percentile1).toInt if(i1==0) i1 = 1 var i2 = Math.round(length*percentile2).toInt if(i2==0) i2 = 1 var i3 = Math.round(length*percentile3).toInt if(i3==0) i3 = 1 var i4 = Math.round(length*percentile4).toInt if(i4==0) i4 = 1 val seq = accumulator.sorted // seq(i1-1).toInt+","+seq(i2-1).toInt+","+seq(i3-1).toInt+","+seq(i4-1).toInt } override def createAccumulator(): ListBuffer[Float] = new ListBuffer[Float]() def accumulate(accumulator: ListBuffer[Float], i: Float): Unit = { accumulator.append(i) } def marge(accumulator: ListBuffer[Float], its: Iterable[ListBuffer[Float]]): Unit = { its.foreach(i => accumulator ++ i) }
Re: 回复: flink sql读kafka元数据问题
我知道 酷酷同学 啥意思,kafka 消息是key,value形式,当然这个key默认是null,他想在 select 语句里将 kafka的key值读出来对吧。 我也在文档里找了,确实是没有相关文档说明 发件人: 酷酷的浑蛋 发送时间: 2021-01-15 16:35 收件人: user-zh@flink.apache.org 主题: 回复: flink sql读kafka元数据问题 直接读topic,headers是空,我仅仅是想读key,不管topic是谁写入的 在2021年01月14日 16:03,酷酷的浑蛋 写道: 你意思是说,topic不是flink写入的,用flink sql就不能读到key? 在2021年01月13日 15:18,JasonLee<17610775...@163.com> 写道: hi 你写入数据的时候设置 headers 了吗 没设置的话当然是空的了 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink1.11使用createStatementSet报错 No operators defined in streaming topology
我的也是flink 1.11.0版本的,也是使用的stmtSet.execute()方式,是可以正常运行的,你可以debug检查一下你要执行的SQL语句 发件人: datayangl 发送时间: 2021-01-14 16:13 收件人: user-zh 主题: flink1.11使用createStatementSet报错 No operators defined in streaming topology flink版本: 1.11 使用createStatementSet 在一个人任务中完成多个数据从hive写入不同的kafka 代码如下: def main(args: Array[String]): Unit = { FlinkUtils.initTable() val tableEnv: StreamTableEnvironment = FlinkUtils.tableEnv val streamEnv: StreamExecutionEnvironment = FlinkUtils.streamEnv streamEnv.disableOperatorChaining() streamEnv.setParallelism(1) streamEnv.setMaxParallelism(1) CheckPointUtils.setCheckPoint(streamEnv, 12, 6) dealWithOdsDataTohive(tableEnv) val sqls:Map[String,String] = ConfigItem.ODS_SQL val ODS_TOPIC_SWITCH_ON = ConfigItem.APP_SOURCES.getOrElse("ODS2HIVE", null).map(x => DictClass.logTypeAndTopic.getOrElse(x, "")).toSet val filledAllSqlsTable = sqls.map(x=>{ val hiveMapTopic = hiveTableMapTopic val topicName = hiveMapTopic.getOrElse(x._1,null) val topic = if(ODS_TOPIC_SWITCH_ON.contains(topicName)) topicName else null (x._1,topic,x._2) }).filter(x=>StringUtils.isNotEmpty(x._2)).map(x=>{ val sql = fillTemplate(x._1,x._2,x._3) tableEnv.executeSql(sql) x._1 }) HiveUtils.initHiveCatalog("tsgz","catalogName", tableEnv) val stmtSet = tableEnv.createStatementSet() val allInsertSqls = filledAllSqlsTable.map(table=>{ s"insert into tsgz.${table} select * from default_catalog.default_database.${table}" }).toList allInsertSqls.foreach(x=>{ stmtSet.addInsertSql(x) }) val insertTaskStatus = stmtSet.execute() //insertTaskStatus.print() println(insertTaskStatus.getJobClient.get().getJobStatus()) } /** * 填充kafka sql映射表的模板内容 * */ def fillTemplate(tableName:String, topicName:String, fields:String)={ val kafkaHost = ConfigItem.KAFKA_BOOTSTRAP_SERVERS val filled = s"create table ${tableName} (${fields}) with ('connector' = 'kafka','topic' = '${topicName}','properties.bootstrap.servers' = '${kafkaHost}','properties.group.id' = 'OdsDataToHive1','format' = 'json','scan.startup.mode' = 'latest-offset')" filled } 执行后报错 Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph. at org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47) at org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:703) at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:97) at com.etl.chaitin.main.OdsDataToHive$.dealWithOdsDataTohive(OdsDataToHive.scala:54) at com.etl.chaitin.main.OdsDataToHive$.main(OdsDataToHive.scala:21) at com.etl.chaitin.main.OdsDataToHive.main(OdsDataToHive.scala) 报错位置为 val insertTaskStatus = stmtSet.execute() 这一行。 参考资料:https://www.bookstack.cn/read/flink-1.11.1-zh/dc487098ce87ed44.md -- Sent from: http://apache-flink.147419.n8.nabble.com/
回复: 回复: 请教个Flink checkpoint的问题
是的,应该是机制问题,链接[1]打开有这样一句解释: If you choose to retain externalized checkpoints on cancellation you have to handle checkpoint clean up manually when you cancel the job as well (terminating with job status JobStatus#CANCELED). [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/config.html#execution-checkpointing-externalized-checkpoint-retention 如回答有误,请指正。 发件人: yinghua...@163.com 发送时间: 2021-01-14 18:02 收件人: user-zh 主题: 回复: 回复: 请教个Flink checkpoint的问题 代码如下: streamEnv.enableCheckpointing(5 * 60 * 1000); CheckpointConfig checkPointConfig = streamEnv.getCheckpointConfig(); checkPointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); checkPointConfig.setCheckpointTimeout(1 * 60 * 1000); checkPointConfig.setMinPauseBetweenCheckpoints((1 * 30 * 1000)); checkPointConfig.setMaxConcurrentCheckpoints(1); checkPointConfig.setTolerableCheckpointFailureNumber(3); checkPointConfig .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); String checkpointPath = this.execOptions.get(CHECKPOINT_PATH); try { StateBackend rocksDBStateBackend = new RocksDBStateBackend(checkpointPath); streamEnv.setStateBackend(rocksDBStateBackend); yinghua...@163.com 发件人: Evan 发送时间: 2021-01-14 17:55 收件人: user-zh 主题: 回复: 请教个Flink checkpoint的问题 代码图挂掉了,看不到代码 发件人: yinghua...@163.com 发送时间: 2021-01-14 17:26 收件人: user-zh 主题: 请教个Flink checkpoint的问题 我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下 现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息? yinghua...@163.com
回复: 请教个Flink checkpoint的问题
代码图挂掉了,看不到代码 发件人: yinghua...@163.com 发送时间: 2021-01-14 17:26 收件人: user-zh 主题: 请教个Flink checkpoint的问题 我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下 现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息? yinghua...@163.com
Re: Re: Flink webui 查询任务信息报错500
你可以从akka的原理架构中获取一些信息 参考: https://akka.io 之前在其他博主的介绍中看到过,感觉意思差不多,上面说这是JobManager和TaskManager之间通信时发送的消息大小的最大值 发件人: 赵一旦 发送时间: 2021-01-14 14:19 收件人: user-zh 主题: Re: Flink webui 查询任务信息报错500 好的,我找到了这个参数。不过这个参数表达啥含义知道吗,我看10MB不是个小数字感觉。 Evan 于2021年1月14日周四 下午1:54写道: > 有这样一个参数“akka.framesize” ,可以在你启动flink的时候加上 或者 在conf/flink-conf.yaml 配置上: > > akka.framesize > "10485760b"StringMaximum size of messages which are sent between the > JobManager and the TaskManagers. If Flink fails because messages exceed > this limit, then you should increase it. The message size requires a > size-unit specifier. > > 参考: > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html > > > > > > 发件人: 赵一旦 > 发送时间: 2021-01-14 11:38 > 收件人: user-zh > 主题: Flink webui 查询任务信息报错500 > 报错500,开发者工具展示的异常信息如下。 > side:↵org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: > The method requestJob's result size 19811407 exceeds the maximum size > 10485760 .↵ at > > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:363)↵ > at > > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$0(AkkaRpcActor.java:337)↵ > at > > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)↵ > at > > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)↵ > at > > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)↵ > at > > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)↵ > at > > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)↵ > at > > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)↵ > at > > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)↵ > at > > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)↵ > at > > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)↵ > at > > org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:996)↵ > at akka.dispatch.OnComplete.internal(Future.scala:264)↵ at > akka.dispatch.OnComplete.internal(Future.scala:261)↵ at > akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)↵ at > akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)↵ at > scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)↵ at > > org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)↵ > at > scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)↵ > at > > scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)↵ > at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)↵ at > > akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)↵ > at > > akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)↵ > at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)↵ at > scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)↵ at > scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)↵ at > > akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)↵ > at > > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)↵ > at > > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)↵ > at > > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)↵ > at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)↵ > at > > akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)↵ > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)↵ at > > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)↵ > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)↵ at > > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)↵ > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)↵ > at > > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)↵↵End > of exception on server side>" > > 想知道这个是什么情况,以及需要调整什么参数呢? >
Re: Flink webui 查询任务信息报错500
有这样一个参数“akka.framesize” ,可以在你启动flink的时候加上 或者 在conf/flink-conf.yaml 配置上: akka.framesize "10485760b"StringMaximum size of messages which are sent between the JobManager and the TaskManagers. If Flink fails because messages exceed this limit, then you should increase it. The message size requires a size-unit specifier. 参考: [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html 发件人: 赵一旦 发送时间: 2021-01-14 11:38 收件人: user-zh 主题: Flink webui 查询任务信息报错500 报错500,开发者工具展示的异常信息如下。 " 想知道这个是什么情况,以及需要调整什么参数呢?
Re: Flink webui 查询任务信息报错500
这是flink的Akka部分报的错,相关源码如下,可以找找这个 maximumFramesize 怎么修改? https://github.com/apache/flink/blob/d093611b5dfab95fe62e4f861879762ca2e43437/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java: private Either, AkkaRpcException> serializeRemoteResultAndVerifySize( Object result, String methodName) { try { SerializedValue serializedResult = new SerializedValue<>(result); long resultSize = serializedResult.getByteArray() == null ? 0 : serializedResult.getByteArray().length; if (resultSize > maximumFramesize) { return Either.Right( new AkkaRpcException( "The method " + methodName + "'s result size " + resultSize + " exceeds the maximum size " + maximumFramesize + " .")); } else { return Either.Left(serializedResult); } } catch (IOException e) { return Either.Right( new AkkaRpcException( "Failed to serialize the result for RPC call : " + methodName + '.', e)); } } 发件人: 赵一旦 发送时间: 2021-01-14 11:38 收件人: user-zh 主题: Flink webui 查询任务信息报错500 报错500,开发者工具展示的异常信息如下。 " 想知道这个是什么情况,以及需要调整什么参数呢?
Re: FlinkSQL Filter Error With Float Column on flink-1.12.0
你好,在数据库中,Float类型存的是个近似值,不能用类似于 = 或者 != 的比较语句,所以也不支持IN操作 希望能帮助到你 From: jy l Date: 2021-01-12 18:04 To: user-zh Subject: FlinkSQL Filter Error With Float Column on flink-1.12.0 Hi: Flink SQL filter data throw an exception, code: def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val source = env.fromElements( (1.0f, 11.0f, 12.0f), (2.0f, 21.0f, 22.0f), (3.0f, 31.0f, 32.0f), (4.0f, 41.0f, 42.0f), (5.0f, 51.0f, 52.0f) ) val settings = EnvironmentSettings.newInstance() .inStreamingMode() .useBlinkPlanner() .build() val tEnv = StreamTableEnvironment.create(env, settings) tEnv.createTemporaryView("myTable", source, $("id"), $("f1"), $("f2")) val query = """ |select * from myTable where id in (1.0, 2.0, 3.0) |""".stripMargin tEnv.executeSql(query).print() } exception: Exception in thread "main" java.lang.UnsupportedOperationException: class org.apache.calcite.sql.type.SqlTypeName: FLOAT at org.apache.calcite.util.Util.needToImplement(Util.java:1075) at org.apache.calcite.rex.RexLiteral.appendAsJava(RexLiteral.java:703) at org.apache.calcite.rex.RexLiteral.toJavaString(RexLiteral.java:408) at org.apache.calcite.rex.RexLiteral.computeDigest(RexLiteral.java:276) at org.apache.calcite.rex.RexLiteral.(RexLiteral.java:223) at org.apache.calcite.rex.RexLiteral.toLiteral(RexLiteral.java:737) at org.apache.calcite.rex.RexLiteral.lambda$printSarg$4(RexLiteral.java:710) at org.apache.calcite.util.RangeSets$Printer.singleton(RangeSets.java:397) at org.apache.calcite.util.RangeSets.forEach(RangeSets.java:237) at org.apache.calcite.util.Sarg.lambda$printTo$0(Sarg.java:110) at org.apache.calcite.linq4j.Ord.forEach(Ord.java:157) at org.apache.calcite.util.Sarg.printTo(Sarg.java:106) at org.apache.calcite.rex.RexLiteral.printSarg(RexLiteral.java:709) at org.apache.calcite.rex.RexLiteral.lambda$appendAsJava$1(RexLiteral.java:652) at org.apache.calcite.util.Util.asStringBuilder(Util.java:2502) Why is that? How do i need to solve it? thanks.
回复: flink sql消费kafka sink到mysql问题
flinksql 貌似是目前做不到你说的这样 发件人: air23 发送时间: 2021-01-06 12:29 收件人: user-zh 主题: flink sql消费kafka sink到mysql问题 你好。我这边在跑任务时候 发现使用flink sql消费kafka如果报错了 然后再重启 发现报错的数据 会丢失 采用的scan.startup.mode' = 'group-offsets' 按理说 不是要重新消费 失败的那条数据 开始消费吗? 请问如何配置 可以不丢失数据 CREATE TABLE source1 ( id BIGINT , username STRING , password STRING , AddTime TIMESTAMP , origin_table STRING METADATA FROM 'value.table' VIRTUAL, origin_sql_type MAP METADATA FROM 'value.sql-type' VIRTUAL ) WITH ( 'connector' = 'kafka', 'topic' = 'plink_canal', 'properties.bootstrap.servers' = '***', 'properties.group.id' = 'canal1', 'scan.startup.mode' = 'group-offsets', 'canal-json.table.include' = 'test.*', -- 'canal-json.ignore-parse-errors' = 'true', -- 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null 'format' = 'canal-json' );
????: flinksql1.11 ????phoenix????????Caused by: org.apache.calcite.avatica.NoSuchStatementException
??bug ?? 2021-01-05 20:20 user-zh ?? flinksql1.11 phoenixCaused by: org.apache.calcite.avatica.NoSuchStatementException ?? flinkv1.11phoneix 1.14.1 CREATE TABLE pe_login_kafka ( id INT, region_id INT, ts TIMESTAMP(3), proc_time AS PROCTIME() ) WITH ( ??connector?? = ??kafka??, ??topic?? = ??t-region, ??properties.bootstrap.servers?? = ????, ??properties.group.id?? = gid??); CREATE TABLE region_dim( id INT, region_name STRING ) WITH ( ??connector?? = ??jdbc??, ??url?? = ??jdbc:phoenix:thin:url=http://172.168.1.15:8765;serialization=PROTOBUF??, ??table-name?? = ph_region??, ??lookup.cache.max-rows?? = ??5000??, ??lookup.cache.ttl?? = ??600s??, ??lookup.max-retries?? = ??3??); --sink INSERT INTO FROM pe_login_kafka k LEFT JOIN region_dim FOR SYSTEM_TIME AS OF k.proc_time AS u ON k.region_id = u.id; ??, Caused by: java.sql.SQLException at org.apache.calcite.avatica.Helper.createException(Helper.java:56) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] at org.apache.calcite.avatica.Helper.createException(Helper.java:41) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] at org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:557) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] at org.apache.calcite.avatica.AvaticaPreparedStatement.executeQuery(AvaticaPreparedStatement.java:137) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] at org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:152) ~[flink-connector-jdbc_2.11-1.11.1.jar:1.11.1] ... 18 more Caused by: org.apache.calcite.avatica.NoSuchStatementException at org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:349) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] at org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:343) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] at org.apache.calcite.avatica.AvaticaConnection.invokeWithRetries(AvaticaConnection.java:793) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] at org.apache.calcite.avatica.remote.RemoteMeta.execute(RemoteMeta.java:342) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] at org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:548) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] ... 20 more 2021-01-05 19:40:07,469 ERROR org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction [] - JDBC executeBatch error, retry times = 2 java.sql.SQLException: null at org.apache.calcite.avatica.Helper.createException(Helper.java:56) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] at org.apache.calcite.avatica.Helper.createException(Helper.java:41) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] at org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:557) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] at org.apache.calcite.avatica.AvaticaPreparedStatement.executeQuery(AvaticaPreparedStatement.java:137) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] at org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:152) [flink-connector-jdbc_2.11-1.11.1.jar:1.11.1] ??jdbc??org.apache.calcite.avatica.AvaticaConnection??connection??Statement ??
回复: 邮件退订
你好,退订需发邮件到 user-zh-unsubscr...@flink.apache.org 更多详细情况可以参考 [1] https://flink.apache.org/community.html#mailing-lists 发件人: 谢治平 发送时间: 2020-12-16 09:08 收件人: user-zh 主题: 邮件退订 您好,邮件退订一下
Re: flink无法写入数据到ES中
你的SQL语句语法有误,请参考: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/elasticsearch.html 希望能帮助到你! 发件人: 小墨鱼 发送时间: 2020-12-11 14:46 收件人: user-zh 主题: flink无法写入数据到ES中 我在使用Flink写入数据到ES中,程序可以执行成功但是ES中没有数据,而且没有任何报错信息我首先创建了一个sink的es表String sql = "CREATE TABLE es_sink (\n" +"uid INT,\n" + "appid INT,\n" +"prepage_id INT,\n" + "page_id INT,\n" +"action_id STRING,\n" + "page_name STRING,\n" +"action_name STRING,\n" + "prepage_name STRING,\n" +"stat_time BIGINT,\n" + "dt DATE,\n" +"PRIMARY KEY (uid) NOT ENFORCED\n" + ") WITH (\n" +"'connector.type' = 'elasticsearch',\n" + "'connector.version' = '6',\n" +"'connector.hosts' = 'http://localhost:9200',\n" +"'connector.index' = 'mytest',\n" +"'connector.document-type' = 'user_action',\n" +"'update-mode' = 'append',\n" + "'connector.key-null-literal' = 'n/a',\n" + "'connector.bulk-flush.max-actions' = '1',\n" + "'format.type' = 'json'\n" +")";并通过下面查询出数据String sql = "select 1 as uid,2 as appid,3 as prepage_id,4 as page_id,'5' as action_id,'6' as page_name,'7' as action_name,'8' as prepage_name,cast(9 as bigint) as stat_time, cast('2020-11-11' as date) as dt from student limit 1";我的flink版本是1.11.1,es版本是6.2.2有遇到的朋友可以帮助我看一下 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Re: 关于cluster.evenly-spread-out-slots参数的底层原理
发件人: Shawn Huang 发送时间: 2020-11-06 16:56 收件人: user-zh 主题: Re: 关于cluster.evenly-spread-out-slots参数的底层原理 我说一下我看源码(1.11.2)之后的理解吧,不一定准确,仅供参考。 cluster.evenly-spread-out-slots 这个参数设置后会作用在两个地方: 1. JobMaster 的 Scheduler 组件 2. ResourceManager 的 SlotManager 组件 对于 JobMaster 中的 Scheduler, 它在给 execution vertex 分配 slot 是按拓扑排序的顺序依次进行的。 Scheduler 策略是会倾向于把 execution vertex 分配到它的上游节点所分配到的slot上, 因此在给某个具体 execution vertex 分配 slot 时都会计算出一个当前节点倾向于选择的TaskManager集合, 然后在可选的 slot 候选集中会根据三个维度来为某个slot打分,分别是: 1. 候选slot所在的 TaskManager 与倾向于选择的 TaskManager 集合中有多少个的 ResourceID 是相同的(对于standalone模式可以不考虑该维度) 2. 候选slot所在的 TaskManager 与倾向于选择的 TaskManager 集合中有多少个的 全限定域名 是相同的 3. 候选slot所在的 TaskManager 目前的资源占用率 只有配置了 cluster.evenly-spread-out-slots 后,才会考虑第三个维度,否则仅会用前面两个维度进行打分。 打分之后会选择得分最高的 slot 分配给当前的 exection vertex。 需要注意的是这里的资源利用率只是根据某个 slot 所在的 TaskManager 中剩下多少个能够分配该 execution vertex 的 slot 计算出的, (因为 Flink 要求同一 job vertex 的并行任务不能分配到同一 slot 中),能分配的越多,资源利用率越小,否则利用率越大。 而不是指实际的CPU内存等资源利用率。 对于 ResourceManager 中的 SlotManager 组件(这里说的都是 Standalone 模式下的 ResourceManager), 由于 JobMaster 的 slot 都是要向 resource manager 申请的。如果 JobMaster 需要新的 slot 了,会向 ResourceManager 的 SlotManager 组件申请。 如果没有配置 cluster.evenly-spread-out-slots 的话,SlotManager 从可用 slot 中随机返回一个。 如果配置了 cluster.evenly-spread-out-slots,SlotManager 会返回资源利用率最小的一个 slot。 这里的资源利用率计算方式是:看某个 slot 所在的 TaskManager 中有多少 slot 还没有被分配,空闲的越多,利用率越小,否则越大。 最后,你提问中说的均衡我没有太理解。某个算子的并发子任务是不会被分配到同一个slot中的, 但如果想把这些子任务均匀分配到不同机器上,这个当前的调度算法应该是无法保证的。 Best, Shawn Huang 赵一旦 于2020年11月5日周四 下午10:18写道: > 有没有人对cluster.evenly-spread-out-slots参数了解比较深入的给讲解下。 > > 我主要想知道,设置这个参数为true之后。Flink是以一个什么样的规则去尽可能均衡分配的。 > standalone集群模式下,每个机器性能相同,flink slot数量配置相同情况下。基于*这种分配规则*,有没有一种方法让Flink做到 > *完全均衡*,而*不是尽可能均衡*? > > 此外,我说的“均衡”都特指算子级别的均衡。不要5机器一共5个slot,然后任务有5个算子,每个算子单并发并且通过不同的share > group各独占1个slot这种均衡。我指的是每个算子都均衡到机器(*假设并发设置合理*)。 >
????: ?????? pyflink??where??????????????????????
?? ?? where??where??pyflink??api??where?? ?? 2020-11-02 10:15 user-zh ?? ?? pyflink??where?? ?? ??where ?? --- Py4JJavaError Traceback (most recent call last)
?????? flink1.11????????
Hi,??jar??1??45jar -- -- ??: "user-zh"
FlinkSQL 任务提交后 任务名称问题
代码大概是这样子的,一张kafka source表,一张es Sink表,最后通过tableEnv.executeSql("insert into esSinkTable select ... from kafkaSourceTable")执行 任务提交后任务名称为“inset-into_某某catalog_某某database.某某Table” 这样很不友好啊,能不能我自己指定任务名称呢?
回复:ddl es 报错
Hello, 这个报错,在flink 1.11 最新版本我也遇见了,跟你同样的操作 真正原因是这个ddl 是flink 的sink table,是数据写入端,不能打印数据。 而tableEnv.toRetractStream(table, Row.class).print(); 这个打印的数据方法只适合flink 的Source Table,也就是数据输入端,比如kafka table就可以正常使用。 2020年7月9日15:31:56 -- 原始邮件 -- 发件人: "出发"<573693...@qq.com>; 发送时间: 2020年3月23日(星期一) 晚上11:30 收件人: "user-zh"http://localhost:9200 connector.index=buy_cnt_per_hour connector.type=elasticsearch connector.version=6 format.type=json schema.0.data-type=BIGINT schema.0.name=hour_of_day schema.1.data-type=BIGINT schema.1.name=buy_cnt update-mode=append
代码中如何取消正在运行的Flink Streaming作业
这个问题之前看到过有人在问,但是没有看到答案,我想问一下,Flink Streaming API有没有提供类似的接口,调用后就能停止这个Stream作业呢?
回复:fink新增计算逻辑时kafka从头开始追平消费记录
之前的代码好像乱码了,我设置了一下,重新发一下,建议你 在获取consumer之后,再设置一下 consumer.setStartFromLatest();,这样设置的参考就是官网文档介绍的,这是我之前翻译的,可以看一下后边关于【Kafka Consumers 从指定位置开始消费】的解释,链接:https://www.jianshu.com/p/b753527b91a6 /** * @param env * @param topic * @param time 订阅的时间 * @return * @throws IllegalAccessException */ public static DataStreamSource
回复: 回复:fink新增计算逻辑时kafka从头开始追平消费记录
苟刚你好,刚才看了你的kafka消费代码,建议你在获取consumer后,增加一行如下代码 “consumer.setStartFromLatest();”然后再测试一下。 /** * @param env * @param topic * @param time 订阅的时间 * @return * @throws IllegalAccessException */ public static DataStreamSource
??????fink??????????????kafka????????????????????
??kafkaoffset -- -- ??: ""
?????? Re: flink????kafka????????????kafka??????????????????
??kafka??Offset?? kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zkhost:2181 --group ${group.id} --topic ${topic_name} zkhost ??group.id??topic_name Group Topic Pid Offset logSize Lag Owner test dy_event 0 8115733 10658588 2542855 none test dy_event 1 8114221 10658585 2544364 none test dy_event 2 8115173 10658587 2543414 none test dy_event 3 8115127 10658585 2543458 none test dy_event 4 8115160 10658587 2543427 none pid Offset?? -- -- ??: "Benchao Li"https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration wqpapa https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-start-position-configuration > > > >wqpapa
??????flink????Kafka????????????
kafka??advertised.host.name??advertised.port host?? Kafka Kafka ?? 0.8.2.x?? 0.10.x Kafka ?? ??,?? hosts ?? zookeeper ?? hostnamehostname1:2181,hostname2:2181,hostname3:2181 ??kafka?? ?? -- -- ??: "sunfulin"https://manual.sensorsdata.cn/sa/latest/page-1573828.html??FlinkKafka??kafka console consumer?? flinkjob??
??????flink????Kafka????????????
kafkaadvertised.host.name -- -- ??: "sunfulin"https://manual.sensorsdata.cn/sa/latest/page-1573828.html??FlinkKafka??kafka console consumer?? flinkjob??
Flink????????????
??flink1.7.1 ??centos 7 job??start-cluster.sh test04 ??flink job?? $ bin/flink run -m test04:8081 -c org.apache.flink.quickstart.factory.FlinkConsumerKafkaSinkToKuduMainClass flink-scala-project1.jar Starting execution of program The program finished with the following exception: org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: f9ac0c76e0e44cac6d6c3b1c41afa161) at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:261) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487) at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510) at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:645) at org.apache.flink.quickstart.factory.FlinkConsumerKafkaSinkToKuduMainClass$.main(FlinkConsumerKafkaSinkToKuduMainClass.scala:16) at org.apache.flink.quickstart.factory.FlinkConsumerKafkaSinkToKuduMainClass.main(FlinkConsumerKafkaSinkToKuduMainClass.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050) at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126) Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph. at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:380) at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:216) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.runtime.rest.RestClient$ClientHandler.readRawResponse(RestClient.java:515) at org.apache.flink.runtime.rest.RestClient$ClientHandler.channelRead0(RestClient.java:452) at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHan
Flink????????????
??flink1.7.1 ??centos 7 job??start-cluster.sh test04 ??flink job?? $ bin/flink run -m test04:8081 -c org.apache.flink.quickstart.factory.FlinkConsumerKafkaSinkToKuduMainClass flink-scala-project1.jar Starting execution of program The program finished with the following exception: org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: f9ac0c76e0e44cac6d6c3b1c41afa161) at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:261) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487) at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510) at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:645) at org.apache.flink.quickstart.factory.FlinkConsumerKafkaSinkToKuduMainClass$.main(FlinkConsumerKafkaSinkToKuduMainClass.scala:16) at org.apache.flink.quickstart.factory.FlinkConsumerKafkaSinkToKuduMainClass.main(FlinkConsumerKafkaSinkToKuduMainClass.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050) at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126) Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph. at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:380) at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:216) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.runtime.rest.RestClient$ClientHandler.readRawResponse(RestClient.java:515) at org.apache.flink.runtime.rest.RestClient$ClientHandler.channelRead0(RestClient.java:452) at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHan