|
package job;
import bean.BaseInfo;
import bean.MidInfo;
import bean.OutInfo;
import bean.ResultInfo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import config.FlinkConfig;
import function.MyProcessFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.OutputTag;
import sink.Sink2Mysql;
import utils.DateUtil;
import utils.DateUtils;
import utils.JdbcUtil;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.HashMap;
import java.util.Properties;

import static org.apache.flink.table.api.Expressions.$;

public class RytLogAnly4 {
public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

//使用侧输出流
        OutputTag<BaseInfo> requestStream = new 
OutputTag<BaseInfo>("requestStream") {
        };
        OutputTag<BaseInfo> answerStream = new 
OutputTag<BaseInfo>("answerStream") {
        };

//1、连接测试环境kafka的数据
        String servers = 
FlinkConfig.config.getProperty("dev_bootstrap.servers");
        String topicName = FlinkConfig.config.getProperty("dev_topicName");
        String groupId = FlinkConfig.config.getProperty("dev_groupId");
        String devMode = FlinkConfig.config.getProperty("dev_mode");
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", servers);
        prop.setProperty("group.id", groupId);
        prop.setProperty("auto.offset.reset", devMode);
        DataStreamSource<String> sourceStream = env.addSource(new 
FlinkKafkaConsumer<String>(topicName, new SimpleStringSchema(), prop));
//{"ip":"10.125.8.141",{"data":"请求: -- 14:28:05.395 -- 
<315.22604>1D971BEEF23AE63\nAction=686\nMobileCode=18243533656\nReqno=10797698288=22=2=0.2.4596628816=703492175447.712\nCellIndex=0102\nIphoneKey=10797944048\nForward=2\nCfrom=dbzq.iphone\nTFrom=iphone\nGateWayIp=2409:893c:5212:4943:acf2:43c1:3904:253c\nHandleSerialNo=TmuAbK5TAAC9CttSU/3lQGAHAABrBAAAAwACABYAAACuAgAAAAAAAAAAAACuAgAATQAAAAFIAAAAAAFSMAAAADEwNzk3Njk4Mjg4PTIyPTI9MC4yLjQ1OTY2Mjg4MTY9NzAzNDkyMTc1NDQ3LjcxMgFJCwAAADEwNzk3OTQ0MDQ4AA==\nGateWayPort=60696\nnewindex=1\nlinksession=4\nuniqueid=8488717B-B476-4A82-BFD0-9DDBB5151D0A\ntztsno=5DFB64E9E8BF7C67A158C3022E970E0F\nClientVersion=1.01.096\ntztreqfrom=ios.webview\nReqlinkType=2\ntztSDKType=0\n"}

        //2、对源数据进行处理,生成baseInfo基类的数据
        SingleOutputStreamOperator<BaseInfo> baseInfoStream = 
sourceStream.map(new MapFunction<String, BaseInfo>() {
@Override
            public BaseInfo map(String value) throws Exception {
                JSONObject jsonObject = JSON.parseObject(value);
//获取到不同的服务器IP
                String serverIp = jsonObject.getString("ip");
//获取到不同的data的数据
                String datas = jsonObject.getString("data");

                String[] splits = datas.split("\n");
                HashMap<String, String> dataMap = new HashMap<>();
//将time填充到自定义类型中,用来判断同一个num的请求以及相应时间
                String time = splits[0].substring(7, 19);
//将subData填充到自定义类型中,用来判断时请求还是应答
                String subData = datas.substring(0, 10);
for (int i = 0; i < splits.length; i++) {
if (splits[i].contains("=")) {
                        splits[i] = splits[i].replaceFirst("=", "&");
                        String[] temp = splits[i].split("&");
if (temp.length > 1) {
                            dataMap.put(temp[0].toLowerCase(), temp[1]);
                        }
                    }
                }
return new BaseInfo(dataMap.get("action"), serverIp, 
DateUtil.string2Long(time), dataMap.get("handleserialno"), subData);
            }
        });

//3、使用process方法进行baseInfoStream流切割
        SingleOutputStreamOperator<BaseInfo> tagStream = 
baseInfoStream.process(new MyProcessFunction(requestStream, answerStream));

//4、根据不同的tag进行不同的输出流设定
        DataStream<BaseInfo> requestDataStream = 
tagStream.getSideOutput(requestStream);
        DataStream<BaseInfo> answerDataStream = 
tagStream.getSideOutput(answerStream);

        requestDataStream.print("requestDataStream");
        answerDataStream.print("answerDataStream");

//5、上面的流仅仅只是携带了action编码,没有对应的action中午注释,需要去关联一下MySQL中的表
        //5.1 先对请求流进行处理
        SingleOutputStreamOperator<OutInfo> outRequestDataStream = 
requestDataStream.map(new MapFunction<BaseInfo, OutInfo>() {
@Override
            public OutInfo map(BaseInfo value) throws Exception {
//拿到数据中携带的数字的action
                String actionType = value.getFuncId();
                System.out.println(actionType);
                String actionName = null;
                Connection connection = null;
                PreparedStatement ps = null;

//根据数据的action去MySQL中查找到对应的中午注释
                try {
                    String sql = "select action_name from ActionType where 
action = ?";
                    connection = JdbcUtil.getConnection();
                    ps = connection.prepareStatement(sql);
                    ps.setString(1, actionType);
                    ResultSet resultSet = ps.executeQuery();
                    System.out.println("resultSet是" + resultSet);
if (resultSet.next()) {
                        actionName = resultSet.getString("action_name");
                    }
                } catch (Exception e) {
throw new RuntimeException(e);
                } finally {
                    JdbcUtil.closeResource(connection, ps);
                }
//                return new OutInfo(value.getFuncId(), value.getServerIp(), 
value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), 
actionName,DateUtils.format(new Date()));
                return new OutInfo(value.getFuncId(), value.getServerIp(), 
value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), actionName,  
LocalDateTime.now().toString());
            }
        });
        outRequestDataStream.print("outRequestDataStream");


//5.2 对应答流进行处理
        SingleOutputStreamOperator<OutInfo> outAnswerDataStream = 
answerDataStream.map(new MapFunction<BaseInfo, OutInfo>() {
@Override
            public OutInfo map(BaseInfo value) throws Exception {
//拿到数据中携带的数字的action
                String actionType = value.getFuncId();
                System.out.println(actionType);
                String actionName = null;
                Connection connection = null;
                PreparedStatement ps = null;

//根据数据的action去MySQL中查找到对应的中午注释
                try {
                    String sql = "select action_name from ActionType where 
action = ?";
                    connection = JdbcUtil.getConnection();
                    ps = connection.prepareStatement(sql);
                    ps.setString(1, actionType);
                    ResultSet resultSet = ps.executeQuery();
                    System.out.println("resultSet是" + resultSet);
if (resultSet.next()) {
                        actionName = resultSet.getString("action_name");
                    }
                } catch (Exception e) {
throw new RuntimeException(e);
                } finally {
                    JdbcUtil.closeResource(connection, ps);
                }
//                return new OutInfo(value.getFuncId(), value.getServerIp(), 
value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), actionName, 
DateUtils.format(new Date()));
                return new OutInfo(value.getFuncId(), value.getServerIp(), 
value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), actionName, 
LocalDateTime.now().toString());
            }
        });
        outAnswerDataStream.print("outAnswerDataStream");

//6、讲两条流转换为对应的表 进行关联取最小值
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//设置状态值为1天
        tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1L));
//        tableEnv.createTemporaryView("tableRequest", outRequestDataStream);
//        tableEnv.createTemporaryView("tableAnswer", outAnswerDataStream);
        Table tableRequest = tableEnv.fromDataStream(outRequestDataStream, 
Schema.newBuilder()
                .column("funcId", DataTypes.STRING())
                .column("serverIp", DataTypes.STRING())
                .column("outTime", DataTypes.BIGINT())
                .column("handleSerialNo", DataTypes.STRING())
                .column("info", DataTypes.STRING())
                .column("funcIdDesc", DataTypes.STRING())
                .column("eventTime", DataTypes.TIMESTAMP(3))
                .watermark("eventTime", "eventTime - INTERVAL '5' SECOND ")
                .build());
        Table tableAnswer = 
tableEnv.fromDataStream(outAnswerDataStream,Schema.newBuilder()
                .column("funcId", DataTypes.STRING())
                .column("serverIp", DataTypes.STRING())
                .column("outTime", DataTypes.BIGINT())
                .column("handleSerialNo", DataTypes.STRING())
                .column("info", DataTypes.STRING())
                .column("funcIdDesc", DataTypes.STRING())
                .column("eventTime", DataTypes.TIMESTAMP(3))
                .watermark("eventTime", "eventTime - INTERVAL '5' SECOND ")
                .build());


        Table result = tableEnv.sqlQuery("select \n" +
"\ta.funcId as funcId ,\n" +
"\ta.funcIdDesc as funcIdDesc,\n" +
"\ta.serverIp as serverIp,\n" +
"\tb.outTime as maxTime,\n" +
"\ta.outTime as minTime,\t\n" +
"\tconcat(a.funcId,a.serverIp) as pk ,\n" +
" a.eventTime  as eventTime\n" +
" from "+ tableRequest +" a\n " +
" inner join "+ tableAnswer +" b" +
" on a.handleSerialNo=b.handleSerialNo ");
        System.out.println("这个是resultTable"+result);
        result.printSchema();
        tableEnv.createTemporaryView("resultTable", result);

        DataStream<MidInfo> midStream = tableEnv.toAppendStream(result, 
MidInfo.class);
        Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), 
$("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), 
$("eventTime").rowtime());

        tableEnv.createTemporaryView("midTable",midTable);

//使用TVF的采用渐进式累计窗口进行计算
        Table resulTable = tableEnv.sqlQuery("SELECT 
funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" +
"FROM TABLE(CUMULATE(\n" +
" TABLE midTable "+
" , DESCRIPTOR(eventTime)\n" +
" , INTERVAL '60' SECOND\n" +
" , INTERVAL '1' DAY))\n" +
" GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk");



        DataStream<Tuple2<Boolean, ResultInfo>> resultStream = 
tableEnv.toRetractStream(resulTable, ResultInfo.class);
        resultStream.print("resultStream");
        SingleOutputStreamOperator<ResultInfo> resultInfoStream = 
resultStream.map(new MapFunction<Tuple2<Boolean, ResultInfo>, ResultInfo>() {
@Override
            public ResultInfo map(Tuple2<Boolean, ResultInfo> value) throws 
Exception {
return value.f1;
            }
        });
        resultInfoStream.print("resultInfoStream");
        resultInfoStream.addSink(new Sink2Mysql());
        env.execute();
    }
}
|
你好,以上是我的代码,相关报错如下:


| 这个是resultTableUnnamedTable$2
(
  `funcId` STRING,
  `funcIdDesc` STRING,
  `serverIp` STRING,
  `maxTime` BIGINT,
  `minTime` BIGINT,
  `pk` STRING,
  `eventTime` TIMESTAMP(3) *ROWTIME*
)
/* 1 */public class bean$OutInfo$2$Converter implements 
org.apache.flink.table.data.conversion.DataStructureConverter {
/* 2 */ private final org.apache.flink.table.data.RowData.FieldGetter[] 
fieldGetters;
/* 3 */ private final 
org.apache.flink.table.data.conversion.DataStructureConverter[] fieldConverters;
/* 4 */ public 
bean$OutInfo$2$Converter(org.apache.flink.table.data.RowData.FieldGetter[] 
fieldGetters, org.apache.flink.table.data.conversion.DataStructureConverter[] 
fieldConverters) {
/* 5 */ this.fieldGetters = fieldGetters;
/* 6 */ this.fieldConverters = fieldConverters;
/* 7 */ }
/* 8 */ public java.lang.Object toInternal(java.lang.Object o) {
/* 9 */ final bean.OutInfo external = (bean.OutInfo) o;
/* 10 */ final org.apache.flink.table.data.GenericRowData genericRow = new 
org.apache.flink.table.data.GenericRowData(7);
/* 11 */ genericRow.setField(0, 
fieldConverters[0].toInternalOrNull(((java.lang.String) external.getFuncId())));
/* 12 */ genericRow.setField(1, 
fieldConverters[1].toInternalOrNull(((java.lang.String) 
external.getServerIp())));
/* 13 */ genericRow.setField(2, 
fieldConverters[2].toInternalOrNull(((java.lang.Long) external.getOutTime())));
/* 14 */ genericRow.setField(3, 
fieldConverters[3].toInternalOrNull(((java.lang.String) 
external.getHandleSerialNo())));
/* 15 */ genericRow.setField(4, 
fieldConverters[4].toInternalOrNull(((java.lang.String) external.getInfo())));
/* 16 */ genericRow.setField(5, 
fieldConverters[5].toInternalOrNull(((java.lang.String) 
external.getFuncIdDesc())));
/* 17 */ genericRow.setField(6, 
fieldConverters[6].toInternalOrNull(((java.time.LocalDateTime) 
external.getEventTime())));
/* 18 */ return genericRow;
/* 19 */ }
/* 20 */ public java.lang.Object toExternal(java.lang.Object o) {
/* 21 */ final org.apache.flink.table.data.RowData internal = 
(org.apache.flink.table.data.RowData) o;
/* 22 */ final bean.OutInfo structured = new bean.OutInfo();
/* 23 */ structured.setFuncId(((java.lang.String) 
fieldConverters[0].toExternalOrNull(fieldGetters[0].getFieldOrNull(internal))));
/* 24 */ structured.setServerIp(((java.lang.String) 
fieldConverters[1].toExternalOrNull(fieldGetters[1].getFieldOrNull(internal))));
/* 25 */ structured.setOutTime(((java.lang.Long) 
fieldConverters[2].toExternalOrNull(fieldGetters[2].getFieldOrNull(internal))));
/* 26 */ structured.setHandleSerialNo(((java.lang.String) 
fieldConverters[3].toExternalOrNull(fieldGetters[3].getFieldOrNull(internal))));
/* 27 */ structured.setInfo(((java.lang.String) 
fieldConverters[4].toExternalOrNull(fieldGetters[4].getFieldOrNull(internal))));
/* 28 */ structured.setFuncIdDesc(((java.lang.String) 
fieldConverters[5].toExternalOrNull(fieldGetters[5].getFieldOrNull(internal))));
/* 29 */ structured.setEventTime(((java.lang.String) 
fieldConverters[6].toExternalOrNull(fieldGetters[6].getFieldOrNull(internal))));
/* 30 */ return structured;
/* 31 */ }
/* 32 */}

/* 1 */public class bean$OutInfo$2$Converter implements 
org.apache.flink.table.data.conversion.DataStructureConverter {
/* 2 */ private final org.apache.flink.table.data.RowData.FieldGetter[] 
fieldGetters;
/* 3 */ private final 
org.apache.flink.table.data.conversion.DataStructureConverter[] fieldConverters;
/* 4 */ public 
bean$OutInfo$2$Converter(org.apache.flink.table.data.RowData.FieldGetter[] 
fieldGetters, org.apache.flink.table.data.conversion.DataStructureConverter[] 
fieldConverters) {
/* 5 */ this.fieldGetters = fieldGetters;
/* 6 */ this.fieldConverters = fieldConverters;
/* 7 */ }
/* 8 */ public java.lang.Object toInternal(java.lang.Object o) {
/* 9 */ final bean.OutInfo external = (bean.OutInfo) o;
/* 10 */ final org.apache.flink.table.data.GenericRowData genericRow = new 
org.apache.flink.table.data.GenericRowData(7);
/* 11 */ genericRow.setField(0, 
fieldConverters[0].toInternalOrNull(((java.lang.String) external.getFuncId())));
/* 12 */ genericRow.setField(1, 
fieldConverters[1].toInternalOrNull(((java.lang.String) 
external.getServerIp())));
/* 13 */ genericRow.setField(2, 
fieldConverters[2].toInternalOrNull(((java.lang.Long) external.getOutTime())));
/* 14 */ genericRow.setField(3, 
fieldConverters[3].toInternalOrNull(((java.lang.String) 
external.getHandleSerialNo())));
/* 15 */ genericRow.setField(4, 
fieldConverters[4].toInternalOrNull(((java.lang.String) external.getInfo())));
/* 16 */ genericRow.setField(5, 
fieldConverters[5].toInternalOrNull(((java.lang.String) 
external.getFuncIdDesc())));
/* 17 */ genericRow.setField(6, 
fieldConverters[6].toInternalOrNull(((java.time.LocalDateTime) 
external.getEventTime())));
/* 18 */ return genericRow;
/* 19 */ }
/* 20 */ public java.lang.Object toExternal(java.lang.Object o) {
/* 21 */ final org.apache.flink.table.data.RowData internal = 
(org.apache.flink.table.data.RowData) o;
/* 22 */ final bean.OutInfo structured = new bean.OutInfo();
/* 23 */ structured.setFuncId(((java.lang.String) 
fieldConverters[0].toExternalOrNull(fieldGetters[0].getFieldOrNull(internal))));
/* 24 */ structured.setServerIp(((java.lang.String) 
fieldConverters[1].toExternalOrNull(fieldGetters[1].getFieldOrNull(internal))));
/* 25 */ structured.setOutTime(((java.lang.Long) 
fieldConverters[2].toExternalOrNull(fieldGetters[2].getFieldOrNull(internal))));
/* 26 */ structured.setHandleSerialNo(((java.lang.String) 
fieldConverters[3].toExternalOrNull(fieldGetters[3].getFieldOrNull(internal))));
/* 27 */ structured.setInfo(((java.lang.String) 
fieldConverters[4].toExternalOrNull(fieldGetters[4].getFieldOrNull(internal))));
/* 28 */ structured.setFuncIdDesc(((java.lang.String) 
fieldConverters[5].toExternalOrNull(fieldGetters[5].getFieldOrNull(internal))));
/* 29 */ structured.setEventTime(((java.lang.String) 
fieldConverters[6].toExternalOrNull(fieldGetters[6].getFieldOrNull(internal))));
/* 30 */ return structured;
/* 31 */ }
/* 32 */}

Exception in thread "main" 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
 at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
 at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
 at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
 at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
 at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
 at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:250)
 at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
 at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
 at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
 at 
org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
 at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
 at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
 at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
 at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
 at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
 at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
 at 
org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
 at akka.dispatch.OnComplete.internal(Future.scala:300)
 at akka.dispatch.OnComplete.internal(Future.scala:297)
 at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
 at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
 at 
org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
 at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
 at 
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
 at 
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
 at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
 at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
 at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
 at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
 at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
 at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
 at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
 at 
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
 at 
akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
 at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
 at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
 at 
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
 at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
 at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
 at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
 at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
 at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
 at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
 at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
 at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
 at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
 at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
 at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
 at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679)
 at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
 at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
 at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
 at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
 at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
 at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
 at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
 at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
 at akka.actor.Actor.aroundReceive(Actor.scala:537)
 at akka.actor.Actor.aroundReceive$(Actor.scala:535)
 at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
 at akka.actor.ActorCell.invoke(ActorCell.scala:548)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
 at akka.dispatch.Mailbox.run(Mailbox.scala:231)
 at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
 ... 4 more
Caused by: org.apache.flink.table.api.TableException: Error while generating 
structured type converter.
 at 
org.apache.flink.table.data.conversion.StructuredObjectConverter.open(StructuredObjectConverter.java:89)
 at 
org.apache.flink.table.runtime.connector.source.DataStructureConverterWrapper.open(DataStructureConverterWrapper.java:46)
 at 
org.apache.flink.table.runtime.operators.source.InputConversionOperator.open(InputConversionOperator.java:76)
 at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
 at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
 at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
 at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: 
org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
compiled. This is a bug. Please file an issue.
 at 
org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:76)
 at 
org.apache.flink.table.data.conversion.StructuredObjectConverter.open(StructuredObjectConverter.java:80)
 ... 12 more
Caused by: 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException:
 org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
compiled. This is a bug. Please file an issue.
 at 
org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
 at 
org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962)
 at 
org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859)
 at 
org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:74)
 ... 13 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
cannot be compiled. This is a bug. Please file an issue.
 at 
org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:89)
 at 
org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:74)
 at 
org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4864)
 at 
org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
 at 
org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
 at 
org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
 at 
org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
 ... 16 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column 103: 
Cannot cast "java.lang.String" to "java.time.LocalDateTime"
 at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211)
 at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5051)
 at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)
 at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4418)
 at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4396)
 at org.codehaus.janino.Java$Cast.accept(Java.java:4898)
 at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
 at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5057)
 at org.codehaus.janino.UnitCompiler.access$8100(UnitCompiler.java:215)
 at 
org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4409)
 at 
org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4400)
 at org.codehaus.janino.Java$ParenthesizedExpression.accept(Java.java:4924)
 at org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4400)
 at org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4396)
 at org.codehaus.janino.Java$Lvalue.accept(Java.java:4148)
 at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
 at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662)
 at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5182)
 at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
 at 
org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423)
 at 
org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396)
 at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
 at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
 at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662)
 at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5182)
 at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
 at 
org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423)
 at 
org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396)
 at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
 at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
 at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662)
 at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3783)
 at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
 at 
org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3762)
 at 
org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3734)
 at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
 at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3734)
 at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
 at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
 at 
org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
 at 
org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
 at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2874)
 at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
 at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
 at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
 at 
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
 at 
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
 at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
 at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
 at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
 at 
org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
 at 
org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
 at 
org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
 at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
 at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
 at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
 at 
org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
 at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
 at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
 at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
 at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
 at 
org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:86)
 ... 22 more

Process finished with exit code 1

|


| |
小昌同学
|
|
ccc0606fight...@163.com
|
---- 回复的原邮件 ----
| 发件人 | lxk<lxk7...@163.com> |
| 发送日期 | 2023年5月15日 18:21 |
| 收件人 | <user-zh@flink.apache.org> |
| 主题 | Re:报错显示为bug |
你好,可以把相关代码贴上来吗,方便大家进行分析。如果使用sql的话还可以把执行计划贴上来。

















在 2023-05-15 17:11:42,"小昌同学" <ccc0606fight...@163.com> 写道:
各位老师,请教一下我在使用table API进行编程的时候,报错信息为“Caused by: 
org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
compiled. This is a bug. Please file an issue.”
flink使用版本为1.14,请问一下有相关社区的技术人员可以进行对接吗,还是应该怎么操作?


| |
小昌同学
|
|
ccc0606fight...@163.com
|

回复