flink调用drools服务器问题

2021-08-13 Thread yanyunpeng
Flink 中使用 drools client 执行规则  本地启动没问题  部署到集群的时候报错空指针  


KieCommands kieCommands = KieServices.Factory.get().getCommands();

List> commands = new LinkedList<>();

commands.add(kieCommands.newInsert(event, "event"));

commands.add(kieCommands.newFireAllRules());
KieCommands kieCommands = KieServices.Factory.get().getCommands(); 这行报错空指针


flink版本 13.1
Drools-client

 org.kie.server
 kie-server-client
 7.17.0.Final



请求这种应用了外部包的有啥问题吗?

Re: 批流一体的一些疑问

2021-08-08 Thread yanyunpeng
HI:
大概的逻辑是这样的
insert mysql_result_table【mysql结果表】
select  * from  id_all【id的所有合集  mysql表】 where id not in (select distinct id 
from flink_view【2小时的id的合集】)


insert到mysql的时候 结果数据不会随着窗口变动而变化


在 2021年8月9日 10:19,Caizhi Weng 写道:


Hi! 不太明白这里的“结果插入数据库的时候变成了批,mysql 中的结果不会变化”是什么含义。这是说 sink 
表和维表是同一张表吗?希望能更清晰地描述场景和做法。 yanyunpeng  于2021年8月9日周一 
上午10:12写道: > 发现一个问题大佬能帮忙解答一二? > 1. 数据中的补充表(补充流信息, 流信息的设备配置全集) > 2.数据流 kafka原表 > 
主要想实现的目标 发现一段时间内未发送消息的设备 > 主要实现流程 > 1. 2小时的滑动窗口来distinct所有的设备ID > 2. 
查询mysql的设备合集表 查询 ID not in (distinct id from 滑动窗口) > > 直接查询的时候是没有问题的 能达到批和流一起使用 
> 但是结果插入数据库的时候变成了批 mysql中的结果不会变化 > > 请问这种情况是什么机制 如果是批流一体情况下 
回当做批处理那为啥select的时候能实现目标?

批流一体的一些疑问

2021-08-08 Thread yanyunpeng
发现一个问题大佬能帮忙解答一二?
1. 数据中的补充表(补充流信息, 流信息的设备配置全集)
2.数据流 kafka原表
主要想实现的目标 发现一段时间内未发送消息的设备
主要实现流程
1. 2小时的滑动窗口来distinct所有的设备ID
2. 查询mysql的设备合集表 查询 ID not in (distinct id from 滑动窗口)

直接查询的时候是没有问题的 能达到批和流一起使用 
但是结果插入数据库的时候变成了批 mysql中的结果不会变化 

请问这种情况是什么机制 如果是批流一体情况下 回当做批处理那为啥select的时候能实现目标?

Re: flink table over 窗口报错

2021-08-04 Thread yanyunpeng
好的 谢谢答疑  我这边尝试把时间字段设置成processTime的时候就好使了  抽空是看看源码分析下


在 2021年8月4日 21:01,Shengkai Fang 写道:


我发现换成流模式下,这个问题就解了。批的模式下,应该不需要定义watermark。这里看样子是有一个 bug 存在的,建议去社区建一个jira。 
yanyunpeng  于2021年8月4日周三 下午5:42写道: > Exception in 
thread "main" org.apache.flink.table.api.ValidationException: > Ordering must 
be defined on a time attribute. > at > 
org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.validateArguments(PlannerTypeInferenceUtilImpl.java:111)
 > at > 
org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.runTypeInference(PlannerTypeInferenceUtilImpl.java:69)
 > at > 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.runLegacyTypeInference(ResolveCallByArgumentsRule.java:249)
 > at > 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.lambda$visit$2(ResolveCallByArgumentsRule.java:160)
 > at java.util.Optional.orElseGet(Optional.java:267) > at > 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:160)
 > at > 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94)
 > at > 
org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
 > at > 
org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126)
 > at > 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:143)
 > at > 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94)
 > at > 
org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
 > at > 
org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126)
 > at > 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:88)
 > at > 
java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:269) > at 
> 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) 
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) 
> at > 
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at 
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) > at > 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:89)
 > at > 
org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$2(ExpressionResolver.java:235)
 > at java.util.function.Function.lambda$andThen$1(Function.java:88) > at > 
org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:198)
 > at > 
org.apache.flink.table.operations.utils.OperationTreeBuilder.projectInternal(OperationTreeBuilder.java:194)
 > at > 
org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:183)
 > at > 
org.apache.flink.table.api.internal.TableImpl$OverWindowedTableImpl.select(TableImpl.java:956)
 > at com.rk.linkdata.flink.BatchFlinkTask.main(BatchFlinkTask.java:44) > > > > 
> Flink 版本1.13.0 > > > 在 2021年8月4日 17:37,Shengkai Fang 写道: > 
> > 能发一下具体的异常栈吗?是哪个版本? yanyunpeng  于2021年8月4日周三 > 
下午2:47写道: > Table table = tableEnv > .from("t_yyp_test") > > 
.window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) > > 
.orderBy($("f_time")) > .preceding("unbounded_range") > > 
.following(CURRENT_RANGE) > .as("w")) > .select($("f_value"), > $("f_h"), > > 
$("f_l"), > $("f_j"), > $("f_value").avg().over($("w")), > > 
$("f_value").varPop().over($("w")), > > $("f_value").stddevPop().over($("w"))); 
> 也是一样的 > Exception in thread > "main" 
org.apache.flink.table.api.ValidationException: > Ordering must be > defined on 
a time attribute. > > 在 2021年8月4日 14:34,Caizhi Weng< > tsreape...@gmail.com> 
写道: > > > Hi! order by 的字段是 f_time_bak,但是 watermark > 的字段是 f_time,这两个不一致。 
yanyunpeng < > yanyunp...@rockontrol.com> > 于2021年8月4日周三 下午2:30写道: > 代码如下: > > 
EnvironmentSettings bbSettings = > > > 
EnvironmentSetting

Re: flink table over 窗口报错

2021-08-04 Thread yanyunpeng
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Ordering must be defined on a time attribute.
at 
org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.validateArguments(PlannerTypeInferenceUtilImpl.java:111)
at 
org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.runTypeInference(PlannerTypeInferenceUtilImpl.java:69)
at 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.runLegacyTypeInference(ResolveCallByArgumentsRule.java:249)
at 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.lambda$visit$2(ResolveCallByArgumentsRule.java:160)
at java.util.Optional.orElseGet(Optional.java:267)
at 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:160)
at 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94)
at 
org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
at 
org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126)
at 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:143)
at 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94)
at 
org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
at 
org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126)
at 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:88)
at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:269)
at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
at 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:89)
at 
org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$2(ExpressionResolver.java:235)
at java.util.function.Function.lambda$andThen$1(Function.java:88)
at 
org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:198)
at 
org.apache.flink.table.operations.utils.OperationTreeBuilder.projectInternal(OperationTreeBuilder.java:194)
at 
org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:183)
at 
org.apache.flink.table.api.internal.TableImpl$OverWindowedTableImpl.select(TableImpl.java:956)
at com.rk.linkdata.flink.BatchFlinkTask.main(BatchFlinkTask.java:44)




Flink 版本1.13.0


在 2021年8月4日 17:37,Shengkai Fang 写道:


能发一下具体的异常栈吗?是哪个版本? yanyunpeng  于2021年8月4日周三 
下午2:47写道: > Table table = tableEnv > .from("t_yyp_test") > 
.window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) > .orderBy($("f_time")) 
> .preceding("unbounded_range") > .following(CURRENT_RANGE) > .as("w")) > 
.select($("f_value"), > $("f_h"), > $("f_l"), > $("f_j"), > 
$("f_value").avg().over($("w")), > $("f_value").varPop().over($("w")), > 
$("f_value").stddevPop().over($("w"))); > 也是一样的 > Exception in thread "main" 
org.apache.flink.table.api.ValidationException: > Ordering must be defined on a 
time attribute. > > 在 2021年8月4日 14:34,Caizhi Weng 写道: > > 
> Hi! order by 的字段是 f_time_bak,但是 watermark 的字段是 f_time,这两个不一致。 yanyunpeng < > 
yanyunp...@rockontrol.com> 于2021年8月4日周三 下午2:30写道: > 代码如下: > > 
EnvironmentSettings bbSettings = > > 
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > > 
TableEnvironment tableEnv = TableEnvironment.create(bbSettings); > > 
tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" + > " f_id INT,\n" + > " > 
f_h STRING,\n" + > " f_l STRING,\n" + > " f_j STRING,\n" + > " f_value > 
DOUBLE,\n" + > " f_time TIMESTAMP(3)\n, " + > " f_time_bak TIMESTAMP(3)\n, > " 
+ > " PRIMARY KEY (f_id) NOT ENFORCED,\n" + > " WATERMARK FOR f_time AS > 
f_time \n" + > ") WITH (\n" + > " 'c

Re: flink table over 窗口报错

2021-08-04 Thread yanyunpeng
Table table = tableEnv
 .from("t_yyp_test")
 .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j"))
 .orderBy($("f_time"))
 .preceding("unbounded_range")
 .following(CURRENT_RANGE)
 .as("w"))
 .select($("f_value"),
 $("f_h"),
 $("f_l"),
 $("f_j"),
 $("f_value").avg().over($("w")),
 $("f_value").varPop().over($("w")),
 $("f_value").stddevPop().over($("w")));
也是一样的
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Ordering must be defined on a time attribute.

在 2021年8月4日 14:34,Caizhi Weng 写道:


Hi! order by 的字段是 f_time_bak,但是 watermark 的字段是 f_time,这两个不一致。 yanyunpeng 
 于2021年8月4日周三 下午2:30写道: > 代码如下: > 
EnvironmentSettings bbSettings = > 
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > 
TableEnvironment tableEnv = TableEnvironment.create(bbSettings); > 
tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" + > " f_id INT,\n" + > " f_h 
STRING,\n" + > " f_l STRING,\n" + > " f_j STRING,\n" + > " f_value DOUBLE,\n" + 
> " f_time TIMESTAMP(3)\n, " + > " f_time_bak TIMESTAMP(3)\n, " + > " PRIMARY 
KEY (f_id) NOT ENFORCED,\n" + > " WATERMARK FOR f_time AS f_time \n" + > ") 
WITH (\n" + > " 'connector' = 'jdbc',\n" + > " 'url' = 'jdbc:mysql://***',\n" + 
> " 'table-name' = '123',\n" + > " 'username' = '123',\n" + > " 'password' = 
'123'\n" + > ")"); > tableEnv.registerFunction("GaussianFunction", new 
GaussianFunction()); > Table table = tableEnv > .from("t_yyp_test") > 
.window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) > 
.orderBy($("f_time_bak")) > .preceding("unbounded_range") > 
.following(CURRENT_RANGE) > .as("w")) > .select($("f_h"), > $("f_l"), > 
$("f_j"), > $("f_value").avg().over($("w")), > 
$("f_value").varPop().over($("w")), > $("f_value").stddevPop().over($("w"))); > 
> > 已经定义了eventTime 使用eventTIme或者别的时间字段排序都报错 > > > Exception in thread "main" 
org.apache.flink.table.api.ValidationException: > Ordering must be defined on a 
time attribute. > > > 请问这是什么原因

flink table over 窗口报错

2021-08-04 Thread yanyunpeng
代码如下:
EnvironmentSettings bbSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(bbSettings);
tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" +
"  f_id INT,\n" +
"  f_h STRING,\n" +
"  f_l STRING,\n" +
"  f_j STRING,\n" +
"  f_value DOUBLE,\n" +
"  f_time TIMESTAMP(3)\n, " +
"  f_time_bak TIMESTAMP(3)\n, " +
"  PRIMARY KEY (f_id) NOT ENFORCED,\n" +
"  WATERMARK FOR f_time AS f_time \n" +
") WITH (\n" +
"   'connector' = 'jdbc',\n" +
"   'url' = 'jdbc:mysql://***',\n" +
"   'table-name' = '123',\n" +
"   'username' = '123',\n" +
"   'password' = '123'\n" +
")");
tableEnv.registerFunction("GaussianFunction", new GaussianFunction());
Table table = tableEnv
.from("t_yyp_test")
.window(Over.partitionBy($("f_h"), $("f_l"), $("f_j"))
.orderBy($("f_time_bak"))
.preceding("unbounded_range")
.following(CURRENT_RANGE)
.as("w"))
.select($("f_h"),
$("f_l"),
$("f_j"),
$("f_value").avg().over($("w")),
$("f_value").varPop().over($("w")),
$("f_value").stddevPop().over($("w")));


已经定义了eventTime  使用eventTIme或者别的时间字段排序都报错


Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Ordering must be defined on a time attribute.  


请问这是什么原因

flink 消费kafka 数据去重后聚合问题

2021-07-13 Thread yanyunpeng
create view distinct_view as 
select val,ptime from (select * ,ROW_NUMBER() OVER (PARTITION BY MN ORDER BY 
ptime ) as rowNum from test_udf) where rowNum = 1


select avg(val) as avg_var, STDDEV_POP(val) as spddev_pop_var from 
distinct_view GROUP BY HOP(ptime, INTERVAL '2' SECONDS, INTERVAL '1' DAY);

kafka的数据有重复 但是这么写有问题 请问这种情况应该怎么写

flink-sql 连接kafka报错

2021-07-08 Thread yanyunpeng
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodError: 
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V

flink-sql 查询kafka

kafka版本2.4 connector版本flink-sql-connector-kafka_2.11-1.11.2.jar


请求 这是什么原因 是 connector的版本问题有?