Help needed: consuming a Hive table through a real-time pipeline fails with java.lang.ArrayIndexOutOfBoundsException: -1, thanks!
Hello,

1. A real-time job reads from Kafka and writes the data into Hive. The Hive table is stored as Parquet and is partitioned by day, hour, and minute.
2. Consuming that Hive table through a real-time pipeline fails with java.lang.ArrayIndexOutOfBoundsException: -1.

In the Flink SQL client:

1) Selecting all fields works, and all the data is read back correctly. Statement:

select * from ubtCatalog.ubtHive.event_all_dwd
/*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.partition.include'='all', 'streaming-source.monitor-interval'='5s', 'streaming-source.consume-order'='partition-time','streaming-source.consume-start-offset'='2021-01-01') */ ;

2) Adding an aggregate function to the query from 1) always fails with an unexplained java.lang.ArrayIndexOutOfBoundsException: -1. Statement:

select count(xubtappid) from ubtCatalog.ubtHive.event_all_dwd
/*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.partition.include'='all', 'streaming-source.monitor-interval'='5s', 'streaming-source.consume-order'='partition-time','streaming-source.consume-start-offset'='2021-01-01') */ ;

The full error output is:

2021-04-02 10:06:26
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:89)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:240)
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:469)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:404)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:197)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: HiveSource-ubtHive.event_all_dwd' (operator bc764cd8ddf7a0cff126f51c16239658).
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:466)
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:237)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:240)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:247)
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:44)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.connectors.hive.FlinkHiveException: Failed to enumerate files
    at org.apache.flink.connectors.hive.ContinuousHiveSplitEnumerator.handleNewSplits(ContinuousHiveSplitEnumerator.java:148)
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$4(ExecutorNotifier.java:135)
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
    ... 3 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
    at org.apache.flink.connectors.hive.util.HivePartitionUtils.toHiveTablePartition(HivePartitionUtils.java:167)
    at
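The innermost cause in the trace is an index of -1 raised inside HivePartitionUtils.toHiveTablePartition. In Java, that value very often comes from a failed lookup such as List.indexOf whose result is then used directly as an array index. The sketch below only illustrates that general failure mode; whether it is what actually happens at HivePartitionUtils.java:167 is an assumption that the trace alone does not confirm. One guess, consistent with select * working while the single-column aggregate fails, is that a partition column is looked up in a projected field list that no longer contains it. The class name, method names, and partition column names (dt, hr, mi) are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration, not Flink code: how a failed name lookup becomes index -1. */
class NegativeIndexSketch {

    /** Uses the result of indexOf directly; a missing column yields index -1. */
    static String typeOfUnchecked(List<String> fieldNames, String[] fieldTypes, String column) {
        return fieldTypes[fieldNames.indexOf(column)];
    }

    /** Same lookup, but a missing column is reported by name instead of as a bare -1. */
    static String typeOfChecked(List<String> fieldNames, String[] fieldTypes, String column) {
        int index = fieldNames.indexOf(column);
        if (index < 0) {
            throw new IllegalArgumentException(
                    "Column '" + column + "' is not among the fields " + fieldNames);
        }
        return fieldTypes[index];
    }

    public static void main(String[] args) {
        // Assumed scenario: only the aggregated column survives projection, while
        // the partition columns (hypothetical names) are still looked up by name.
        List<String> projectedFields = Arrays.asList("xubtappid");
        String[] projectedTypes = {"STRING"};
        for (String partitionColumn : Arrays.asList("dt", "hr", "mi")) {
            try {
                typeOfUnchecked(projectedFields, projectedTypes, partitionColumn);
            } catch (ArrayIndexOutOfBoundsException e) {
                System.out.println(partitionColumn + " -> " + e);
            }
        }
    }
}
```

If this guess is right, comparing the columns the query actually reads with the table's partition columns would be a reasonable first check; the checked variant shows the kind of error message that makes such a mismatch visible.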
Re: Re: Please advise on a time window question, many thanks!
[...] 1.11 flink sql [...], [...] streaming api kafka, eventtime, stream [...] table, sql, [...] kafka topic, flink webui watermarks No Watermark, kafka topic [...], watermarks kafka [...]

samuel@ubtrobot.com [...] 2020-09-03 09:23 user-zh [...]

[...] Flink1.11.1 [...]

package com.ubtechinc.dataplatform.flink.etl.exception.monitor;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.commons.collections.map.HashedMap;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import com.ubtechinc.dataplatform.flink.util.AES256;
import com.ubtechinc.dataplatform.flink.util.ConstantStr;
import com.ubtechinc.dataplatform.flink.util.MailUtils;
import com.ubtechinc.dataplatform.flink.util.SmsUtil;
import com.ubtechinc.dataplatform.flink.util.YearMonthDayBucketAssigner;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import com.mysql.jdbc.Connection;
import java.sql.Timestamp;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class ExceptionAlertHour4{
    private static final Logger LOG = LoggerFactory.getLogger(ExceptionAlertHour4.class);

    public static void main(String[] args) throws Exception{
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(parameterTool);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Re: Re: Please advise on a time window question, many thanks!
Hello, and thank you for the reply! In Flink 1.11.1 this window does not close on its own; it only fires once a record belonging to the next time window arrives.

From: taochanglian
Sent: 2020-09-03 10:35
To: user-zh; samuel@ubtrobot.com
Subject: Re: Please advise on a time window question, many thanks!

Nothing is wrong here. Time windows are left-closed and right-open. Going by the definition of getWindowStartWithOffset in org.apache.flink.streaming.api.windowing.windows.TimeWindow, your window should be the 17:00-18:00 one, but it should not fire at exactly 2020-09-01 18:00:00.0. Because the interval is left-closed and right-open, firing needs a time later than 2020-09-01 18:00:00.0, for example 2020-09-01 18:00:00.001. Add your 5-second watermark delay and it should fire at 2020-09-01 18:00:05.001.

On 2020/9/2 15:20, samuel@ubtrobot.com wrote:

Hi experts: I am using Flink SQL and want to aggregate with an hourly tumble window. The problem is that the computation is not triggered when the hour is reached. Please take a look!

// Specify the event-time field and generate watermarks
DataStream<Tuple4<String, String, String, Long>> withTimestampsAndWatermarksDS = singleDS.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<Tuple4<String, String, String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                //.<Tuple4<String, String, String, Long>>forMonotonousTimestamps()
                .withIdleness(Duration.ofSeconds(10)) // generate watermarks even when no data arrives
                .withTimestampAssigner((event, timestamp)->event.f3));
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
tenv.registerDataStream(
        "log",
        withTimestampsAndWatermarksDS,
        "appid,bugid,eventid,rowtime.rowtime,proctime.proctime");
String sql = "select appid,eventid,cnt," +
        "(starttime + interval '8' hour ) as stime," +
        "(endtime + interval '8' hour ) as etime " +
        "from (select appid,eventid,count(*) as cnt," +
        "TUMBLE_START(rowtime,INTERVAL '1' HOUR) as starttime," +
        "TUMBLE_END(rowtime,INTERVAL '1' HOUR) as endtime " +
        "from log group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' HOUR),TIME '00:00:00')"; // expect the window to close when the hour ends
Table table = tenv.sqlQuery(sql);
DataStream<Result> dataStream = tenv.toAppendStream(table, Result.class);

The output is:

(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12

I expected the window to close when it ends at 2020-09-01 18:00:00.0, but it did not.

(400030024,123123123123,others,1599030999000) //2020/9/2 15:16:39 the window fired only after this record arrived
ResultHour{appid=400030024,eventid=others,cnt=4, stime=2020-09-01 17:00:00.0, etime=2020-09-01 18:00:00.0, SystemTime=1599031415481 //2020/9/2 15:23:35}

What is going wrong here? Many thanks!
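The timing discussed in this thread can be checked with plain arithmetic. The sketch below is not Flink code and rests on three assumptions: the window start is timestamp - (timestamp - offset + size) % size, a bounded-out-of-orderness watermark equals the largest timestamp seen so far minus the bound minus 1 ms, and an event-time window fires once the watermark reaches the window end minus 1 ms. The class and method names are hypothetical. Under those assumptions, the 17:15:12 records fall into the 17:00-18:00 window (UTC+8), and that window cannot fire until some record carries a timestamp of about 18:00:05 or later, which is within a millisecond of the estimate in the reply and matches the result appearing only after the 2020/9/2 record arrived.

```java
import java.time.Instant;
import java.time.ZoneOffset;

/** Hypothetical sketch of tumbling-window alignment and event-time firing arithmetic. */
class WindowFiringSketch {

    /** Start of the tumbling window that contains the timestamp (all values in ms). */
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    /** Assumed watermark for bounded out-of-orderness: max seen timestamp - bound - 1. */
    static long watermark(long maxSeenTimestamp, long outOfOrdernessMillis) {
        return maxSeenTimestamp - outOfOrdernessMillis - 1;
    }

    /** Assumed firing rule: the watermark has reached the last millisecond of the window. */
    static boolean fires(long watermark, long windowEndExclusive) {
        return watermark >= windowEndExclusive - 1;
    }

    /** Formats epoch milliseconds in UTC+8, the zone used in the thread. */
    static String atUtcPlus8(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atOffset(ZoneOffset.ofHours(8)).toString();
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        long bound = 5_000L;                 // forBoundedOutOfOrderness(5 s) in the thread
        long firstBatch = 1598951712000L;    // 2020/9/1 17:15:12 (UTC+8)
        long nextDay = 1599030999000L;       // 2020/9/2 15:16:39 (UTC+8)

        long start = windowStart(firstBatch, 0L, hour);
        long end = start + hour;
        System.out.println("window: [" + atUtcPlus8(start) + ", " + atUtcPlus8(end) + ")");

        // With only the first batch seen, the watermark is still inside the window.
        System.out.println("fires after first batch: " + fires(watermark(firstBatch, bound), end));
        // The next day's record pushes the watermark far past the window end.
        System.out.println("fires after next record: " + fires(watermark(nextDay, bound), end));
    }
}
```

In this model the watermark is driven only by record timestamps, so if nothing arrives after 18:00:05 the window stays open regardless of wall-clock time. As far as I know, withIdleness only marks an input as idle so that it does not hold back other inputs; it does not advance event time by itself.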
Re: Re: Please advise on a time window question, many thanks!
            properties.setProperty("bootstrap.servers", ConstantStr.KAFKA_IP_PORT_TEST);
            properties.setProperty("zookeeper.connect", ConstantStr.ZOOKEEPER_IP_PORT_TEST);
            properties.setProperty("group.id", "etl.exception.monitor.ExceptionAlertHour4-001");
            // Start each partition of the test topic from a fixed offset
            Map<KafkaTopicPartition, Long> offsets = new HashedMap();
            offsets.put(new KafkaTopicPartition(ConstantStr.KAFKA_TOPIC_EXCEPTION, 0), 2800L);
            offsets.put(new KafkaTopicPartition(ConstantStr.KAFKA_TOPIC_EXCEPTION, 1), 2700L);
            offsets.put(new KafkaTopicPartition(ConstantStr.KAFKA_TOPIC_EXCEPTION, 2), 3300L);
            ds = env.addSource(new FlinkKafkaConsumer<>(ConstantStr.KAFKA_TOPIC_EXCEPTION, new SimpleStringSchema(), properties).setStartFromSpecificOffsets(offsets));
        } else if (ConstantStr.ENV_FLAG_PRODUCT.equals(parameterTool.get("envFlag"))) {
            hdfs = ConstantStr.HDFS_IP_PORT_PRODUCT;
            maxTime = ConstantStr.ROLLINGPOLICY_MAXTIME_PRODUCT;
            maxSize = ConstantStr.ROLLINGPOLICY_MAXSIZE_PRODUCT;
            inActive = ConstantStr.ROLLINGPOLICY_INACTIVE_PRODUCT;
            properties.setProperty("bootstrap.servers", ConstantStr.KAFKA_IP_PORT_PRODUCT);
            properties.setProperty("zookeeper.connect", ConstantStr.ZOOKEEPER_IP_PORT_PRODUCT);
            properties.setProperty("group.id", "etl.exception.monitor.ExceptionAlertHour-001");
            properties.setProperty("auto.offset.reset", "earliest");
            ds = env.addSource(new FlinkKafkaConsumer<>(ConstantStr.KAFKA_TOPIC_EXCEPTION, new SimpleStringSchema(), properties));
        } else {
            System.exit(-1);
        }

        // transform
        SingleOutputStreamOperator<Tuple4<String, String, String, Long>> singleDS = ds.flatMap(new FlatMapFunction<String, Tuple4<String, String, String, Long>>() {
            @Override
            public void flatMap(String value, Collector<Tuple4<String, String, String, Long>> out) {
                //System.out.println("Kafka2Hdfs-in:" + value);
                String newStr = value.replaceAll("*\\r|+r|*\\n|+n|*\\t|+t", "");
                //System.out.println("Kafka2Hdfs-newStr:" + newStr);
                try {
                    // Parse the record as JSON, keeping the field order
                    JSONObject record = JSON.parseObject(newStr, Feature.OrderedField);
                    // Extract the body_data array
                    JSONArray bodyDataArray = record.getJSONArray("body_data");
                    // Iterate over the JSON array
                    for (int i = 0; i < bodyDataArray.size(); i++) {
                        // Get the i-th JSON object
                        JSONObject bodyDataObj = bodyDataArray.getJSONObject(i);
                        if (bodyDataObj != null) {
                            Tuple4<String, String, String, Long> log = Tuple4.of(
                                    record.getString("HW-AppId"),
                                    bodyDataObj.getString("HW-bugId"),
                                    bodyDataObj.getString("HW-bugType"),
                                    Long.valueOf(bodyDataObj.getString("HW-happenedAt"))
                            );
                            out.collect(log);
                        }
                    }
                } catch (Exception e) {
                    System.out.println(e.getMessage());
                }
            }
        });
        singleDS.print();

        // Specify the event-time field and generate watermarks
        DataStream<Tuple4<String, String, String, Long>> withTimestampsAndWatermarksDS = singleDS.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<Tuple4<String, String, String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        //.<Tuple4<String, String, String, Long>>forMonotonousTimestamps()
                        .withIdleness(Duration.ofSeconds(10))
                        .withTimestampAssigner((event, timestamp)->event.f3));
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
        tenv.registerDataStream(
                "log",
                withTimestampsAndWatermarksDS,
                "appid,bugid,eventid,rowtime.rowtime,proctime.proctime");
        String sql = "select appid,eventid,cnt," +
                "(starttime + interval '8' hour ) as stime," +
                "(endtime + interval '8' hour ) as etime " +
                "from (select appid,eventid,count(*) as cnt," +
                "TUMBLE_START(rowtime,INTERVAL '1' HOUR) as starttime," +
                "TUMBLE_END(rowtime,INTERVAL '1' HOUR) as endtime " +
                "from log group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' HOUR),TIME '00:00:00')";
        Table table = tenv.sqlQuery(sql);
        DataStream<Result> dataStream = tenv.toAppendStream(table, Result.class);
        dataStream.print();
        env.execute("etl.exception.monitor.ExceptionAlertHour");
    }

    // POJO that receives one row of the windowed query result
    public static class Result{
        private String appid;
        private String eventid;
        private long cnt;
        private Timestamp stime;
        private Timestamp etime;

        public String getAppid() { return appid; }
        public void setAppid(String appid) { this.appid = appid; }
        public String getEventid() { return eventid; }
        public void setEventid(String eventid) { this.eventid = eventid; }
        public long getCnt() { return cnt; }
        public void setCnt(long cnt) { this.cnt = cnt; }
        public Timestamp getStime(){ return stime; }
        public void setStime(Timestamp stime){ this.stime = stime; }
        public Timestamp getEtime(){ return etime; }
        public void setEtime(Timestamp etime){ this.etime = etime; }

        @Override
        public String toString(){
            return "ResultHour{" + "appid=" + appid + ",eventid=" + eventid + ",cnt=" + cnt + ", stime=" + stime + ", etime=" + etim
Please advise on a time window question, many thanks!
Hi experts: I am using Flink SQL and want to aggregate with an hourly tumble window. The problem is that the computation is not triggered when the hour is reached. Please take a look!

// Specify the event-time field and generate watermarks
DataStream<Tuple4<String, String, String, Long>> withTimestampsAndWatermarksDS = singleDS.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<Tuple4<String, String, String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                //.<Tuple4<String, String, String, Long>>forMonotonousTimestamps()
                .withIdleness(Duration.ofSeconds(10)) // generate watermarks even when no data arrives
                .withTimestampAssigner((event, timestamp)->event.f3));
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
tenv.registerDataStream(
        "log",
        withTimestampsAndWatermarksDS,
        "appid,bugid,eventid,rowtime.rowtime,proctime.proctime");
String sql = "select appid,eventid,cnt," +
        "(starttime + interval '8' hour ) as stime," +
        "(endtime + interval '8' hour ) as etime " +
        "from (select appid,eventid,count(*) as cnt," +
        "TUMBLE_START(rowtime,INTERVAL '1' HOUR) as starttime," +
        "TUMBLE_END(rowtime,INTERVAL '1' HOUR) as endtime " +
        "from log group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' HOUR),TIME '00:00:00')"; // expect the window to close when the hour ends
Table table = tenv.sqlQuery(sql);
DataStream<Result> dataStream = tenv.toAppendStream(table, Result.class);

The output is:

(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12

I expected the window to close when it ends at 2020-09-01 18:00:00.0, but it did not.

(400030024,123123123123,others,1599030999000) //2020/9/2 15:16:39 the window fired only after this record arrived
ResultHour{appid=400030024,eventid=others,cnt=4, stime=2020-09-01 17:00:00.0, etime=2020-09-01 18:00:00.0, SystemTime=1599031415481 //2020/9/2 15:23:35}

What is going wrong here? Many thanks!
[...] flink [...]
flink [...] mysql [...] json
{"times":5} --- 5 [...]
{"temperature": 80} --- 80
1) kafka
2) flink kafka [...]
[...]
1. [...]
2. [...] flink CEP [...]
3. [...]
Where can the Flink-1.10.1 release be downloaded? (no message body)
Looking for a dependency jar
Hello everyone, does anyone have a ready-built jar for this? Many thanks!

flink-connector-elasticsearch7_2.11

Samuel Qiu (邱钺)
Platform Software Department, UBTECH Robotics
Email: samuel@ubtrobot.com
UBTECH Robotics | www.ubtrobot.com

From: samuel@ubtrobot.com
Date: 2020-04-15 17:37
To: user-zh
Subject: flink-sql-connector-elasticsearch7_2.11-1.10.0.jar

After the job is submitted, it fails. How can this be solved?
Versions: Flink 1.10.0, Elasticsearch 7.6.0
I checked the source, and this class is indeed missing:

Caused by: java.lang.NoClassDefFoundError: org/apache/flink/elasticsearch7/shaded/org/elasticsearch/script/mustache/SearchTemplateRequest
    at org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.createClient(Elasticsearch7ApiCallBridge.java:76)
    at org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.createClient(Elasticsearch7ApiCallBridge.java:48)
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:299)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.elasticsearch7.shaded.org.elasticsearch.script.mustache.SearchTemplateRequest
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 14 more

Thanks!
flink-sql-connector-elasticsearch7_2.11-1.10.0.jar
After the job is submitted, it fails. How can this be solved?
Versions: Flink 1.10.0, Elasticsearch 7.6.0
I checked the source, and this class is indeed missing:

Caused by: java.lang.NoClassDefFoundError: org/apache/flink/elasticsearch7/shaded/org/elasticsearch/script/mustache/SearchTemplateRequest
    at org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.createClient(Elasticsearch7ApiCallBridge.java:76)
    at org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.createClient(Elasticsearch7ApiCallBridge.java:48)
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:299)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.elasticsearch7.shaded.org.elasticsearch.script.mustache.SearchTemplateRequest
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 14 more

Thanks!
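A NoClassDefFoundError like the one in this trace means the named class could not be loaded at runtime, so a quick way to narrow things down is to ask the class loader directly. The sketch below is a small stand-alone probe, not part of Flink; the class name ClasspathProbe and the helper isLoadable are hypothetical, and the shaded class name is copied from the trace. Run with the same classpath as the job, it would report whether the class is visible to that class loader.

```java
/** Hypothetical stand-alone probe: checks whether a class is visible to a class loader. */
class ClasspathProbe {

    /** Returns true if the class can be loaded (without initializing it), false otherwise. */
    static boolean isLoadable(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Covers both "not found" and "found but not linkable" cases.
            return false;
        }
    }

    public static void main(String[] args) {
        // Class name taken from the stack trace in the message.
        String shadedClass =
                "org.apache.flink.elasticsearch7.shaded.org.elasticsearch.script.mustache.SearchTemplateRequest";
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        System.out.println(shadedClass + " loadable: " + isLoadable(shadedClass, loader));
    }
}
```

If the probe reports false with the connector jar on the classpath, that would agree with the observation that the class is not packaged in the jar, and the fix would have to come from a jar that actually contains it rather than from job configuration.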