[ 
https://issues.apache.org/jira/browse/FLINK-24870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17442180#comment-17442180
 ] 

wangbaohua edited comment on FLINK-24870 at 11/11/21, 9:18 AM:
---------------------------------------------------------------

https://gitee.com/wang_bh/flink/blob/master/flink-demo/src/main/java/com/asap/demo/rete/ReteDemo4.java


was (Author: JIRAUSER280022):
DataStream<BeanField> kafkaData1 = kafkaData.map(new MapFunction<StandardEvent, BeanField>() {
    /**
     * Converts one {@code StandardEvent} into a dynamically generated bean.
     *
     * <p>First registers every configured field (name and Java class) in {@code properties},
     * then creates the object through {@code Utils.generateObject} and copies the event's
     * values into it, parsing them according to the configured field type.
     *
     * @param value the incoming event whose fields are read by topic field name
     * @return the populated object, cast to {@code BeanField}
     * @throws Exception if a numeric value cannot be parsed or a helper call fails
     */
    @Override
    public BeanField map(StandardEvent value) throws Exception {
        // Declare the bean properties.
        for (Object jsonField : ruleParse.getTableDefine().getJsonFieldList()) {
            JsonField jsonFieldBean = (JsonField) jsonField;
            String field = Utils.getUrlByCode(jsonFieldBean.getDbFieldName().toLowerCase());
            String fieldType = jsonFieldBean.getFieldType();
            // Stays null for an unrecognised type, as before.
            Class<?> fieldClass = null;
            if ("int".equals(fieldType)) {
                fieldClass = Integer.class;
            } else if ("string".equals(fieldType)) {
                fieldClass = String.class;
            } else if ("long".equals(fieldType)) {
                fieldClass = Long.class;
            } else if ("date".equals(fieldType)) {
                // The table schema built from this stream declares the timestamp column as
                // TIMESTAMP_LTZ(3); Flink's generated converter casts that property to
                // java.time.Instant, so a java.util.Date property fails with
                // 'Cannot cast "java.util.Date" to "java.time.Instant"'.
                fieldClass = java.time.Instant.class;
            }
            // NOTE(review): this rewrites the shared field definition on every record;
            // presumably getUrlByCode is idempotent on its own output — confirm.
            jsonFieldBean.setDbFieldName(field);
            properties.put(field, fieldClass);
        }

        // Create the object.
        Object stu = Utils.generateObject(properties);

        // Assign values to the instance.
        for (Object jsonField : ruleParse.getTableDefine().getJsonFieldList()) {
            JsonField jsonFieldBean = (JsonField) jsonField;
            String fieldType = jsonFieldBean.getFieldType();
            String fieldValueStr = value.getField(jsonFieldBean.getTopicFieldName());
            if ("date".equals(fieldType)) {
                // NOTE(review): assumes DateUtil.parseDate returns a java.util.Date (or a
                // subclass) and may return null for unparsable input — confirm.
                java.util.Date parsed = DateUtil.parseDate(fieldValueStr, "yyyy-MM-dd HH:mm:ss:SSS");
                java.time.Instant fieldValue = (parsed == null) ? null : parsed.toInstant();
                Utils.setValue(stu, jsonFieldBean.getDbFieldName(), fieldValue);
            } else if ("int".equals(fieldType)) {
                // Missing or empty values default to 0.
                Integer fieldValue = 0;
                if (fieldValueStr != null && !fieldValueStr.isEmpty()) {
                    fieldValue = Integer.parseInt(fieldValueStr);
                }
                Utils.setValue(stu, jsonFieldBean.getDbFieldName(), fieldValue);
            } else if ("long".equals(fieldType)) {
                // Missing or empty values default to 0.
                Long fieldValue = 0L;
                if (fieldValueStr != null && !fieldValueStr.isEmpty()) {
                    fieldValue = Long.parseLong(fieldValueStr);
                }
                Utils.setValue(stu, jsonFieldBean.getDbFieldName(), fieldValue);
            } else if ("string".equals(fieldType)) {
                // String properties were declared above but previously never populated.
                Utils.setValue(stu, jsonFieldBean.getDbFieldName(), fieldValueStr);
            }
        }
        return (BeanField) stu;
    }
});
kafkaData1.map(new MapFunction<BeanField, String>() {
    /**
     * Debug tap: writes each bean's string form to standard output.
     *
     * @param value the bean to print
     * @return always {@code null}; the resulting stream is not used in this snippet
     */
    @Override
    public String map(BeanField value) throws Exception {
        final String rendered = value.toString();
        System.out.println("value:" + rendered);
        return null;
    }
});

// Column layout and watermark definition for the table built on top of kafkaData1.
// NOTE(review): the stack trace reported with this snippet shows the generated converter
// casting a property to java.time.Instant, so the bean property behind the
// TIMESTAMP_LTZ(3) column presumably has to be an Instant — confirm.
Schema eventSchema = Schema.newBuilder()
        .column("eventTwoType", "BIGINT")
        .column("deviceParentType", "STRING")
        .column("type", "STRING")
        .column("eventName", "STRING")
        .column("directionDesc", "STRING")
        .column("srcIp", "STRING")
        .column("dstIp", "STRING")
        .column("createTime", "TIMESTAMP_LTZ(3)")
        .column("snowId", "BIGINT")
        .watermark("createTime", "SOURCE_WATERMARK()")
        .build();

Table inputTable = blinkStreamTableEnv.fromDataStream(kafkaData1, eventSchema);

> Cannot cast "java.util.Date" to "java.time.Instant"
> ---------------------------------------------------
>
>                 Key: FLINK-24870
>                 URL: https://issues.apache.org/jira/browse/FLINK-24870
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.13.1
>            Reporter: wangbaohua
>            Priority: Blocker
>
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkRuntimeException: 
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>         at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:76)
>         at 
> org.apache.flink.table.data.conversion.StructuredObjectConverter.open(StructuredObjectConverter.java:80)
>         ... 11 more
> Caused by: 
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException:
>  org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>         at 
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
>         at 
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
>         at 
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
>         at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:74)
>         ... 12 more
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
>         at 
> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:89)
>         at 
> org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:74)
>         at 
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
>         at 
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
>         at 
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
>         at 
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
>         at 
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
>         ... 15 more
> Caused by: org.codehaus.commons.compiler.CompileException: Line 120, Column 
> 101: Cannot cast "java.util.Date" to "java.time.Instant"
>         at 
> org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211)
>         at 
> org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5051)
>         at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)
>         at 
> org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4418)
>         at 
> org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4396)
>         at org.codehaus.janino.Java$Cast.accept(Java.java:4898)
>         at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
>         at 
> org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5057)
>         at org.codehaus.janino.UnitCompiler.access$8100(UnitCompiler.java:215)
>         at 
> org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4409)
>         at 
> org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4400)
>         at 
> org.codehaus.janino.Java$ParenthesizedExpression.accept(Java.java:4924)
>         at 
> org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4400)
>         at 
> org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4396)
>         at org.codehaus.janino.Java$Lvalue.accept(Java.java:4148)
>         at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
>         at 
> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662)
>         at 
> org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5182)
>         at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
>         at 
> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423)
>         at 
> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to