回复: 回复:报错显示为bug

2023-05-16 文章 小昌同学
好滴呀  谢谢各位老师


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | Shammon FY |
| 发送日期 | 2023年5月16日 08:46 |
| 收件人 |  ,
 |
| 主题 | Re: 回复:报错显示为bug |
Hi,

从错误上看应该是你作业里某个字符串字段被作为时间戳处理,导致作业codegen失败了。你的作业逻辑比较复杂,你可以排查一下跟时间相关的字段,检查一下字段类型处理是否正确,比如eventTime字段

Best,
Shammon FY

On Mon, May 15, 2023 at 7:29 PM lxk  wrote:

你好,从报错来看是类型不兼容导致的。
Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column
103: Cannot cast "java.lang.String" to "java.time.LocalDateTime"
可以尝试对表结构进行优化,或者使用相关函数对字段类型进行转换

















At 2023-05-15 18:29:15, "小昌同学"  wrote:
|
package job;
import bean.BaseInfo;
import bean.MidInfo;
import bean.OutInfo;
import bean.ResultInfo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import config.FlinkConfig;
import function.MyProcessFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.OutputTag;
import sink.Sink2Mysql;
import utils.DateUtil;
import utils.DateUtils;
import utils.JdbcUtil;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.time.*;
import java.util.Date;
import java.util.HashMap;
import java.util.Properties;

public class RytLogAnly4 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

//使用侧输出流
OutputTag requestStream = new
OutputTag("requestStream") {
};
OutputTag answerStream = new
OutputTag("answerStream") {
};

//1、连接测试环境kafka的数据
String servers =
FlinkConfig.config.getProperty("dev_bootstrap.servers");
String topicName =
FlinkConfig.config.getProperty("dev_topicName");
String groupId = FlinkConfig.config.getProperty("dev_groupId");
String devMode = FlinkConfig.config.getProperty("dev_mode");
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", servers);
prop.setProperty("group.id", groupId);
prop.setProperty("auto.offset.reset", devMode);
DataStreamSource sourceStream = env.addSource(new
FlinkKafkaConsumer(topicName, new SimpleStringSchema(), prop));
//{"ip":"10.125.8.141",{"data":"请求: -- 14:28:05.395 --
<315.22604>1D971BEEF23AE63\nAction=686\nMobileCode=18243533656\nReqno=10797698288=22=2=0.2.4596628816=703492175447.712\nCellIndex=0102\nIphoneKey=10797944048\nForward=2\nCfrom=dbzq.iphone\nTFrom=iphone\nGateWayIp=2409:893c:5212:4943:acf2:43c1:3904:253c\nHandleSerialNo=TmuAbK5TAAC9CttSU/3lQGAHAABrBwACABYAAACuAgCuAgAATQFIAAFSMDEwNzk3Njk4Mjg4PTIyPTI9MC4yLjQ1OTY2Mjg4MTY9NzAzNDkyMTc1NDQ3LjcxMgFJCwAAADEwNzk3OTQ0MDQ4AA==\nGateWayPort=60696\nnewindex=1\nlinksession=4\nuniqueid=8488717B-B476-4A82-BFD0-9DDBB5151D0A\ntztsno=5DFB64E9E8BF7C67A158C3022E970E0F\nClientVersion=1.01.096\ntztreqfrom=ios.webview\nReqlinkType=2\ntztSDKType=0\n"}

//2、对源数据进行处理,生成baseInfo基类的数据
SingleOutputStreamOperator baseInfoStream =
sourceStream.map(new MapFunction() {
@Override
public BaseInfo map(String value) throws Exception {
JSONObject jsonObject = JSON.parseObject(value);
//获取到不同的服务器IP
String serverIp = jsonObject.getString("ip");
//获取到不同的data的数据
String datas = jsonObject.getString("data");

String[] splits = datas.split("\n");
HashMap dataMap = new HashMap<>();
//将time填充到自定义类型中,用来判断同一个num的请求以及相应时间
String time = splits[0].substring(7, 19);
//将subData填充到自定义类型中,用来判断时请求还是应答
String subData = datas.substring(0, 10);
for (int i = 0; i < splits.length; i++) {
if (splits[i].contains("=")) {
splits[i] = splits[i].replaceFirst("=", "&");
String[] temp = splits[i].split("&");
if (temp.length > 1) {
dataMap.put(temp[0].toLowerCase(), temp[1]);
}
}
}
return new BaseInfo(dataMap.get("action"), serverIp,
DateUtil.string2Long(time), dataMap.get("handleserialno"), subData);
}
});

//3、使用process方法进行baseInfoStream流切割
SingleOutputStreamOperator tagStream =
baseInfoStream.process(new MyProcessFunction(reques

Re: 回复:报错显示为bug

2023-05-15 文章 Shammon FY
Hi,

从错误上看应该是你作业里某个字符串字段被作为时间戳处理,导致作业codegen失败了。你的作业逻辑比较复杂,你可以排查一下跟时间相关的字段,检查一下字段类型处理是否正确,比如eventTime字段

Best,
Shammon FY

On Mon, May 15, 2023 at 7:29 PM lxk  wrote:

> 你好,从报错来看是类型不兼容导致的。
> Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column
> 103: Cannot cast "java.lang.String" to "java.time.LocalDateTime"
> 可以尝试对表结构进行优化,或者使用相关函数对字段类型进行转换
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> At 2023-05-15 18:29:15, "小昌同学"  wrote:
> >|
> >package job;
> >import bean.BaseInfo;
> >import bean.MidInfo;
> >import bean.OutInfo;
> >import bean.ResultInfo;
> >import com.alibaba.fastjson.JSON;
> >import com.alibaba.fastjson.JSONObject;
> >import config.FlinkConfig;
> >import function.MyProcessFunction;
> >import org.apache.flink.api.common.functions.MapFunction;
> >import org.apache.flink.api.common.serialization.SimpleStringSchema;
> >import org.apache.flink.api.java.tuple.Tuple2;
> >import org.apache.flink.streaming.api.TimeCharacteristic;
> >import org.apache.flink.streaming.api.datastream.DataStream;
> >import org.apache.flink.streaming.api.datastream.DataStreamSource;
> >import
> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> >import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> >import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> >import org.apache.flink.table.api.DataTypes;
> >import org.apache.flink.table.api.Schema;
> >import org.apache.flink.table.api.Table;
> >import org.apache.flink.table.api.TableSchema;
> >import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> >import org.apache.flink.table.types.DataType;
> >import org.apache.flink.util.OutputTag;
> >import sink.Sink2Mysql;
> >import utils.DateUtil;
> >import utils.DateUtils;
> >import utils.JdbcUtil;
> >
> >import java.sql.Connection;
> >import java.sql.PreparedStatement;
> >import java.sql.ResultSet;
> >import java.time.*;
> >import java.util.Date;
> >import java.util.HashMap;
> >import java.util.Properties;
> >
> >public class RytLogAnly4 {
> >public static void main(String[] args) throws Exception {
> >StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> >env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >
> >//使用侧输出流
> >OutputTag requestStream = new
> OutputTag("requestStream") {
> >};
> >OutputTag answerStream = new
> OutputTag("answerStream") {
> >};
> >
> >//1、连接测试环境kafka的数据
> >String servers =
> FlinkConfig.config.getProperty("dev_bootstrap.servers");
> >String topicName =
> FlinkConfig.config.getProperty("dev_topicName");
> >String groupId = FlinkConfig.config.getProperty("dev_groupId");
> >String devMode = FlinkConfig.config.getProperty("dev_mode");
> >Properties prop = new Properties();
> >prop.setProperty("bootstrap.servers", servers);
> >prop.setProperty("group.id", groupId);
> >prop.setProperty("auto.offset.reset", devMode);
> >DataStreamSource sourceStream = env.addSource(new
> FlinkKafkaConsumer(topicName, new SimpleStringSchema(), prop));
> >//{"ip":"10.125.8.141",{"data":"请求: -- 14:28:05.395 --
> <315.22604>1D971BEEF23AE63\nAction=686\nMobileCode=18243533656\nReqno=10797698288=22=2=0.2.4596628816=703492175447.712\nCellIndex=0102\nIphoneKey=10797944048\nForward=2\nCfrom=dbzq.iphone\nTFrom=iphone\nGateWayIp=2409:893c:5212:4943:acf2:43c1:3904:253c\nHandleSerialNo=TmuAbK5TAAC9CttSU/3lQGAHAABrBwACABYAAACuAgCuAgAATQFIAAFSMDEwNzk3Njk4Mjg4PTIyPTI9MC4yLjQ1OTY2Mjg4MTY9NzAzNDkyMTc1NDQ3LjcxMgFJCwAAADEwNzk3OTQ0MDQ4AA==\nGateWayPort=60696\nnewindex=1\nlinksession=4\nuniqueid=8488717B-B476-4A82-BFD0-9DDBB5151D0A\ntztsno=5DFB64E9E8BF7C67A158C3022E970E0F\nClientVersion=1.01.096\ntztreqfrom=ios.webview\nReqlinkType=2\ntztSDKType=0\n"}
> >
> >//2、对源数据进行处理,生成baseInfo基类的数据
> >SingleOutputStreamOperator baseInfoStream =
> sourceStream.map(new MapFunction() {
> >@Override
> >public BaseInfo map(String value) throws Exception {
> >JSONObject jsonObject = JSON.parseObject(value);
> >//获取到不同的服务器IP
> >String serverIp = jsonObject.getString("ip");
> >//获取到不同的data的数据
> >String datas = jsonObject.getString("data");
> >
> >String[] splits = datas.split("\n");
> >HashMap dataMap = new HashMap<>();
> >//将time填充到自定义类型中,用来判断同一个num的请求以及相应时间
> >String time = splits[0].substring(7, 19);
> >//将subData填充到自定义类型中,用来判断时请求还是应答
> >String subData = datas.substring(0, 10);
> >for (int i = 0; i < splits.length; i++) {
> >if (splits[i].contains("=")) {
> >splits[i] = splits[i].replaceFirst("=", "&");
> >String[] temp = splits[i].split("&");
> >if (temp.length > 1) {
> >dataMap.put(temp[0].toLowerCase(), temp[1]);
> >}
> >}
> >

Re:回复:报错显示为bug

2023-05-15 文章 lxk
你好,从报错来看是类型不兼容导致的。
Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column 103: 
Cannot cast "java.lang.String" to "java.time.LocalDateTime"  
可以尝试对表结构进行优化,或者使用相关函数对字段类型进行转换

















At 2023-05-15 18:29:15, "小昌同学"  wrote:
>|
>package job;
>import bean.BaseInfo;
>import bean.MidInfo;
>import bean.OutInfo;
>import bean.ResultInfo;
>import com.alibaba.fastjson.JSON;
>import com.alibaba.fastjson.JSONObject;
>import config.FlinkConfig;
>import function.MyProcessFunction;
>import org.apache.flink.api.common.functions.MapFunction;
>import org.apache.flink.api.common.serialization.SimpleStringSchema;
>import org.apache.flink.api.java.tuple.Tuple2;
>import org.apache.flink.streaming.api.TimeCharacteristic;
>import org.apache.flink.streaming.api.datastream.DataStream;
>import org.apache.flink.streaming.api.datastream.DataStreamSource;
>import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
>import org.apache.flink.table.api.DataTypes;
>import org.apache.flink.table.api.Schema;
>import org.apache.flink.table.api.Table;
>import org.apache.flink.table.api.TableSchema;
>import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>import org.apache.flink.table.types.DataType;
>import org.apache.flink.util.OutputTag;
>import sink.Sink2Mysql;
>import utils.DateUtil;
>import utils.DateUtils;
>import utils.JdbcUtil;
>
>import java.sql.Connection;
>import java.sql.PreparedStatement;
>import java.sql.ResultSet;
>import java.time.*;
>import java.util.Date;
>import java.util.HashMap;
>import java.util.Properties;
>
>public class RytLogAnly4 {
>public static void main(String[] args) throws Exception {
>StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>//使用侧输出流
>OutputTag requestStream = new 
> OutputTag("requestStream") {
>};
>OutputTag answerStream = new 
> OutputTag("answerStream") {
>};
>
>//1、连接测试环境kafka的数据
>String servers = 
> FlinkConfig.config.getProperty("dev_bootstrap.servers");
>String topicName = FlinkConfig.config.getProperty("dev_topicName");
>String groupId = FlinkConfig.config.getProperty("dev_groupId");
>String devMode = FlinkConfig.config.getProperty("dev_mode");
>Properties prop = new Properties();
>prop.setProperty("bootstrap.servers", servers);
>prop.setProperty("group.id", groupId);
>prop.setProperty("auto.offset.reset", devMode);
>DataStreamSource sourceStream = env.addSource(new 
> FlinkKafkaConsumer(topicName, new SimpleStringSchema(), prop));
>//{"ip":"10.125.8.141",{"data":"请求: -- 14:28:05.395 -- 
><315.22604>1D971BEEF23AE63\nAction=686\nMobileCode=18243533656\nReqno=10797698288=22=2=0.2.4596628816=703492175447.712\nCellIndex=0102\nIphoneKey=10797944048\nForward=2\nCfrom=dbzq.iphone\nTFrom=iphone\nGateWayIp=2409:893c:5212:4943:acf2:43c1:3904:253c\nHandleSerialNo=TmuAbK5TAAC9CttSU/3lQGAHAABrBwACABYAAACuAgCuAgAATQFIAAFSMDEwNzk3Njk4Mjg4PTIyPTI9MC4yLjQ1OTY2Mjg4MTY9NzAzNDkyMTc1NDQ3LjcxMgFJCwAAADEwNzk3OTQ0MDQ4AA==\nGateWayPort=60696\nnewindex=1\nlinksession=4\nuniqueid=8488717B-B476-4A82-BFD0-9DDBB5151D0A\ntztsno=5DFB64E9E8BF7C67A158C3022E970E0F\nClientVersion=1.01.096\ntztreqfrom=ios.webview\nReqlinkType=2\ntztSDKType=0\n"}
>
>//2、对源数据进行处理,生成baseInfo基类的数据
>SingleOutputStreamOperator baseInfoStream = 
> sourceStream.map(new MapFunction() {
>@Override
>public BaseInfo map(String value) throws Exception {
>JSONObject jsonObject = JSON.parseObject(value);
>//获取到不同的服务器IP
>String serverIp = jsonObject.getString("ip");
>//获取到不同的data的数据
>String datas = jsonObject.getString("data");
>
>String[] splits = datas.split("\n");
>HashMap dataMap = new HashMap<>();
>//将time填充到自定义类型中,用来判断同一个num的请求以及相应时间
>String time = splits[0].substring(7, 19);
>//将subData填充到自定义类型中,用来判断时请求还是应答
>String subData = datas.substring(0, 10);
>for (int i = 0; i < splits.length; i++) {
>if (splits[i].contains("=")) {
>splits[i] = splits[i].replaceFirst("=", "&");
>String[] temp = splits[i].split("&");
>if (temp.length > 1) {
>dataMap.put(temp[0].toLowerCase(), temp[1]);
>}
>}
>}
>return new BaseInfo(dataMap.get("action"), serverIp, 
>DateUtil.string2Long(time), dataMap.get("handleserialno"), subData);
>}
>});
>
>//3、使用process方法进行baseInfoStream流切割
>SingleOutputStreamOperator tagStream = 
> baseInfoStream.process(new MyProcessFunction(requestStream, answerStream));
>
>//4、根据不同的tag进行不同的输出流设定
>   

回复:报错显示为bug

2023-05-15 文章 小昌同学
|
package job;
import bean.BaseInfo;
import bean.MidInfo;
import bean.OutInfo;
import bean.ResultInfo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import config.FlinkConfig;
import function.MyProcessFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.OutputTag;
import sink.Sink2Mysql;
import utils.DateUtil;
import utils.DateUtils;
import utils.JdbcUtil;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.time.*;
import java.util.Date;
import java.util.HashMap;
import java.util.Properties;

public class RytLogAnly4 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

//使用侧输出流
OutputTag requestStream = new 
OutputTag("requestStream") {
};
OutputTag answerStream = new 
OutputTag("answerStream") {
};

//1、连接测试环境kafka的数据
String servers = 
FlinkConfig.config.getProperty("dev_bootstrap.servers");
String topicName = FlinkConfig.config.getProperty("dev_topicName");
String groupId = FlinkConfig.config.getProperty("dev_groupId");
String devMode = FlinkConfig.config.getProperty("dev_mode");
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", servers);
prop.setProperty("group.id", groupId);
prop.setProperty("auto.offset.reset", devMode);
DataStreamSource sourceStream = env.addSource(new 
FlinkKafkaConsumer(topicName, new SimpleStringSchema(), prop));
//{"ip":"10.125.8.141",{"data":"请求: -- 14:28:05.395 -- 
<315.22604>1D971BEEF23AE63\nAction=686\nMobileCode=18243533656\nReqno=10797698288=22=2=0.2.4596628816=703492175447.712\nCellIndex=0102\nIphoneKey=10797944048\nForward=2\nCfrom=dbzq.iphone\nTFrom=iphone\nGateWayIp=2409:893c:5212:4943:acf2:43c1:3904:253c\nHandleSerialNo=TmuAbK5TAAC9CttSU/3lQGAHAABrBwACABYAAACuAgCuAgAATQFIAAFSMDEwNzk3Njk4Mjg4PTIyPTI9MC4yLjQ1OTY2Mjg4MTY9NzAzNDkyMTc1NDQ3LjcxMgFJCwAAADEwNzk3OTQ0MDQ4AA==\nGateWayPort=60696\nnewindex=1\nlinksession=4\nuniqueid=8488717B-B476-4A82-BFD0-9DDBB5151D0A\ntztsno=5DFB64E9E8BF7C67A158C3022E970E0F\nClientVersion=1.01.096\ntztreqfrom=ios.webview\nReqlinkType=2\ntztSDKType=0\n"}

//2、对源数据进行处理,生成baseInfo基类的数据
SingleOutputStreamOperator baseInfoStream = 
sourceStream.map(new MapFunction() {
@Override
public BaseInfo map(String value) throws Exception {
JSONObject jsonObject = JSON.parseObject(value);
//获取到不同的服务器IP
String serverIp = jsonObject.getString("ip");
//获取到不同的data的数据
String datas = jsonObject.getString("data");

String[] splits = datas.split("\n");
HashMap dataMap = new HashMap<>();
//将time填充到自定义类型中,用来判断同一个num的请求以及相应时间
String time = splits[0].substring(7, 19);
//将subData填充到自定义类型中,用来判断时请求还是应答
String subData = datas.substring(0, 10);
for (int i = 0; i < splits.length; i++) {
if (splits[i].contains("=")) {
splits[i] = splits[i].replaceFirst("=", "&");
String[] temp = splits[i].split("&");
if (temp.length > 1) {
dataMap.put(temp[0].toLowerCase(), temp[1]);
}
}
}
return new BaseInfo(dataMap.get("action"), serverIp, 
DateUtil.string2Long(time), dataMap.get("handleserialno"), subData);
}
});

//3、使用process方法进行baseInfoStream流切割
SingleOutputStreamOperator tagStream = 
baseInfoStream.process(new MyProcessFunction(requestStream, answerStream));

//4、根据不同的tag进行不同的输出流设定
DataStream requestDataStream = 
tagStream.getSideOutput(requestStream);
DataStream answerDataStream = 
tagStream.getSideOutput(answerStream);

requestDataStream.print("requestDataStream");
answerDataStream.print("answerDataStream");

//5、上面的流仅仅只是携带了action编码,没有对应的action中午注释,需要去关联一下MySQL中的表
//5.1 先对请求流进行处理