Re: Re: Error looks like a bug
Okay, thanks a lot, everyone!

小昌同学
ccc0606fight...@163.com

---- Replied Message ----
From: Shammon FY
Date: 2023-05-16 08:46
Subject: Re: Re: Error looks like a bug
> [quoted text omitted; see the reply below]
Re: Re: Error looks like a bug
Hi, judging from the error, some string field in your job is being treated as a timestamp, which makes code generation for the job fail. Since your job logic is fairly complex, go through the time-related fields and check that their types are handled correctly, for example the eventTime field.

Best,
Shammon FY

On Mon, May 15, 2023 at 7:29 PM lxk wrote:
> Hello, judging from the error, it is caused by incompatible types:
> Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column 103: Cannot cast "java.lang.String" to "java.time.LocalDateTime"
> You can try adjusting the table schema, or use an appropriate function to convert the field type.
> [quoted code omitted; see the original message below]
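To make the suggestion concrete: a minimal sketch of declaring eventTime as a real timestamp column when converting a stream to a table, assuming the epoch-millis value produced by DateUtil.string2Long is exposed as a BIGINT field named "time". The schema and field names are illustrative, not taken from the actual job.

    // A minimal sketch, assuming an epoch-millis BIGINT field named "time";
    // deriving eventTime this way means the planner never has to cast a
    // String to LocalDateTime during codegen.
    Schema schema = Schema.newBuilder()
            .column("action", DataTypes.STRING())
            .column("time", DataTypes.BIGINT())
            // derive the event-time column from the millis value
            .columnByExpression("eventTime", "TO_TIMESTAMP_LTZ(`time`, 3)")
            .watermark("eventTime", "eventTime - INTERVAL '5' SECOND")
            .build();
    Table midTable = tableEnv.fromDataStream(midStream, schema);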
Re: Re: Error looks like a bug
Hello, judging from the error, it is caused by incompatible types:

Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column 103: Cannot cast "java.lang.String" to "java.time.LocalDateTime"

You can try adjusting the table schema, or use an appropriate function to convert the field type.

At 2023-05-15 18:29:15, "小昌同学" wrote:
> [quoted code omitted; see the original message below]
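A minimal sketch of the conversion suggested here, assuming the timestamp arrives as a string column; the table name sourceTable and the column name eventTimeStr are hypothetical, not from the original job.

    // Convert the String column explicitly with TO_TIMESTAMP so that no
    // String-to-LocalDateTime cast is generated by the planner.
    Table converted = tableEnv.sqlQuery(
            "SELECT TO_TIMESTAMP(eventTimeStr, 'yyyy-MM-dd HH:mm:ss.SSS') AS eventTime, action " +
            "FROM sourceTable");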
Re: Error looks like a bug
package job;

import bean.BaseInfo;
import bean.MidInfo;
import bean.OutInfo;
import bean.ResultInfo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import config.FlinkConfig;
import function.MyProcessFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.OutputTag;
import sink.Sink2Mysql;
import utils.DateUtil;
import utils.DateUtils;
import utils.JdbcUtil;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.time.*;
import java.util.Date;
import java.util.HashMap;
import java.util.Properties;

public class RytLogAnly4 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Use side-output streams
        OutputTag<BaseInfo> requestStream = new OutputTag<BaseInfo>("requestStream") {
        };
        OutputTag<BaseInfo> answerStream = new OutputTag<BaseInfo>("answerStream") {
        };

        // 1. Connect to the Kafka source in the test environment
        String servers = FlinkConfig.config.getProperty("dev_bootstrap.servers");
        String topicName = FlinkConfig.config.getProperty("dev_topicName");
        String groupId = FlinkConfig.config.getProperty("dev_groupId");
        String devMode = FlinkConfig.config.getProperty("dev_mode");
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", servers);
        prop.setProperty("group.id", groupId);
        prop.setProperty("auto.offset.reset", devMode);
        DataStreamSource<String> sourceStream = env.addSource(
                new FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), prop));
        // Sample record:
        // {"ip":"10.125.8.141","data":"请求: -- 14:28:05.395 -- <315.22604>1D971BEEF23AE63\nAction=686\nMobileCode=18243533656\nReqno=10797698288=22=2=0.2.4596628816=703492175447.712\nCellIndex=0102\nIphoneKey=10797944048\nForward=2\nCfrom=dbzq.iphone\nTFrom=iphone\nGateWayIp=2409:893c:5212:4943:acf2:43c1:3904:253c\nHandleSerialNo=TmuAbK5TAAC9CttSU/3lQGAHAABrBwACABYAAACuAgCuAgAATQFIAAFSMDEwNzk3Njk4Mjg4PTIyPTI9MC4yLjQ1OTY2Mjg4MTY9NzAzNDkyMTc1NDQ3LjcxMgFJCwAAADEwNzk3OTQ0MDQ4AA==\nGateWayPort=60696\nnewindex=1\nlinksession=4\nuniqueid=8488717B-B476-4A82-BFD0-9DDBB5151D0A\ntztsno=5DFB64E9E8BF7C67A158C3022E970E0F\nClientVersion=1.01.096\ntztreqfrom=ios.webview\nReqlinkType=2\ntztSDKType=0\n"}

        // 2. Parse the raw records into BaseInfo objects
        SingleOutputStreamOperator<BaseInfo> baseInfoStream = sourceStream.map(new MapFunction<String, BaseInfo>() {
            @Override
            public BaseInfo map(String value) throws Exception {
                JSONObject jsonObject = JSON.parseObject(value);
                // Server IP of this record
                String serverIp = jsonObject.getString("ip");
                // Raw log payload
                String datas = jsonObject.getString("data");

                String[] splits = datas.split("\n");
                HashMap<String, String> dataMap = new HashMap<>();
                // time goes into the custom type, used to correlate the request and response times of the same num
                String time = splits[0].substring(7, 19);
                // subData goes into the custom type, used to tell whether the record is a request or an answer
                String subData = datas.substring(0, 10);
                for (int i = 0; i < splits.length; i++) {
                    if (splits[i].contains("=")) {
                        splits[i] = splits[i].replaceFirst("=", "&");
                        String[] temp = splits[i].split("&");
                        if (temp.length > 1) {
                            dataMap.put(temp[0].toLowerCase(), temp[1]);
                        }
                    }
                }
                return new BaseInfo(dataMap.get("action"), serverIp,
                        DateUtil.string2Long(time), dataMap.get("handleserialno"), subData);
            }
        });

        // 3. Split baseInfoStream via a ProcessFunction with side outputs
        SingleOutputStreamOperator<BaseInfo> tagStream =
                baseInfoStream.process(new MyProcessFunction(requestStream, answerStream));

        // 4. Obtain the per-tag output streams
        DataStream<BaseInfo> requestDataStream = tagStream.getSideOutput(requestStream);
        DataStream<BaseInfo> answerDataStream = tagStream.getSideOutput(answerStream);
        requestDataStream.print("requestDataStream");
        answerDataStream.print("answerDataStream");

        // 5. The streams above only carry the action code, not the corresponding
        //    Chinese description of each action, so join against the table in MySQL
        // 5.1 Process the request stream first
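For reference, a hypothetical sketch of the DateUtil.string2Long helper used above; its real implementation was not posted in the thread. It assumes the input is the "HH:mm:ss.SSS" substring extracted from the log line, anchored to today's date in the system time zone.

    import java.time.*;
    import java.time.format.DateTimeFormatter;

    public class DateUtil {
        private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

        // Hypothetical reconstruction; the actual utils.DateUtil was not posted.
        // Parses "14:28:05.395"-style time-of-day strings and converts them to
        // epoch millis on today's date in the system time zone.
        public static long string2Long(String time) {
            LocalTime t = LocalTime.parse(time, FMT);
            return LocalDate.now().atTime(t)
                    .atZone(ZoneId.systemDefault())
                    .toInstant()
                    .toEpochMilli();
        }
    }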