Why does the snowflake-algorithm UDF return the same value on every call, and how can I make it change each time like UUID?

2022-12-22 Post by 799590...@qq.com.INVALID
Environment
flink-1.13.6_scala_2.11
java 1.8
hive 1.1.0-cdh5.16.2
hbase 1.2.0-cdh5.16.2
Deployed in yarn-session mode

The UDF class is in the attachment.

The use case is joining a Hive table with a Kafka table and writing the result to HBase. Because HBase is queried every 3 seconds for its latest 10 rows in descending order, I want to use the snowflake-generated ID as the HBase rowkey so that ordering is easy.

However, after the UDF is registered with Flink, repeated executions return the same value, so it never auto-increments as intended. Running the following SQL in the sql-client:

CREATE FUNCTION IF NOT EXISTS SnowflakeId AS 'com.chinaoly.SnowflakeId' 
LANGUAGE JAVA;

SELECT SnowflakeId() AS a
UNION ALL
SELECT SnowflakeId() AS b
UNION ALL
SELECT SnowflakeId() AS c
UNION ALL
SELECT SnowflakeId() AS d;

The result is:
 Table program finished.
 Page: Last of 1   
Updated: 14:44:19.665 
  a
1606178806577209344
1606178806577209344
1606178806577209344
1606178806577209344
1606178806577209344
1606178806577209344
1606178806577209344
1606178806577209344
1606178806577209344

Or with:

SELECT SnowflakeId() AS a, SnowflakeId() AS b, SnowflakeId() AS c, 
SnowflakeId() AS d;

The result is:

 a                    b                    c                    d
 1606181622549028864  1606181622549028864  1606181622549028864  1606181622549028864

The built-in UUID(), by contrast, does generate a different value each time, e.g.

SELECT UUID() AS a, UUID() AS b, UUID() AS c, UUID() AS d, UUID() AS e, UUID() 
AS f, UUID() AS g, UUID() AS h;

The result is:

 a (CHAR(36) NOT NULL):
 e16cd19c-b636-4a26-bca3-94424fee8313

 b (CHAR(36) NOT NULL):
 bb242ce0-6d73-428a-bc7e-6d35d44e7f1c

 c (CHAR(36) NOT NULL):
 357e56b6-a4ca-4666-9d09-0addf84db421

 d (CHAR(36) NOT NULL):
 deb70796-dc96-464c-a477-d19416fd0c0d

 e (CHAR(36) NOT NULL):
 233bf58e-9869-42d0-ad67-c21f653143d3

When executed with UNION ALL, though, even its results are all identical:

SELECT UUID() AS a
UNION ALL
SELECT UUID() AS b
UNION ALL
SELECT UUID() AS c;

The result is:
  a
 e6955237-64f3-4b72-bf15-03098~
 e6955237-64f3-4b72-bf15-03098~
 e6955237-64f3-4b72-bf15-03098~

Because of the requirements, the output fields are not fixed, so there is no way to use an output field as the rowkey. When sinking to HBase, how should the rowkey be designed so that the latest 10 rows can be fetched in reverse time order?
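A likely cause is that the planner treats the UDF as deterministic and folds the call into a single constant at plan time. Below is a minimal sketch of a snowflake-style UDF that declares itself non-deterministic; it is not the attached class, and the epoch and worker id are hypothetical placeholders:

package com.chinaoly;

import org.apache.flink.table.functions.ScalarFunction;

/**
 * Minimal sketch of a snowflake-style ID UDF. The key point is isDeterministic()
 * returning false, which keeps the planner from folding the call into one constant.
 * EPOCH and WORKER_ID are hypothetical placeholders, not values from the attachment.
 */
public class SnowflakeId extends ScalarFunction {

    private static final long EPOCH = 1640995200000L; // hypothetical custom epoch
    private static final long WORKER_ID = 1L;         // hypothetical worker id (0-1023)

    private long lastTimestamp = -1L;
    private long sequence = 0L;

    public synchronized Long eval() {
        long ts = System.currentTimeMillis();
        if (ts == lastTimestamp) {
            sequence = (sequence + 1) & 0xFFF; // 12-bit sequence within one millisecond
        } else {
            sequence = 0L;
        }
        lastTimestamp = ts;
        return ((ts - EPOCH) << 22) | (WORKER_ID << 12) | sequence;
    }

    @Override
    public boolean isDeterministic() {
        // Tell the planner that each call may return a different value,
        // the same way the built-in UUID() is treated.
        return false;
    }
}

On the rowkey question, a common HBase pattern is to prefix the key with a reversed timestamp such as (Long.MAX_VALUE - ts), so that a plain ascending scan returns the newest rows first; whether that fits here depends on the variable output fields.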


谌祥, Hangzhou - Java backend developer - big data
799590...@qq.com


Calling a custom function with a column name as the argument fails with "Cannot load user class: com.example.udf.SubStr"; passing a string literal works fine

2022-04-20 Post by 799590...@qq.com.INVALID

Environment

flink-1.13.6_scala_2.11
java 1.8
Deployed as a standalonesession cluster: node01 is the JobManager, node02 and node03 are TaskManagers

The UDF code:
package com.example.udf;

import org.apache.flink.table.functions.ScalarFunction;

public class SubStr extends ScalarFunction {
    public String eval(String s, Integer start, Integer end) {
        return s.substring(start.intValue(), end.intValue());
    }
}
The SQL submitted to the cluster:
[
  "DROP TABLE IF EXISTS source_datagen",
  "CREATE TABLE source_datagen(f_sequence INT,f_random INT,f_random_str STRING,ts AS localtimestamp,WATERMARK FOR ts AS ts) WITH ('connector' = 'datagen','rows-per-second'='5','fields.f_sequence.kind'='sequence','fields.f_sequence.start'='1','fields.f_sequence.end'='1000','fields.f_random.min'='1','fields.f_random.max'='1000','fields.f_random_str.length'='10')",
  "DROP TABLE IF EXISTS print_sink",
  "CREATE TABLE print_sink(id STRING,user_id STRING,`status` STRING,`str` STRING) WITH ('connector' = 'print')",
  "INSERT INTO print_sink SELECT CAST(f_sequence AS STRING) AS id, CAST(f_random AS STRING) AS user_id, CAST(ts AS STRING) AS status, mysubstr(f_random_str,1,4) AS str FROM source_datagen"
]

The controller's business logic:

public String executeDefaultSql(String sql) throws Exception {
    log.info(sql);
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createRemoteEnvironment(host, port);
    env.setStateBackend(new HashMapStateBackend());
    env.enableCheckpointing(1000);
    env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointTimeout(6);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
    env.getCheckpointConfig().setExternalizedCheckpointCleanup(
            CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION
    );
    EnvironmentSettings bsSettings =
            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
    List<String> jars = new ArrayList<>();
    jars.add("hdfs:///xxx/file/function/flink/d78721345f45422da269fa0411127eda0453523812.jar");
    try {
        for (String jar : jars) {
            log.info(jar);
            EnvUtil.loadJar(URLUtil.url(jar));
        }
        tableEnv.getConfig().getConfiguration().set(PipelineOptions.CLASSPATHS, jars);
        tableEnv.executeSql("CREATE FUNCTION mysubstr AS 'com.example.udf.SubStr' LANGUAGE JAVA").print();
        log.info("完成加载hdfs上的udf");
    } catch (Exception e) {
        e.printStackTrace();
    }
    List<String> list = JSON.parseArray(sql, String.class);
    TableResult result = null;
    for (String s : list) {
        result = tableEnv.executeSql(s);
    }
    String jobId = "";
    log.info(result.getResultKind().name());
    if (result.getJobClient().isPresent()) {
        log.info(JSON.toJSONString(result.getJobClient().get().getJobStatus()));
        jobId = result.getJobClient().get().getJobID().toString();
        log.info("jobId:" + jobId);
    } else {
        result.print();
    }
    return jobId;
}

The error is:

2022-04-21 10:12:37
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user 
class: com.example.udf.SubStr
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:336)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:656)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:629)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:569)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:186)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:551)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.example.udf.SubStr
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass
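A point worth checking: the reflection trick in EnvUtil.loadJar only patches the client JVM's class loader, and pipeline.classpaths appears to only record URLs that the cluster-side class loaders must resolve themselves rather than uploading the jar bytes with the job, which would be consistent with the ClassNotFoundException above. A sketch of an alternative, assuming the HDFS jar can first be staged to a local temp file on the client (the staging helper below is illustrative, not the project's code), is to pass the jar to createRemoteEnvironment so Flink ships it to the cluster with the job:

import java.io.File;
import java.nio.file.Files;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RemoteEnvWithUdfJar {

    /**
     * Stage the UDF jar from HDFS onto the client's local disk and hand it to
     * createRemoteEnvironment, which uploads it with the job so the TaskManagers
     * can load com.example.udf.SubStr. Host/port and the HDFS path are the ones
     * from the mail above; the local staging directory is arbitrary.
     */
    public static StreamExecutionEnvironment createEnv(String host, int port,
                                                       Configuration hadoopConf) throws Exception {
        String hdfsJar = "hdfs:///xxx/file/function/flink/d78721345f45422da269fa0411127eda0453523812.jar";
        File localJar = new File(Files.createTempDirectory("flink-udf").toFile(), "udf.jar");
        // Copy the jar from HDFS to the local temp file.
        new Path(hdfsJar).getFileSystem(hadoopConf)
                .copyToLocalFile(new Path(hdfsJar), new Path(localJar.getAbsolutePath()));
        // The varargs jar list is shipped to the cluster together with the job graph.
        return StreamExecutionEnvironment.createRemoteEnvironment(host, port, localJar.getAbsolutePath());
    }
}

With the jar shipped this way, the CREATE FUNCTION statement itself should not need to change.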

Re: Re: Python callback server start failed

2022-04-18 Post by 799590...@qq.com.INVALID

Is it the class below? It was not imported:
org.apache.flink.table.functions.ScalarFunction

Creating the function through Flink SQL did not require importing ScalarFunction.

Below is the logic that uploads the .py file:

String originalFilename = file.getOriginalFilename();
destFile = new File(System.getProperty("user.dir") + uploadTempPath, originalFilename);
FileUtils.copyInputStreamToFile(file.getInputStream(), destFile);
String localPath = destFile.getPath();
copyToHdfsFromLocalFile(localPath, dst);
hadoopPath = dst + originalFilename;
String serviceName = configuration.get("fs.defaultFS");
StreamTableEnvironment tableEnv = getTableEnv();
String pyStr = tableEnv.getConfig().getConfiguration().getString("python.files", "");
log.info(pyStr);
String[] split = pyStr.split(",");
List<String> list = new ArrayList<>(Arrays.asList(split));
list.add(serviceName + hadoopPath);
tableEnv.getConfig().getConfiguration().setString("python.files", StrUtil.join(",", list));
tableEnv.executeSql("DROP FUNCTION IF EXISTS " + catalogName + "." + defaultDatabase + "." + functionName).print();
String createSql = "CREATE FUNCTION IF NOT EXISTS " + catalogName + "." + defaultDatabase + "." + functionName
        + " AS '" + className + "' LANGUAGE PYTHON";
tableEnv.executeSql(createSql).print();




799590...@qq.com
 
From: Dian Fu
Date: 2022-04-19 14:20
To: user-zh; 799590989
Subject: Re: Re: Python callback server start failed
NameError: name 'ScalarFunction' is not defined

Did you import ScalarFunction?

On Tue, Apr 19, 2022 at 2:08 PM 799590...@qq.com.INVALID 
<799590...@qq.com.invalid> wrote:

Below is the error log from just now. The "Python callback server start failed" message no longer appears: the highlighted part had been picking up C:\Python310 from the Python environment variable (Python installed earlier; python36 was installed later and PATH was pointed at python36), and it was fine after restarting the machine.

2022-04-19 13:58:11 |INFO  |http-nio-9532-exec-3 |HiveCatalog.java:257 
|org.apache.flink.table.catalog.hive.HiveCatalog |Setting hive conf dir as 
/home/tetris/conf
2022-04-19 13:58:11 |INFO  |http-nio-9532-exec-3 |HiveCatalog.java:219 
|org.apache.flink.table.catalog.hive.HiveCatalog |Created HiveCatalog 'myhive'
2022-04-19 13:58:11 |INFO  |http-nio-9532-exec-3 |HiveCatalog.java:299 
|org.apache.flink.table.catalog.hive.HiveCatalog |Connected to Hive metastore
2022-04-19 13:58:11 |INFO  |http-nio-9532-exec-3 |CatalogManager.java:262 
|org.apache.flink.table.catalog.CatalogManager |Set the current default catalog 
as [myhive] and the current default database as [tetris].
2022-04-19 13:58:11 |INFO  |http-nio-9532-exec-3 |SqlTaskRunService.java:599 
|com.chinaoly.tetris.flink.service.SqlTaskRunService |开始加载hdfs上的udf
2022-04-19 13:58:12 |INFO  |http-nio-9532-exec-3 |EnvUtil.java:31 
|com.chinaoly.tetris.flink.util.EnvUtil |registerFactory : 
org.apache.hadoop.fs.FsUrlStreamHandlerFactory
2022-04-19 13:58:12 |INFO  |http-nio-9532-exec-3 |EnvUtil.java:55 
|com.chinaoly.tetris.flink.util.EnvUtil 
|hdfs://chinaoly/tetris/file/function/flink/myReplace.py
2022-04-19 13:58:12 |INFO  |http-nio-9532-exec-3 |SqlTaskRunService.java:621 
|com.chinaoly.tetris.flink.service.SqlTaskRunService |完成加载hdfs上的udf
13:58:20,956 |-INFO in ReconfigureOnChangeTask(born:1650344954461) - Empty 
watch file list. Disabling 
2022-04-19 13:58:21 |INFO  |http-nio-9532-exec-3 |PythonEnvUtils.java:284 
|org.apache.flink.client.python.PythonEnvUtils |Starting Python process with 
environment variables: {USERDOMAIN_ROAMINGPROFILE=DESKTOP-LBP3EGP, 
HADOOP_CONF_DIR=/home/tetris/conf, PROCESSOR_LEVEL=6, SESSIONNAME=Console, 
ALLUSERSPROFILE=C:\ProgramData, PROCESSOR_ARCHITECTURE=AMD64, 
PSModulePath=C:\Program 
Files\WindowsPowerShell\Modules;C:\WINDOWS\system32\WindowsPowerShell\v1.0\Modules,
 SystemDrive=C:, MAVEN_HOME=E:\Program Files\apache-maven-3.8.2, 
USERNAME=Administrator, ProgramFiles(x86)=C:\Program Files (x86), 
FPS_BROWSER_USER_PROFILE_STRING=Default, 
PYTHONPATH=C:/Users/ADMINI~1/AppData/Local/Temp/pyflink/df5887da-8c4d-4807-b891-c313338f1c14/63150f6e-5ea0-4186-be56-ce25e69c4265,
 PATHEXT=.COM;.EXE;.BAT;.CMD;.VBS;.VBE;.JS;.JSE;.WSF;.WSH;.MSC, 
DriverData=C:\Windows\System32\Drivers\DriverData, PYTHON_HOME=E:\Program 
Files\python36, ProgramData=C:\ProgramData, ProgramW6432=C:\Program Files, 
HOMEPATH=\Users\Administrator, PROCESSOR_IDENTIFIER=Intel64 Family 6 Model 94 
Stepping 3, GenuineIntel, ProgramFiles=C:\Program Files, 
PUBLIC=C:\Users\Public, windir=C:\WINDOWS, =::=::\, ZES_ENABLE_SYSMAN=1, 
LOCALAPPDATA=C:\Users\Administrator\AppData\Local, 
ChocolateyLastPathUpdate=132792636909481380, IntelliJ IDEA=E:\Program 
Files\JetBrains\IntelliJ IDEA 2019.3.3\bin;, USERDOMAIN=DESKTOP-LBP3EGP, 
FPS_BROWSER_APP_PROFILE_STRING=Internet Explorer, 
LOGONSERVER=\\DESKTOP-LBP3EGP, JAVA_HOME=C:\Program Files\Java\jdk1.8.0_261, 
OneDrive=C:\Users\Administrator\OneDrive, 
APPDATA=C:\Users\Administrator\AppData\Roaming, 
ChocolateyInstall=C:\ProgramData\chocolatey, SCALA_HOME=C:\Program Files 
(x86)\scala, CommonProgramFiles=C:\Program Files\Common Files, Pa

Re: Re: Python callback server start failed

2022-04-18 Post by 799590...@qq.com.INVALID
at 
org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at 
org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:199)
at 
org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
at 
org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:493)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:137)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81)
at 
org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:798)
at 
org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
at 
org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:806)
at 
org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1498)
at 
org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at 
org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:748)



799590...@qq.com
 
From: Dian Fu
Date: 2022-04-19 13:49
To: user-zh; 799590989
Subject: Re: Python callback server start failed
Could you send the full log file?
 
On Tue, Apr 19, 2022 at 10:53 AM 799590...@qq.com.INVALID
<799590...@qq.com.invalid> wrote:
 
> Software versions
>
> flink-1.13.6
> python3.6.8
> Python 3.6.8 is also installed on the local Win10 machine with its environment variable set, and $ python -m pip install
> apache-flink==1.13.6 ran successfully
> Deployed as standalonesession: one JM and two TMs; python3.6.8 and pyflink-1.13.6 are installed on all 3 machines
>
> Questions:
>
> 1. Calling the Python UDF raises the following error:
> Servlet.service() for servlet [dispatcherServlet] in context with path []
> threw exception [Request processing failed; nested exception is
> org.apache.flink.table.api.ValidationException: SQL validation failed.
> Cannot instantiate user-defined function 'myhive.tetris.myreplace'.] with
> root cause
> java.lang.RuntimeException: Python callback server start failed!
>
> 2. Is there any rule for what class path to put after AS in CREATE FUNCTION when the function is Python?
>
> Contents of the Python UDF file myReplace.py:
>
> from pyflink.table.expressions import call
>
> class MyReplace(ScalarFunction):
>   def __init__(self):
> self.factor = 12
>
>   def eval(self, s):
> return s.replace('buy','sale')
>
> Obtain the remote cluster environment, where catalogName=myhive and defaultDatabase=tetris:
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.createRemoteEnvironment(host,port);
> env.setStateBackend(new HashMapStateBackend());
> env.enableCheckpointing(1000);
> env.getCheckpointConfig().setCheckpointStorage(new
> JobManagerCheckpointStorage());
>
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> env.getCheckpointConfig().setCheckpointTimeout(6);
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
> env.getCheckpointConfig().setExternalizedCheckpointCleanup(
> CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION
> );
> EnvironmentSettings bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(env,bsSettings);
>
> tableEnv.getConfig().getConfiguration().setBoolean("table.exec.hive.fallback-mapred-reader",true);
>
> tableEnv.getConfig().getConfiguration().setBoolean("table.dynamic-table-options.enabled",true);
> if (!catalogName.equals(tableEnv.getCurrentCatalog())) {
> HiveCatalog hiveCatalog = new HiveCatalog(catalogName,
> defaultDatabase, hiveConfDir);
> tableEnv.registerCatalog(catalogName, hiveCatalog);
> }
> tableEnv.useCatalog(catalogName);
> tableEnv.useDatabase(defaultDatabase);
> List jars = new ArrayList<>();
> List pys = new ArrayList<>();
> log.info("开始加载hdfs上的udf");
> String prefix = "/file/function/flink/";
> try {
> EnvUtil.registerFactory(new FsUrlStreamHandlerFactory());
> for (String hdf : EnvUtil.listHdfs(prefix,configuration)) {
> if (hdf.endsWith(".jar")) {
> jars.add(hdf);
> EnvUtil.loadJar(URLUtil.url(hdf));
> } else if (hdf.endsWith(".py")){
> pys.add(hdf);
> }
> }
>
> tableEnv.getConfig().getConfiguration().set(PipelineOptions.CLASSPATHS,jars);
>
> tableEnv.getConfig().getConfiguration().setString("python.files",StringUtils.join(pys,","));
> log.info("完成加载hdfs上的udf");
> }catch (Exception e){
> e.printStackTrace();
> }
>
> The Python file is stored under the designated path on HDFS
>
> After uploading the .py file, the following was executed via tableEnv.executeSql:
>
> CREATE FUNCTION IF NOT EXISTS myhive.tetris.myReplace AS
> 'myReplace.MyReplace' LANGUAGE PYTHON
>
> Thanks in advance to the Flink team for their hard work
>
>
>
> 799590...@qq.com
>


Python callback server start failed

2022-04-18 Post by 799590...@qq.com.INVALID
Software versions

flink-1.13.6
python3.6.8
Python 3.6.8 is also installed on the local Win10 machine with its environment variable set, and $ python -m pip install
apache-flink==1.13.6 ran successfully
Deployed as standalonesession: one JM and two TMs; python3.6.8 and pyflink-1.13.6 are installed on all 3 machines

Questions:

1. Calling the Python UDF raises the following error:
Servlet.service() for servlet [dispatcherServlet] in context with path [] threw 
exception [Request processing failed; nested exception is 
org.apache.flink.table.api.ValidationException: SQL validation failed. Cannot 
instantiate user-defined function 'myhive.tetris.myreplace'.] with root cause
java.lang.RuntimeException: Python callback server start failed!

2. Is there any rule for what class path to put after AS in CREATE FUNCTION when the function is Python?

Contents of the Python UDF file myReplace.py:

from pyflink.table.expressions import call

class MyReplace(ScalarFunction):
  def __init__(self):
    self.factor = 12

  def eval(self, s):
    return s.replace('buy','sale')

Obtain the remote cluster environment, where catalogName=myhive and defaultDatabase=tetris:

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createRemoteEnvironment(host, port);
env.setStateBackend(new HashMapStateBackend());
env.enableCheckpointing(1000);
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(6);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
        CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION
);
EnvironmentSettings bsSettings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
tableEnv.getConfig().getConfiguration().setBoolean("table.exec.hive.fallback-mapred-reader", true);
tableEnv.getConfig().getConfiguration().setBoolean("table.dynamic-table-options.enabled", true);
if (!catalogName.equals(tableEnv.getCurrentCatalog())) {
    HiveCatalog hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
    tableEnv.registerCatalog(catalogName, hiveCatalog);
}
tableEnv.useCatalog(catalogName);
tableEnv.useDatabase(defaultDatabase);
List<String> jars = new ArrayList<>();
List<String> pys = new ArrayList<>();
log.info("开始加载hdfs上的udf");
String prefix = "/file/function/flink/";
try {
    EnvUtil.registerFactory(new FsUrlStreamHandlerFactory());
    for (String hdf : EnvUtil.listHdfs(prefix, configuration)) {
        if (hdf.endsWith(".jar")) {
            jars.add(hdf);
            EnvUtil.loadJar(URLUtil.url(hdf));
        } else if (hdf.endsWith(".py")) {
            pys.add(hdf);
        }
    }
    tableEnv.getConfig().getConfiguration().set(PipelineOptions.CLASSPATHS, jars);
    tableEnv.getConfig().getConfiguration().setString("python.files", StringUtils.join(pys, ","));
    log.info("完成加载hdfs上的udf");
} catch (Exception e) {
    e.printStackTrace();
}

The Python file is stored under the designated path on HDFS.

After uploading the .py file, the following was executed via tableEnv.executeSql:

CREATE FUNCTION IF NOT EXISTS myhive.tetris.myReplace AS 'myReplace.MyReplace'
LANGUAGE PYTHON

Thanks in advance to the Flink team for their hard work



799590...@qq.com


Flink SQL execution reports that the custom UDF cannot be loaded

2022-04-11 Post by 799590...@qq.com.INVALID
Environment

flink-1.13.6_scala_2.11
java 1.8

Deployed as a standalonesession cluster: node01 is the JobManager, node02 and node03 are TaskManagers

UDF code:
package com.example.udf;

import org.apache.flink.table.functions.ScalarFunction;

public class SubStr extends ScalarFunction {

    public String eval(String s, Integer start, Integer end) {
        return s.substring(start, end);
    }
}

The UDF jars are stored on HDFS. Each time the client submits SQL, it loads the list of UDF jars from HDFS through a class loader and sets pipeline.jars to the list of HDFS UDF jar paths. Executing the following SQL then fails:

insert into output_2455_5070_model_1649729386269
select tablekeymd5(user_id) as mm, proctime(), MD5(CONCAT_WS(CAST(user_id AS STRING)))
from (
  select distinct id as id, user_id as user_id, status as status
  from (select id, user_id, status from data_2455_5068_model)
  where status < '4'
)

2022-04-12 10:26:36
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user 
class: com.example.udf.TableKeyMd5
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:336)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:656)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:629)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:569)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:186)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:551)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.example.udf.TableKeyMd5
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
at 
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)

Class loader code:
public static void loadJar(URL jarUrl) {
    Method method = null;
    try {
        method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
    } catch (NoSuchMethodException | SecurityException e1) {
        e1.printStackTrace();
    }
    // Record the method's current accessibility
    boolean accessible = method.isAccessible();
    try {
        // Make the method accessible
        if (!accessible) {
            method.setAccessible(true);
        }
        // Get the system class loader
        URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
        // Add the jar URL to the system class loader's URL path
        method.invoke(classLoader, jarUrl);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        method.setAccessible(accessible);
    }
}

/**
 * If a factory is already registered, wrap it in a decorator that delegates to either
 * the original factory or the HDFS-reading factory, whichever the protocol needs.
 *
 * @param fsUrlStreamHandlerFactory
 * @throws Exception
 */
public static void registerFactory(final FsUrlStreamHandlerFactory fsUrlStreamHandlerFactory)
        throws Exception {
    log.info("registerFactory : " + fsUrlStreamHandlerFactory.getClass().getName());
    final Field factoryField = URL.class.getDeclaredField("factory");
    factoryField.setAccessible(true);
    final Field lockField = URL.class.getDeclaredField("streamHandlerLock");
    lockField.setAccessible(true);
    synchronized (lockField.get(null)) {
        final URLStreamHandlerFactory originalUrlStreamHandlerFactory =
                (URLStreamHandlerFactory) factoryField.get(null);
        factoryField.set(null, null);
        URL.setURLStreamHandlerFactory(protocol -> {
            if ("hdfs".equals(protocol)) {
                return fsUrlStreamHandlerFactory.createURLStreamHandler(protocol);
            } else {
                return originalUrlStreamHandlerFactory.createURLStreamHandler(protocol);
            }
        });
    }
}



799590...@qq.com


In flink-1.13.6, how do I set a Hive sink table to upsert mode using pure SQL?

2022-04-05 Post by 799590...@qq.com.INVALID
Question: in flink-1.13.6, how do I set the Hive sink table to upsert mode using pure SQL?

flink:1.13.6
hive:1.1.1
hadoop:2.6.0-cdh5.16.2

Using pure SQL with Kafka as the source, the intermediate transformation includes DISTINCT or GROUP operations; sinking the computed result to a Hive table raises the error below.

doesn't support consuming update changes which is produced by node 
GroupAggregate(groupBy=[id, user_id, status, EXPR$3]

Answers found online say the sink table needs to be set to upsert mode. I tried creating the sink table as shown below; creating the table succeeds, but submitting the INSERT INTO still fails.

Source table:
 
CREATE TABLE data_2432_5074_model(
id STRING,
user_id STRING,status STRING
) WITH (
'connector' = 'kafka',
'topic' = 'person',
'properties.bootstrap.servers' = '192.168.9.116:9092',
'properties.group.id' = 'chinaoly-group',
'scan.startup.mode' = 'latest-offset',
'format' = 'json',
'json.fail-on-missing-field'='false',
'json.ignore-parse-errors'='true'
)
Sink table:

CREATE TABLE output_2432_5076_model_1649226175146(
id STRING,
user_id STRING,
status STRING,
my_dt timestamp
) TBLPROPERTIES (
'streaming-source.enable' = 'true',
'streaming-source.partition.include' = 'all',
'streaming-source.partition-order' = 'create-time',
'sink.partition-commit.watermark-time-zone' = 'Asia/Shanghai',
'sink.partition-commit.policy.kind' = 'metastore,success-file',
'write.upsert.enable' = 'true',
'streaming-source.monitor-interval' = '1 min'
)

Computation logic:
INSERT INTO output_2432_5076_model_1649226175146
SELECT DISTINCT id AS id, user_id AS user_id, status AS status, proctime()
FROM (SELECT * FROM data_2432_5074_model) WHERE status = '1'

Almighty Flink team, could you give me an answer? Thanks in advance.



799590...@qq.com


Re: Re: flink1.13.6: writing from Kafka to Hive via the Table API fails with no error, while writing to files or MySQL works fine

2022-03-14 Post by 799590...@qq.com.INVALID
Tested it: it was indeed because checkpointing was not enabled. I just enabled checkpoint storage on the cluster and turned on the checkpoint configuration in the code, and the Kafka data now arrives in the Hive table.

Many thanks!



谌祥, Hangzhou - Java backend developer - big data
799590...@qq.com
 
From: Caizhi Weng
Date: 2022-03-14 10:15
To: flink Chinese mailing list
Subject: Re: flink1.13.6: writing from Kafka to Hive via the Table API fails with no error, while writing to files or MySQL works fine
Hi!
 
Data written to Hive by a streaming job only becomes visible after a checkpoint. I don't see checkpointing
enabled in your code; please try enabling it.
 
The JDBC sink is visible on write, but it only guarantees eventual consistency. That is, if the job fails over
midway, the data written to the JDBC sink between the last checkpoint and the failover is "redundant" and only
becomes consistent again once it is overwritten by new data after the job restarts.
 
The filesystem sink writes to a temporary file while in progress, and the filesystem source will not read that
file; the temporary file is only renamed after a checkpoint.
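A minimal sketch of the change suggested above, applied to the quoted code (the 60-second interval is purely illustrative):

// Sketch: enable checkpointing before creating the table environment so the
// streaming Hive sink can commit (rename) its in-progress files.
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createRemoteEnvironment("node1", 8081);
env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
// ... the rest of the quoted code (EnvironmentSettings, HiveCatalog, executeSql) stays the same.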
 
799590...@qq.com.INVALID <799590...@qq.com.invalid> wrote on Sat, Mar 12, 2022 at 14:51:
 
>
> Software versions
> flink:1.13.6
> hive:1.1.1
> hadoop:2.6.0-cdh5.16.2
>
> The SQL is submitted to the remote Flink cluster via createRemoteEnvironment, and the Hive connection authenticates with Kerberos. The code is below.
>
> It runs without errors, the job shows as running on the Flink cluster, both the Kafka and Hive tables are created successfully, and Kafka keeps producing new messages, yet no new data arrives in the Hive table.
>
>
> I have tested switching the output to MySQL or a CSV file and new records keep arriving, data in the Hive table can be read back, and inserting data from an existing Hive table into the newly created table also succeeds. I just cannot figure out why the streaming Kafka data is not continuously written into the newly created Hive table.
>
> String KRB5_CONF_PATH = "/home/tetris/conf/krb5.ini";
> String keytab = "/home/tetris/conf/company.keytab";
> String principal = "company";
> System.setProperty("java.security.krb5.conf", KRB5_CONF_PATH);
>
> Configuration configuration = new Configuration();
> configuration.set("hadoop.security.authentication", "kerberos");
> configuration.set("keytab.file", keytab);
> configuration.setBoolean("hadoop.security.authorization", true);
> configuration.set("kerberos.principal", principal);
> UserGroupInformation.setConfiguration(configuration);
> UserGroupInformation.loginUserFromKeytab(principal, keytab);
>
> EnvironmentSettings bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.createRemoteEnvironment("node1", 8081);
> StreamTableEnvironment flinkTableEnv =
> StreamTableEnvironment.create(env,bsSettings);
>
> HiveCatalog hiveCatalog = new HiveCatalog("myhive", "tetris",
> "/home/tetris/conf", "1.1.1");
> flinkTableEnv.registerCatalog("myhive",hiveCatalog);
> flinkTableEnv.useCatalog("myhive");
> flinkTableEnv.executeSql("DROP TABLE IF EXISTS data_2431_4928").print();
> flinkTableEnv.executeSql("CREATE TABLE data_2431_4928(id STRING,user_id
> STRING,status STRING) WITH (\n" +
> "'connector' = 'kafka',\n" +
> "'topic' = 'person',\n" +
> "'properties.bootstrap.servers' = '121.4.89.228:9092',\n" +
> "'properties.group.id' = 'testGroup',\n" +
> "'scan.startup.mode' = 'latest-offset',\n" +
> "'format' = 'json',\n" +
>
> "'json.fail-on-missing-field'='false','json.ignore-parse-errors'='true'\n" +
> ")").print();
> flinkTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> flinkTableEnv.executeSql("DROP TABLE IF EXISTS output_2431_4930").print();
> flinkTableEnv.executeSql("CREATE TABLE output_2431_4930(id STRING,user_id
> STRING,status STRING)").print();
> flinkTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> TableResult result = flinkTableEnv.executeSql("INSERT INTO
> output_2431_4930 SELECT id, user_id ,`status` FROM data_2431_4928");
> System.out.println(result.getJobClient().get().getJobID());
>
>
>
> 谌祥, Hangzhou - Java backend developer - big data
> 799590...@qq.com
>


flink1.13.6: writing from Kafka to Hive via the Table API fails with no error, while writing to files or MySQL works fine

2022-03-11 Post by 799590...@qq.com.INVALID

Software versions
flink:1.13.6
hive:1.1.1
hadoop:2.6.0-cdh5.16.2

The SQL is submitted to the remote Flink cluster via createRemoteEnvironment, and the Hive connection authenticates with Kerberos. The code is below.

It runs without errors, the job shows as running on the Flink cluster, both the Kafka and Hive tables are created successfully, and Kafka keeps producing new messages, yet no new data arrives in the Hive table.

I have tested switching the output to MySQL or a CSV file and new records keep arriving, data in the Hive table can be read back, and inserting data from an existing Hive table into the newly created table also succeeds. I just cannot figure out why the streaming Kafka data is not continuously written into the newly created Hive table.

String KRB5_CONF_PATH = "/home/tetris/conf/krb5.ini";
String keytab = "/home/tetris/conf/company.keytab";
String principal = "company";
System.setProperty("java.security.krb5.conf", KRB5_CONF_PATH);

Configuration configuration = new Configuration();
configuration.set("hadoop.security.authentication", "kerberos");
configuration.set("keytab.file", keytab);
configuration.setBoolean("hadoop.security.authorization", true);
configuration.set("kerberos.principal", principal);
UserGroupInformation.setConfiguration(configuration);
UserGroupInformation.loginUserFromKeytab(principal, keytab);

EnvironmentSettings bsSettings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createRemoteEnvironment("node1", 8081);
StreamTableEnvironment flinkTableEnv = StreamTableEnvironment.create(env, bsSettings);

HiveCatalog hiveCatalog = new HiveCatalog("myhive", "tetris", "/home/tetris/conf", "1.1.1");
flinkTableEnv.registerCatalog("myhive", hiveCatalog);
flinkTableEnv.useCatalog("myhive");
flinkTableEnv.executeSql("DROP TABLE IF EXISTS data_2431_4928").print();
flinkTableEnv.executeSql("CREATE TABLE data_2431_4928(id STRING,user_id STRING,status STRING) WITH (\n" +
        "'connector' = 'kafka',\n" +
        "'topic' = 'person',\n" +
        "'properties.bootstrap.servers' = '121.4.89.228:9092',\n" +
        "'properties.group.id' = 'testGroup',\n" +
        "'scan.startup.mode' = 'latest-offset',\n" +
        "'format' = 'json',\n" +
        "'json.fail-on-missing-field'='false','json.ignore-parse-errors'='true'\n" +
        ")").print();
flinkTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
flinkTableEnv.executeSql("DROP TABLE IF EXISTS output_2431_4930").print();
flinkTableEnv.executeSql("CREATE TABLE output_2431_4930(id STRING,user_id STRING,status STRING)").print();
flinkTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
TableResult result = flinkTableEnv.executeSql(
        "INSERT INTO output_2431_4930 SELECT id, user_id ,`status` FROM data_2431_4928");
System.out.println(result.getJobClient().get().getJobID());



谌祥, Hangzhou - Java backend developer - big data
799590...@qq.com