环境信息
flink-1.13.6_scala_2.11
java 1.8
hive 1.1.0-cdh5.16.2
hbase 1.2.0-cdh5.16.2
使用的是yarn-session模式
UDF 类在附件中
应用场景是将hive表和kafka表join后输出到hbase,因为在每3秒倒序查询一次hbase的10条结果,故想将雪花算法生成的ID作为hbase的rowkey好做排序。
但是发现udf注册到flink之后,多次执行得到的结果是一样的,没有达到自动增长的目的,在sql-clinet下执行下面的SQL
CREATE FUNCTION IF NOT EXISTS
环境信息
flink-1.13.6_scala_2.11
java 1.8
使用的是standalonesession集群模式,node01为jobmanager node02和node03为taskmanager
自定义函数的代码
package com.example.udf;
import org.apache.flink.table.functions.ScalarFunction;
public class SubStr extends ScalarFunction {
public String eval(String s, Integer start,
"."+functionName+" AS '" +className+ "'
LANGUAGE PYTHON";
tableEnv.executeSql(createSql).print();
799590...@qq.com
From: Dian Fu
Date: 2022-04-19 14:20
To: user-zh; 799590989
Subject: Re: Re: Python callback server start failed
NameError: name 'ScalarFunction' is n
va.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at
org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:748)
799590...@qq.com
From: Dian Fu
Date: 2022-04-19 13:49
To: user-zh; 799590989
Subject:
软件版本
flink-1.13.6
python3.6.8
本地win10也安装了python3.6.8且添加了python的环境变量和成功运行了$ python -m pip install
apache-flink==1.13.6
standalonesession方式部署的,一个JM 两个TM,3台集群都安装了python3.6.8 且安装了pyflink-1.13.6
问题:
1、调用python udf时会报如下错误
Servlet.service() for servlet [dispatcherServlet] in context with path []
环境信息
flink-1.13.6_scala_2.11
java 1.8
使用的是standalonesession集群模式,node01为jobmanager node02和node03为taskmanager
UDF代码
package com.example.udf;
import org.apache.flink.table.functions.ScalarFunction;
public class SubStr extends ScalarFunction {
public String eval(String s, Integer
问题:flink1.1.36用SQL方式如何设置输出到hive表为upsert模式?
flink:1.13.6
hive:1.1.1
hadoop:2.6.0-cdh5.16.2
纯SQL的方式,使用kafka作为source,中间的转换会有DISTINCT 或者 GROUP 操作,将计算结果sink到hive表,会报下面的错误
doesn't support consuming update changes which is produced by node
GroupAggregate(groupBy=[id, user_id, status, EXPR$3]
...@qq.com.INVALID <799590...@qq.com.invalid> 于2022年3月12日周六 14:51写道:
>
> 软件版本
> flink:1.13.6
> hive:1.1.1
> hadoop:2.6.0-cdh5.16.2
>
> 通过createRemoteEnvironment方式将sql提交到远程flink集群操作的,hive连接时通过了Kerberos认证。代码如下,
>
> 运行后不报错,flink集群能看到job正在运行,kafka和hive表都创建成功了,kafka中一直在产生新的消
软件版本
flink:1.13.6
hive:1.1.1
hadoop:2.6.0-cdh5.16.2
通过createRemoteEnvironment方式将sql提交到远程flink集群操作的,hive连接时通过了Kerberos认证。代码如下,
运行后不报错,flink集群能看到job正在运行,kafka和hive表都创建成功了,kafka中一直在产生新的消息,而hive中却没有新数据进表中。