hive streaning 问题请教

2020-10-16 文章 McClone
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.disableOperatorChaining();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.getConfig().addConfiguration(
new Configuration()
.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, 
Duration.ofSeconds(30)));
tEnv.executeSql("CREATE TEMPORARY FUNCTION TestFunca AS 
'org.example.flink.TestFunca' LANGUAGE JAVA");
tEnv.executeSql("CREATE TABLE datagen (\n" +
" name STRING,\n" +
" pass STRING,\n" +
" type1 INT,\n" +
" t1 STRING,\n" +
" t2 STRING,\n" +
" ts AS localtimestamp,\n" +
" WATERMARK FOR ts AS ts\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='1',\n" +
" 'fields.type1.min'='1',\n" +
" 'fields.type1.max'='10'\n" +
")");

tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tEnv.executeSql("CREATE TABLE hive_table (\n" +
"  user_id STRING,\n" +
"  order_amount STRING\n" +
") PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet 
TBLPROPERTIES (\n" +
"  'sink.partition-commit.trigger'='partition-time',\n" +
"  'sink.partition-commit.delay'='1 h',\n" +
"  'sink.partition-commit.policy.kind'='metastore,success-file'\n" +
")");

tEnv.executeSql("insert into hive_table select 
t1,t2,TestFunca(type1),TestFunca(type1) from datagen");

Caused by: org.apache.flink.table.api.ValidationException: Table options do not 
contain an option key 'connector' for discovering a connector.
at 
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321)
at 
org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
... 18 more
发送自 Windows 10 版邮件应用



Flink SQL 1.11如何获取执行计划 & StreamGraph

2020-10-16 文章 zilong xiao
1.11好像改了接口,用StreamExecutionEnvironment.getExecutionPlan()会报"No operators
defined in streaming topology. Cannot execute." 1.10是可以正常执行的


Re:Re: Re: flink1.11加载外部jar包进行UDF注册

2020-10-16 文章 chenxuying
1使用系统类加载器的时候,本身作业包开放给外部UDF jar包实现的接口会报ClassNotFound异常
2线程上下文类加载器是什么

不太明白这两点,可以写个代码例子看看吗


在 2020-10-15 19:47:20,"amen...@163.com"  写道:
>追加问题,在使用线程上下文类加载器的时候,数据会重复发送三条,这是因为添加pipeline.classpaths的缘故吗?
>那这种设置env的方式有可能还会造成其他什么问题?
>
>best,
>amenhub
> 
>发件人: amen...@163.com
>发送时间: 2020-10-15 19:22
>收件人: user-zh
>主题: Re: Re: flink1.11加载外部jar包进行UDF注册
>非常感谢您的回复!
> 
>对于我来说,这是一个非常好的办法,但是我有一个疑问:类加载器只能使用系统类加载器进行加载吗?
>因为我在尝试使用系统类加载器的时候,本身作业包开放给外部UDF 
>jar包实现的接口会报ClassNotFound异常,而将类加载器指向主类(这种方式的话这里应该是使用默认的线程上下文加载器),则可避免这个问题。
> 
>期待您的回复,谢谢~
> 
>best, 
>amenhub
>发件人: cxydeve...@163.com
>发送时间: 2020-10-15 17:46
>收件人: user-zh
>主题: Re: flink1.11加载外部jar包进行UDF注册
>我们用方法是通过反射设置env的配置,增加pipeline.classpaths
>具体代码如下
>public static void main(final String[] args) throws Exception {
>StreamExecutionEnvironment env =
>StreamExecutionEnvironment.getExecutionEnvironment();
>EnvironmentSettings settings =
>EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>StreamTableEnvironment tableEnvironment =
>StreamTableEnvironment.create(env, settings);
>//String path = "file:///D:/cxy/idea_workspace/...xxx.jar";
>String path = "https://...xxx.jar;;
>loadJar(new URL(path));
>Field configuration =
>StreamExecutionEnvironment.class.getDeclaredField("configuration");
>configuration.setAccessible(true);
>Configuration o = (Configuration)configuration.get(env);
>Field confData = Configuration.class.getDeclaredField("confData");
>confData.setAccessible(true);
>Map temp = (Map)confData.get(o);
>List jarList = new ArrayList<>();
>jarList.add(path);
>temp.put("pipeline.classpaths",jarList);
>tableEnvironment.executeSql("CREATE FUNCTION CxyTestReturnSelf AS
>'flinksql.function.udf.CxyTestReturnSelf'");
>tableEnvironment.executeSql("CREATE TABLE sourceTable (\n" +
>" f_sequence INT,\n" +
>" f_random INT,\n" +
>" f_random_str STRING,\n" +
>" ts AS localtimestamp,\n" +
>" WATERMARK FOR ts AS ts\n" +
>") WITH (\n" +
>" 'connector' = 'datagen',\n" +
>" 'rows-per-second'='5',\n" +
>"\n" +
>" 'fields.f_sequence.kind'='sequence',\n" +
>" 'fields.f_sequence.start'='1',\n" +
>" 'fields.f_sequence.end'='1000',\n" +
>"\n" +
>" 'fields.f_random.min'='1',\n" +
>" 'fields.f_random.max'='1000',\n" +
>"\n" +
>" 'fields.f_random_str.length'='10'\n" +
>")");
>tableEnvironment.executeSql("CREATE TABLE sinktable (\n" +
>"f_random_str STRING" +
>") WITH (\n" +
>"'connector' = 'print'\n" +
>")");
>tableEnvironment.executeSql(
>"insert into sinktable " +
>"select CxyTestReturnSelf(f_random_str) " +
>"from sourceTable");
>}
>//动态加载Jar
>public static void loadJar(URL jarUrl) {
>//从URLClassLoader类加载器中获取类的addURL方法
>Method method = null;
>try {
>method = URLClassLoader.class.getDeclaredMethod("addURL",
>URL.class);
>} catch (NoSuchMethodException | SecurityException e1) {
>e1.printStackTrace();
>}
>// 获取方法的访问权限
>boolean accessible = method.isAccessible();
>try {
>//修改访问权限为可写
>if (accessible == false) {
>method.setAccessible(true);
>}
>// 获取系统类加载器
>URLClassLoader classLoader = (URLClassLoader)
>ClassLoader.getSystemClassLoader();
>//jar路径加入到系统url路径里
>method.invoke(classLoader, jarUrl);
>} catch (Exception e) {
>e.printStackTrace();
>} finally {
>method.setAccessible(accessible);
>}
>}
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


flink sql 窗口函数对分区的这个列进行过滤

2020-10-16 文章 kcz
因为列会有默认值,也有真实的,我想取到真实的那个列,这个功能如何实现一下。想到了窗口函数,发现不能进行过滤,还有一种骚操作是求max min。之后if来操作。

关于内存大小设置以及预测

2020-10-16 文章 Kyle Zhang
Hi all,
  最近也是遇到比较常见的内存溢出的错误OutOfMemoryError: Java heap space,JM:1g
TM:2g,简单粗暴的设置成2g、4g就可以运行了,
INFO  [] - Loading configuration property:
cluster.termination-message-path, /flink/log/termination.log
INFO  [] - Final TaskExecutor Memory configuration:
INFO  [] -   Total Process Memory:  3.906gb (4194304000 bytes)
INFO  [] - Total Flink Memory:  3.266gb (3506438138 bytes)
INFO  [] -   Total JVM Heap Memory: 1.508gb (1619001315 bytes)
INFO  [] - Framework:   128.000mb (134217728 bytes)
INFO  [] - Task:1.383gb (1484783587 bytes)
INFO  [] -   Total Off-heap Memory: 1.758gb (1887436823 bytes)
INFO  [] - Managed: 1.306gb (1402575276 bytes)
INFO  [] - Total JVM Direct Memory: 462.400mb (484861547 bytes)
INFO  [] -   Framework: 128.000mb (134217728 bytes)
INFO  [] -   Task:  0 bytes
INFO  [] -   Network:   334.400mb (350643819 bytes)
INFO  [] - JVM Metaspace:   256.000mb (268435456 bytes)
INFO  [] - JVM Overhead:400.000mb (419430406 bytes)

请问有没有指标能够事前估算JM、TM需要的内存大小?

Best


回复:回复: 回复: flink 自定义udf注册后不能使用

2020-10-16 文章 罗显宴
我想问一下,这种udf方式,只能写成一个jar上传到集群中解释执行,还是说还可以直接在sql-client中,直接提交sql代码


| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年10月16日 15:45,amen...@163.com 写道:
是的,同款TEMPORARY FUNCTION错误,但是使用SYSTEMTEMPORARY就没有问题,不知是否是flink的bug

best,
amenhub

发件人: 史 正超
发送时间: 2020-10-16 15:26
收件人: user-zh@flink.apache.org
主题: 回复: 回复:回复: flink 自定义udf注册后不能使用
你这样创建试一下,或者换个名字试试

CREATE TEMPORARY SYSTEM  FUNCTION imei_encrypt AS 
'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE JAVA;

我刚才创建了一个 UpperCase的function,也是一样的错误,用TEMPORARY SYSTEM覆盖系统的函数(有可能存在)后,就可以了,换个字也可以


发件人: 奔跑的小飞袁 
发送时间: 2020年10月16日 6:47
收件人: user-zh@flink.apache.org 
主题: Re: 回复:回复: flink 自定义udf注册后不能使用

是的,是我传参有问题



--
Sent from: http://apache-flink.147419.n8.nabble.com/


回复:回复: 回复: flink 自定义udf注册后不能使用

2020-10-16 文章 罗显宴
我想问一下,这种udf方式,只能写成一个jar上传到集群中解释执行,还是说还可以直接在sql-client中,直接提交sql代码


| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年10月16日 15:45,amen...@163.com 写道:
是的,同款TEMPORARY FUNCTION错误,但是使用SYSTEMTEMPORARY就没有问题,不知是否是flink的bug

best,
amenhub

发件人: 史 正超
发送时间: 2020-10-16 15:26
收件人: user-zh@flink.apache.org
主题: 回复: 回复:回复: flink 自定义udf注册后不能使用
你这样创建试一下,或者换个名字试试

CREATE TEMPORARY SYSTEM  FUNCTION imei_encrypt AS 
'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE JAVA;

我刚才创建了一个 UpperCase的function,也是一样的错误,用TEMPORARY SYSTEM覆盖系统的函数(有可能存在)后,就可以了,换个字也可以


发件人: 奔跑的小飞袁 
发送时间: 2020年10月16日 6:47
收件人: user-zh@flink.apache.org 
主题: Re: 回复:回复: flink 自定义udf注册后不能使用

是的,是我传参有问题



--
Sent from: http://apache-flink.147419.n8.nabble.com/


回复: 回复: flink 自定义udf注册后不能使用

2020-10-16 文章 amen...@163.com
是的,同款TEMPORARY FUNCTION错误,但是使用SYSTEMTEMPORARY就没有问题,不知是否是flink的bug

best,
amenhub
 
发件人: 史 正超
发送时间: 2020-10-16 15:26
收件人: user-zh@flink.apache.org
主题: 回复: 回复:回复: flink 自定义udf注册后不能使用
你这样创建试一下,或者换个名字试试
 
CREATE TEMPORARY SYSTEM  FUNCTION imei_encrypt AS 
'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE JAVA;
 
我刚才创建了一个 UpperCase的function,也是一样的错误,用TEMPORARY SYSTEM覆盖系统的函数(有可能存在)后,就可以了,换个字也可以
 

发件人: 奔跑的小飞袁 
发送时间: 2020年10月16日 6:47
收件人: user-zh@flink.apache.org 
主题: Re: 回复:回复: flink 自定义udf注册后不能使用
 
是的,是我传参有问题
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/


回复: 回复:回复: flink 自定义udf注册后不能使用

2020-10-16 文章 史 正超
你这样创建试一下,或者换个名字试试

CREATE TEMPORARY SYSTEM  FUNCTION imei_encrypt AS 
'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE JAVA;

 我刚才创建了一个 UpperCase的function,也是一样的错误,用TEMPORARY 
SYSTEM覆盖系统的函数(有可能存在)后,就可以了,换个字也可以


发件人: 奔跑的小飞袁 
发送时间: 2020年10月16日 6:47
收件人: user-zh@flink.apache.org 
主题: Re: 回复:回复: flink 自定义udf注册后不能使用

是的,是我传参有问题



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 回复:回复: flink 自定义udf注册后不能使用

2020-10-16 文章 奔跑的小飞袁
是的,是我传参有问题



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 回复:回复: flink 自定义udf注册后不能使用

2020-10-16 文章 奔跑的小飞袁
是的,我这个函数只需要一个参数



--
Sent from: http://apache-flink.147419.n8.nabble.com/


回复:回复: flink 自定义udf注册后不能使用

2020-10-16 文章 Shuai Xia
你好,没看错的话,只有一个参?


--
发件人:奔跑的小飞袁 
发送时间:2020年10月16日(星期五) 14:18
收件人:user-zh 
主 题:Re: 回复: flink 自定义udf注册后不能使用

完整的sql执行文件

SET stream.enableCheckpointing=1000*60;
SET stream.setParallelism=4;

CREATE FUNCTION imei_encrypt AS 'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE
JAVA;

-- Kafka cdbp zdao source 表
create TABLE cloud_behavior_source(
operation VARCHAR,
operation_channel VARCHAR,
`time` VARCHAR,
ip VARCHAR,
lat VARCHAR,
lng VARCHAR,
user_id VARCHAR,
device_id VARCHAR,
imei VARCHAR,
targets ARRAY>,
product_name VARCHAR,
product_version VARCHAR,
product_vendor VARCHAR,
platform VARCHAR,
platform_version VARCHAR,
`languaage` VARCHAR,
locale VARCHAR,
other_para MAP
) with (
'connector'='kafka',
'topic'='cloud_behavior',
'properties.bootstrap.servers'='',
'properties.group.id'='testGroup',
'format'='avro',
'scan.startup.mode'='earliest-offset'
);

-- Hbase zdao uv 统计 Sink 表
create TABLE cloud_behavior_sink(
operation VARCHAR,
operation_channel VARCHAR,
`time` VARCHAR,
ip VARCHAR,
lat VARCHAR,
lng VARCHAR,
user_id VARCHAR,
device_id VARCHAR,
imei VARCHAR,
product_name VARCHAR,
product_version VARCHAR,
product_vendor VARCHAR,
platform VARCHAR,
platform_version VARCHAR,
`languaage` VARCHAR,
locale VARCHAR
)with (
'connector'='filesystem',
'path'='hdfs:///data_test/hongliang_song/working_sql_test_parquet.db',
'format'='parquet',
'sink.rolling-policy.file-size'='128MB',
'sink.rolling-policy.rollover-interval'='10min'
);

-- 业务过程
insert into cloud_behavior_sink
select
 operation,
 operation_channel,
 `time`,
 ip,
 lat,
 lng,
 user_id,
 device_id,
 imei_encrypt(imei) AS imei,
 product_name,
 product_version,
 product_vendor,
 platform,
 platform_version,
 `languaage`,
 locale
FROM cloud_behavior_source;



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 回复: flink 自定义udf注册后不能使用

2020-10-16 文章 奔跑的小飞袁
完整的sql执行文件

SET stream.enableCheckpointing=1000*60;
SET stream.setParallelism=4;

CREATE FUNCTION imei_encrypt AS 'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE
JAVA;

-- Kafka cdbp zdao source 表
create TABLE cloud_behavior_source(
operation VARCHAR,
operation_channel VARCHAR,
`time` VARCHAR,
ip VARCHAR,
lat VARCHAR,
lng VARCHAR,
user_id VARCHAR,
device_id VARCHAR,
imei VARCHAR,
targets ARRAY>,
product_name VARCHAR,
product_version VARCHAR,
product_vendor VARCHAR,
platform VARCHAR,
platform_version VARCHAR,
`languaage` VARCHAR,
locale VARCHAR,
other_para MAP
) with (
'connector'='kafka',
'topic'='cloud_behavior',
'properties.bootstrap.servers'='',
'properties.group.id'='testGroup',
'format'='avro',
'scan.startup.mode'='earliest-offset'
);

-- Hbase zdao uv 统计 Sink 表
create TABLE cloud_behavior_sink(
operation VARCHAR,
operation_channel VARCHAR,
`time` VARCHAR,
ip VARCHAR,
lat VARCHAR,
lng VARCHAR,
user_id VARCHAR,
device_id VARCHAR,
imei VARCHAR,
product_name VARCHAR,
product_version VARCHAR,
product_vendor VARCHAR,
platform VARCHAR,
platform_version VARCHAR,
`languaage` VARCHAR,
locale VARCHAR
)with (
'connector'='filesystem',
'path'='hdfs:///data_test/hongliang_song/working_sql_test_parquet.db',
'format'='parquet',
'sink.rolling-policy.file-size'='128MB',
'sink.rolling-policy.rollover-interval'='10min'
);

-- 业务过程
insert into cloud_behavior_sink
select
 operation,
 operation_channel,
 `time`,
 ip,
 lat,
 lng,
 user_id,
 device_id,
 imei_encrypt(imei) AS imei,
 product_name,
 product_version,
 product_vendor,
 platform,
 platform_version,
 `languaage`,
 locale
FROM cloud_behavior_source;



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink作业运行失败

2020-10-16 文章 gangzi
我的是hadoop-2.10,flink-sql-connector-hive-1.2.2_2.11-1.11.1.jar。

> 2020年10月16日 下午12:01,Jeff Zhang  写道:
> 
> 你是hadoop2 吗?我记得这个情况只有hadoop3才会出现
> 
> 
> gangzi <1139872...@qq.com> 于2020年10月16日周五 上午11:22写道:
> 
>> TM
>> 的CLASSPATH确实没有hadoop-mapreduce-client-core.jar。这个难道是hadoop集群的问题吗?还是一定要shade-hadoop包,官方不推荐shade-hadoop包了。
>> 
>>> 2020年10月16日 上午10:50,Jeff Zhang  写道:
>>> 
>>> 你看看TM的log,里面有CLASSPATH的
>>> 
>>> gangzi <1139872...@qq.com> 于2020年10月16日周五 上午10:11写道:
>>> 
 我按照flink官方文档的做法,在hadoop集群每个节点上都:export HADOOP_CLASSPATH =`hadoop
 classpath`,但是报:java.lang.NoClassDefFoundError:
 org/apache/hadoop/mapred/JobConf
 
 
>> 不知道这个是不是flink的bug,按照这个报错,是缺少:hadoop-mapreduce-client-core.jar这个jar包,但是这个包是在/usr/local/hadoop-2.10.0/share/hadoop/mapreduce/*:这个目录下的,这个目录是包含在HADOOP_CLASSPATH上的,按理说是会加载到的。
 
> 2020年10月16日 上午9:59,Shubin Ruan  写道:
> 
> export HADOOP_CLASSPATH=
 
 
>>> 
>>> --
>>> Best Regards
>>> 
>>> Jeff Zhang
>> 
>> 
> 
> -- 
> Best Regards
> 
> Jeff Zhang



Re: 回复: flink 自定义udf注册后不能使用

2020-10-16 文章 奔跑的小飞袁
这是我的udf声明
CREATE FUNCTION imei_encrypt AS 'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE
JAVA;
以下是udf实现
public class IMEIEncrypt extends ScalarFunction {

public String eval(String column_type,String value) {
EncryptUtils encryptUtils = new EncryptUtils();
return encryptUtils.encrypt(column_type,value);
}
}




--
Sent from: http://apache-flink.147419.n8.nabble.com/