Re: pyflink - how to specify "||" as the csv delimiter

2020-12-29, by Xingbo Huang
Hi,

csv.field-delimiter is restricted to a single character; see the documentation for details [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/csv.html#csv-field-delimiter
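A possible workaround, sketched below (not from the docs, just an idea): keep the two-character delimiter out of the format entirely by reading each line as a single STRING column with the 'raw' format and splitting it with the built-in SPLIT_INDEX function. Table, topic and field names are placeholders, and it assumes SPLIT_INDEX accepts a multi-character delimiter.

from pyflink.table import EnvironmentSettings, TableEnvironment

# Sketch: ingest whole lines, then split on '||' in SQL.
t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().build())
t_env.execute_sql("""
    CREATE TABLE raw_lines (
        line STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'my_topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'my_group',
        'format' = 'raw'
    )
""")
result = t_env.execute_sql("""
    SELECT
        SPLIT_INDEX(line, '||', 0) AS col0,
        SPLIT_INDEX(line, '||', 1) AS col1
    FROM raw_lines
""")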

Best,
Xingbo

消息室 wrote on Wed, Dec 30, 2020 at 12:04 PM:

>
> With format='csv', I want to specify the delimiter as "||", set as:
>
>
> The error:
> Caused by: org.apache.flink.table.api.ValidationException: Option
> 'csv.field-delimiter' must be a string with single character, but was: \|\|
>
>
> How can I specify "||" as the delimiter?
>
>
> -- Original Message --
> *From:* "Dian Fu" ;
> *Sent:* Thursday, December 17, 2020, 3:12 PM
> *To:* "user-zh";
> *Cc:* "消息室";
> *Subject:* Re: Seeking advice: does the pyflink sink support a redis connector?
>
> Yes, it needs to be packaged into a jar before it can be used in PyFlink:
> 1) Build a fat jar with all the dependencies shaded in. The default build is not a fat jar, so the pom needs to be adjusted; the Kafka connector's approach is a good reference
> [1].
> 2) For how to use it, these are the available properties [2]
>
> [1]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-sql-connector-kafka/pom.xml#L46
> [2]
> https://github.com/apache/bahir-flink/blob/f0b3e1e04930b79b277cfc7ebe3552db246578e9/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/RedisValidator.java
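A minimal sketch of pointing PyFlink 1.12 at such a fat jar (the jar path below is a placeholder, not the real artifact name):

from pyflink.table import EnvironmentSettings, TableEnvironment

# Sketch: make the shaded connector jar part of the job's classpath; the
# connector can then be declared via CREATE TABLE ... WITH (...) using the
# properties from RedisValidator in [2].
t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().build())
t_env.get_config().get_configuration().set_string(
    "pipeline.jars", "file:///path/to/flink-connector-redis-fat.jar")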
>
>
> On Dec 17, 2020, at 11:55 AM, magichuang wrote:
>
> hi,
>
> I'd like to ask whether this
> https://github.com/apache/bahir-flink/tree/master/flink-connector-redis
> can be packaged into a jar and then used in pyflink
>
> I'm not familiar with Java; the page only explains how to use it from Java and Scala
>
>
>
>
>
>
>
> Best,
>
> MagicHuang
>
>
>
>
> -- Original Message --
> From: "Dian Fu" 
> Sent: 2020-12-17 10:16:13
> To: user-zh ,hepei...@qq.com
> Cc:
> Subject: Re: Seeking advice: does the pyflink sink support a redis connector?
>
> Thanks for Xingbo's reply; a small addition: every connector supported by Table & SQL can be used in PyFlink.
>
> A redis connector is not provided directly in the Flink codebase; there is one here that should also work:
> https://github.com/apache/bahir-flink/tree/master/flink-connector-redis
>
> For how to use connectors in PyFlink, see the documentation:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html
>
> On Dec 17, 2020, at 9:52 AM, Xingbo Huang wrote:
>
> Hi,
>
> As far as I know, Flink does not provide official support for a redis connector [1]; you need to implement your own redis
> connector based on the official interfaces. For how to implement a custom connector, see the documentation [2]
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sourceSinks.html
>
> Best,
> Xingbo
>
>
> 消息室 wrote on Thu, Dec 17, 2020 at 9:33 AM:
>
> Hello:
>
>
> Our project team plans to use pyflink, and I have read your blog with great interest. I'd like to ask whether the sink in the current pyflink 1.12.0 supports a redis
> connector? Thanks!
>  If not, what approach would you suggest?
>
>
>
>
>
>
>
>


Re: pyflink1.11 - precision loss when reading Decimal data from Mysql

2020-12-29, by 肖越
Problem solved ~ it was an issue with the data definition.
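For reference, a sketch of what the corrected definition presumably looks like: a bare DECIMAL is DECIMAL(10, 0), which drops all fractional digits, so precision and scale have to be spelled out (18 and 12 below are only an illustrative choice; the connector options are placeholders):

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_batch_mode().build())
# Declare yldrate with an explicit precision/scale instead of bare DECIMAL.
t_env.execute_sql("""
    CREATE TABLE src (
        yldrate   DECIMAL(18, 12),
        pf__id    VARCHAR(10),
        symbol_id VARCHAR(30)
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://localhost:3306/db',
        'table-name' = 'src_table',
        'username' = 'user',
        'password' = 'pass'
    )
""")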
















On 2020-12-30 13:41:16, "肖越" <18242988...@163.com> wrote:

The data is defined via a connector:
The column types are declared as: yldrate DECIMAL, pf__id VARCHAR(10), symbol_id VARCHAR(30)
The first few yldrate values in the database look like this:
0.72101337
0.
0.
0.000212493881
0.78719845
0.73023505
0.70173309
0.70168385
But after pyflink reads them and converts to pandas, they all become:
 yldrate  
0   0  
1   0  
2   0  
3   0  
4   0  
5   0  
I don't know what is causing the precision loss. How should I configure this so the data can be read without any loss? A newbie waiting online for an expert's reply, thanks a lot!







 

pyflink1.11 - precision loss when reading Decimal data from Mysql

2020-12-29, by 肖越
The data is defined via a connector:
The column types are declared as: yldrate DECIMAL, pf__id VARCHAR(10), symbol_id VARCHAR(30)
The first few yldrate values in the database look like this:
0.72101337
0.
0.
0.000212493881
0.78719845
0.73023505
0.70173309
0.70168385
But after pyflink reads them and converts to pandas, they all become:
 yldrate  
0   0  
1   0  
2   0  
3   0  
4   0  
5   0  
I don't know what is causing the precision loss. How should I configure this so the data can be read without any loss? A newbie waiting online for an expert's reply, thanks a lot!



flink1.11 flinksql sliding window issue report

2020-12-29, by fan_future

 
Requirement: every 5 minutes, output the record count from midnight up to now.

Approach: use a sliding window with a 5-minute slide, filter out today's data with a where condition, and count(1).

Observed behavior: when the window fires at 00:05, the accumulated value is not today's record count but all data inside the current window. I'm not sure whether this is a bug or whether I'm using the condition incorrectly; could the community please help?

My understanding: after asking around, it seems the where condition is applied at the source, i.e. only records that satisfy the condition flow into the window, rather than the filter being evaluated inside the window. Is there any way to apply the filter inside the window?
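One thing that may be worth trying (a sketch only, not a verified fix): aggregate functions in Flink SQL accept a FILTER clause, which evaluates the predicate per row inside the windowed aggregation instead of pruning rows before they reach the window. The table, the time attribute ts, and the 'today' predicate below are placeholders and may need adjusting (for example comparing against the window end instead of CURRENT_TIMESTAMP):

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().build())
# One-day HOP window sliding every 5 minutes; only rows whose event date equals
# the current date are counted.
t_env.execute_sql("""
    SELECT
        HOP_END(ts, INTERVAL '5' MINUTE, INTERVAL '1' DAY) AS w_end,
        COUNT(*) FILTER (
            WHERE DATE_FORMAT(ts, 'yyyy-MM-dd') = DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd')
        ) AS today_cnt
    FROM events
    GROUP BY HOP(ts, INTERVAL '5' MINUTE, INTERVAL '1' DAY)
""")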



--
Sent from: http://apache-flink.147419.n8.nabble.com/

pyflink - how to specify "||" as the csv delimiter

2020-12-29, by 消息室
With format='csv', I want to specify the delimiter as "||", set as:

The error:
Caused by: org.apache.flink.table.api.ValidationException: Option 
'csv.field-delimiter' must be a string with single character, but was: \|\|

How can I specify "||" as the delimiter?


Re: FlinkSQL 1.10 - event time declaration cannot contain system reserved words

2020-12-29, by Robin Zhang
Hi, zilong
It is indeed a bug, and matches exactly how I was using it. Thanks!





zilong xiao wrote
> If I remember correctly this is a bug; a computed column containing a keyword throws an exception. You can look at this issue:
> https://issues.apache.org/jira/browse/FLINK-16068
> 
> Robin Zhang 

> vincent2015qdlg@

>  wrote on Tue, Dec 29, 2020 at 6:56 PM:
> 
>> -- Defining a non-reserved word as the event-time field works fine
>> create table events (
>> process_time  bigint  comment '事件时间',
>> event   string  comment '事件类型',
>> ts AS TO_TIMESTAMP(FROM_UNIXTIME(process_time/1000, 'yyyy-MM-dd
>> HH:mm:ss')),
>> WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
>> ) with (
>>   ... ...
>> );
>>
>> But when the field being defined is a system reserved word, it errors out:
>> create table events (
>> `time`  bigint  comment '事件时间',
>>  eventstring  comment '事件类型',
>> ts AS TO_TIMESTAMP(FROM_UNIXTIME(`time`/1000, 'yyyy-MM-dd
>> HH:mm:ss')),
>> WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
>> ) with (
>>   ... ...
>> );
>>
>> The problem now is that the event-time field from Sensors Analytics tracking is named time, and writing a separate program just to convert the field feels rather clumsy.
>> I'm not sure whether this is a bug; I haven't come up with a good solution yet.
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink 1.11.2 replacing MapReduce for a Hive Full Join statement is very slow

2020-12-29, by Jacob
Dear All,

I have recently been researching some Hive on Flink features. We have an existing MapReduce job whose statement (HQL) is as follows:

The logic is simple: full join two tables (table1 and table2) and write the result into a new table, table3. The three tables table1, table2 and table3 have identical schemas with 35 fields each. The join uses id as the key; for each field, if table2's value is not null and not empty, table2's value is used, otherwise table1's. The result is written into the new table table3.


So far it has been running for 17 hours without finishing; judging from the data flow it looks nearly done. I'm not sure whether the way I'm using it is reasonable.

The HQL:
insert into table3 select
if(t2.id is not null and t2.id <> '', t2.id, t1.id) as id
,if(t2.field2 is not null and t2.field2 <> '', t2.field2, t1.field2) as
field2
..
..
..
..
..
..
..
..
,if(t2.field35 is not null and field35.dt <> '', field35.dt , field35.dt )
as field35
from (
select * from table1 where (id is not null and id <> '')
) as t1 full join (
select * from table2 where (id is not null and id <> '')
) as t2 on (t1.id = t2.id)
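As an aside, the per-field pick can be written a bit more compactly with COALESCE and NULLIF, which should be equivalent to the IF version (only a sketch, shown through PyFlink purely for illustration; whether it changes runtime behavior is a separate question):

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_batch_mode().build())
# NULLIF turns '' into NULL, COALESCE then falls back to t1's value whenever
# t2's value is NULL or empty.
t_env.execute_sql("""
    INSERT INTO table3
    SELECT
        COALESCE(NULLIF(t2.id, ''), t1.id)         AS id,
        COALESCE(NULLIF(t2.field2, ''), t1.field2) AS field2
        -- ... same pattern for the remaining fields ...
    FROM (SELECT * FROM table1 WHERE id IS NOT NULL AND id <> '') AS t1
    FULL JOIN (SELECT * FROM table2 WHERE id IS NOT NULL AND id <> '') AS t2
        ON t1.id = t2.id
""")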




The code is as follows:



public class FlinkHiveIntegration1 {

    public static void main(String[] args) throws Exception {

        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        String name = "myhive";
        String database = "mydatabase";
        String version = "1.1.0-cdh5.8.3";

        HiveConf hiveConf = new HiveConf();
        hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://***:9083");
        hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE,
                "hdfs://nameservice1/user/hive/warehouse");

        HiveCatalog hive = new HiveCatalogTest(name, database, hiveConf, version);

        tableEnv.registerCatalog(name, hive);
        tableEnv.useCatalog(name);
        tableEnv.useDatabase(database);

        String HQL = HQL;

        tableEnv.getConfig().addConfiguration(
                new Configuration().set(CoreOptions.DEFAULT_PARALLELISM, 8));

        tableEnv.executeSql(HQL);
    }
}








 



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink-sql streaming platform tool

2020-12-29, by Leonard Xu
Thanks for sharing!
Looks like a very nice piece of platform engineering; starred it.

> On Dec 29, 2020, at 21:09, zhp <499348...@qq.com> wrote:
> 
> In my spare time I built a web-based visual UI platform on top of flink-sql for everyone to learn from, reference, and use
> https://github.com/zhp8341/flink-streaming-platform-web
>   
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: Does flink1.12 support hbase1.2?

2020-12-29, by Leonard Xu
Hi,
The community has not tested hbase-1.2; the supported versions are hbase-1.4.x and hbase-2.3.x.
You can try the hbase-1.4.x connector; the connector only uses a small part of the HBase API, and 1.4.x and 1.2.x should be compatible.
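For reference, a minimal sketch of declaring the hbase-1.4 connector in Flink 1.12 SQL (table name, column family layout, and zookeeper address are placeholders):

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().build())
# Non-rowkey columns are grouped into ROW types, one per HBase column family.
t_env.execute_sql("""
    CREATE TABLE hbase_table (
        rowkey STRING,
        cf1 ROW<col1 STRING, col2 BIGINT>,
        PRIMARY KEY (rowkey) NOT ENFORCED
    ) WITH (
        'connector' = 'hbase-1.4',
        'table-name' = 'my_table',
        'zookeeper.quorum' = 'zk-host:2181'
    )
""")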

Best,
Leonard
> On Dec 29, 2020, at 21:12, zhp <499348...@qq.com> wrote:
> 
> Does flink1.12 support hbase1.2?
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: flink1.12 error OSError: Expected IPC message of type schema but got record batch

2020-12-29, by 咿咿呀呀
Hello, could this problem be caused by incorrect usage?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


How to commit the offsets consumed by the kafka source function in a flink job

2020-12-29, by jasonwu
Hi,
With checkpointing disabled, auto.commit commits the offsets at the configured interval. What I want is for the offsets to be committed only after the flink job's sink has finished writing the data. Does this require extending flinkKafkaConsumer?

Or creating a new kafkaConsumer and calling commitSync()?

KafkaConsumer

Does flink1.12 support hbase1.2?

2020-12-29, by zhp
Does flink1.12 support hbase1.2?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


How to specify "||" as the csv delimiter

2020-12-29, by 消息室
With format='csv', I want to specify the delimiter as "||", set as:

The error:
Caused by: org.apache.flink.table.api.ValidationException: Option 
'csv.field-delimiter' must be a string with single character, but was: \|\|

How can I specify "||" as the delimiter?

flink-sql streaming platform tool

2020-12-29, by zhp
In my spare time I built a web-based visual UI platform on top of flink-sql for everyone to learn from, reference, and use
https://github.com/zhp8341/flink-streaming-platform-web
  



--
Sent from: http://apache-flink.147419.n8.nabble.com/


how to achieve exactly once with flink sql when consuming from kafka and then writing to mysql

2020-12-29, by huliancheng
We are going to build our data computing system based on flink sql.
For now, with flink 1.11.0, we have achieved a milestone: consuming from
kafka, selecting from a dynamic table, and writing the results to mysql.

But when we tested exactly-once (end to end), we found a problem.

Below are our source code, shell, and sql files:


sql file:
-- source; uses a computed column, uuid() generates a uuid on the fly
CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts TIMESTAMP(3),
uuid as uuid()
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'user_behavior',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.0.key' = 'zookeeper.connect',
'connector.properties.0.value' = 'localhost:2181',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'localhost:9092',
'connector.properties.2.key' = 'group.id',
'connector.properties.2.value' = 'test-consumer-group12',
'update-mode' = 'append',
'format.type' = 'json',
'format.derive-schema' = 'true'
);

-- sink
CREATE TABLE pvuv_sink (
uuid varchar,
dt VARCHAR,
pv BIGINT,
uv BIGINT
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3306/flink_test',
'connector.table' = 'pvuv_sink13',
'connector.username' = 'root',
'connector.password' = '123456',
'connector.write.flush.max-rows' = '1',
'connector.sink.semantic' = 'exactly-once'
);


INSERT INTO pvuv_sink
SELECT
  uuid,
  DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
  COUNT(*) AS pv,
  COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00'), uuid;
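For what it's worth, a sketch of a transactional Kafka sink in the newer option style (assuming the Flink 1.12 'connector' = 'kafka' options, where the sink semantic is set via 'sink.semantic'; the Kafka transaction timeout usually has to be raised as well, and checkpointing must be enabled for the transactions to commit). As far as I can tell the legacy JDBC options have no 'connector.sink.semantic'. Topic and field names below are placeholders.

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().build())
# Sketch of an exactly-once Kafka sink table.
t_env.execute_sql("""
    CREATE TABLE kafka_sink (
        dt STRING,
        pv BIGINT,
        uv BIGINT
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'pvuv_out',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'json',
        'sink.semantic' = 'exactly-once',
        'properties.transaction.timeout.ms' = '900000'
    )
""")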


sql parse and concat file:
/**
 * This is the program that parses and submits the commands; the entry point of the whole project
 */
public class SqlSubmit {

public static void main(String[] args) throws Exception {
// parse the command-line arguments
final CliOptions options = CliOptionsParser.parseClient(args);

// pass the parsed command-line arguments to SqlSubmit
SqlSubmit submit = new SqlSubmit(options);

// run the program
submit.run();
}

//

private String sqlFilePath;
private TableEnvironment tEnv;

// get the path of the sql file to execute
private SqlSubmit(CliOptions options) {
this.sqlFilePath = options.getSqlFilePath();
}

private void run() throws Exception {
// create the flink execution context
StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();
this.tEnv = StreamTableEnvironment.create(environment,
   
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build());

// read all lines of the sql file and turn them into a list of strings
List<String> sql = Files.readAllLines(Paths.get(sqlFilePath));
List<SqlCommandParser.SqlCommandCall> calls =
SqlCommandParser.parse(sql);
if (calls.size() == 0) {
//no sql to execute
throw new RuntimeException("There is no sql statement to
execute,please check your sql file: " + sqlFilePath);
}
for (SqlCommandParser.SqlCommandCall call : calls) {
//System.out.println(call.command.toString());
callCommand(call);
}
}

//


private void callCommand(SqlCommandParser.SqlCommandCall cmdCall) {
switch (cmdCall.command) {
case SET:
callSet(cmdCall);
break;
case CREATE_TABLE:
callCreateTable(cmdCall);
break;
case INSERT_INTO:
callInsertInto(cmdCall);
break;
default:
throw new RuntimeException("Unsupported command: " +
cmdCall.command);
}
}

private void callSet(SqlCommandParser.SqlCommandCall cmdCall) {
String key = cmdCall.operands[0];
String value = cmdCall.operands[1];
tEnv.getConfig().getConfiguration().setString(key, value);
System.out.println("设置 " + key + "-->" + value + " 成功");
}

private void callCreateTable(SqlCommandParser.SqlCommandCall cmdCall) {
String ddl = cmdCall.operands[0];
try {
tEnv.executeSql(ddl);
} catch (SqlParserException e) {
throw new RuntimeException("SQL parse failed:\n" + ddl + "\n",
e);
}
String tableName = ddl.split("\\s+")[2];
System.out.println("创建表 " + tableName + " 成功");
}

private void callInsertInto(SqlCommandParser.SqlCommandCall cmdCall) {
String dml = cmdCall.operands[0];
Optional<JobClient> jobClient;
try {
TableResult result = tEnv.executeSql(dml);
jobClient = result.getJobClient();
} catch (SqlParserException e) {
throw new RuntimeException("SQL parse failed:\n" + dml + "\n",
e);
}

if (jobClient.isPresent()) {
JobID jobID = 

Reply: flink 1.12 - memory not released after Cancel Job (question)

2020-12-29, by 徐州州
I am using a StandaloneSessionCluster.




------------------ Original Message ------------------
From: "user-zh"



Reply: flink 1.12 - memory not released after Cancel Job (question)

2020-12-29, by 徐州州
I develop the flink sql job in IDEA, and no Checkpoint is configured in the code; after the job is cancelled, the TaskManager slots are still occupied.



------------------ Original Message ------------------
From:

Re: flink 1.12 - memory not released after Cancel Job (question)

2020-12-29, by 赵一旦
I don't think that works. The job is the job and the taskManager is the taskManager. The taskManager is a process started in advance; when a job is submitted, the
taskManager runs it for you. After cancel, the taskManager goes back to its own business (for example waiting for new jobs).
Or consider running on yarn, e.g. per-job mode.

徐州州 <25977...@qq.com> wrote on Tue, Dec 29, 2020 at 9:00 AM:

> A question: after my flink
> sql job is cancelled and restarted an hour later, it still continues accumulating from the point where it was cancelled. I develop in IDEA and have not configured any Checkpoints in the code. How can I also release the TaskManager memory used by the job when the job is cancelled?


Re: flink sql ddl CREATE TABLE kafka011 sink - how to enable transactional exactly-once?

2020-12-29, by huliancheng
We are going to build our data computing system based on flink sql.
For now, with flink 1.11.0, we have achieved a milestone: consuming from
kafka, selecting from a dynamic table, and writing the results to mysql.

But when we tested exactly-once (end to end), we found a problem.

The official documentation on flink sql does us no favor here. I need your help


sql file:
-- source; uses a computed column, uuid() generates a uuid on the fly
CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts TIMESTAMP(3),
uuid as uuid()
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'user_behavior',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.0.key' = 'zookeeper.connect',
'connector.properties.0.value' = 'localhost:2181',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'localhost:9092',
'connector.properties.2.key' = 'group.id',
'connector.properties.2.value' = 'test-consumer-group12',
'update-mode' = 'append',
'format.type' = 'json',
'format.derive-schema' = 'true'
);

-- sink
CREATE TABLE pvuv_sink (
uuid varchar,
dt VARCHAR,
pv BIGINT,
uv BIGINT
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3306/flink_test',
'connector.table' = 'pvuv_sink13',
'connector.username' = 'root',
'connector.password' = '123456',
'connector.write.flush.max-rows' = '1',
'connector.sink.semantic' = 'exactly-once'
);


INSERT INTO pvuv_sink
SELECT
  uuid,
  DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
  COUNT(*) AS pv,
  COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00'), uuid;


sql parse and concat file:
/**
 * This is the program that parses and submits the commands; the entry point of the whole project
 */
public class SqlSubmit {

public static void main(String[] args) throws Exception {
// parse the command-line arguments
final CliOptions options = CliOptionsParser.parseClient(args);

// pass the parsed command-line arguments to SqlSubmit
SqlSubmit submit = new SqlSubmit(options);

// run the program
submit.run();
}

//

private String sqlFilePath;
private TableEnvironment tEnv;

// get the path of the sql file to execute
private SqlSubmit(CliOptions options) {
this.sqlFilePath = options.getSqlFilePath();
}

private void run() throws Exception {
// create the flink execution context
StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();
this.tEnv = StreamTableEnvironment.create(environment,
   
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build());

// read all lines of the sql file and turn them into a list of strings
List<String> sql = Files.readAllLines(Paths.get(sqlFilePath));
List<SqlCommandParser.SqlCommandCall> calls =
SqlCommandParser.parse(sql);
if (calls.size() == 0) {
//no sql to execute
throw new RuntimeException("There is no sql statement to
execute,please check your sql file: " + sqlFilePath);
}
for (SqlCommandParser.SqlCommandCall call : calls) {
//System.out.println(call.command.toString());
callCommand(call);
}
}

//


private void callCommand(SqlCommandParser.SqlCommandCall cmdCall) {
switch (cmdCall.command) {
case SET:
callSet(cmdCall);
break;
case CREATE_TABLE:
callCreateTable(cmdCall);
break;
case INSERT_INTO:
callInsertInto(cmdCall);
break;
default:
throw new RuntimeException("Unsupported command: " +
cmdCall.command);
}
}

private void callSet(SqlCommandParser.SqlCommandCall cmdCall) {
String key = cmdCall.operands[0];
String value = cmdCall.operands[1];
tEnv.getConfig().getConfiguration().setString(key, value);
System.out.println("设置 " + key + "-->" + value + " 成功");
}

private void callCreateTable(SqlCommandParser.SqlCommandCall cmdCall) {
String ddl = cmdCall.operands[0];
try {
tEnv.executeSql(ddl);
} catch (SqlParserException e) {
throw new RuntimeException("SQL parse failed:\n" + ddl + "\n",
e);
}
String tableName = ddl.split("\\s+")[2];
System.out.println("创建表 " + tableName + " 成功");
}

private void callInsertInto(SqlCommandParser.SqlCommandCall cmdCall) {
String dml = cmdCall.operands[0];
Optional<JobClient> jobClient;
try {
TableResult result = tEnv.executeSql(dml);
jobClient = result.getJobClient();
} catch (SqlParserException e) {
throw new RuntimeException("SQL parse failed:\n" + dml + "\n",
e);
}

if (jobClient.isPresent()) {
   

Permission error during Yarn resource localization

2020-12-29, by Qishang
Hi.
Environment: 2.6.0 + cdh5.15.2 + islon
FlinkX (based on Flink 1.8.1) reports an error when submitting a job. This problem has been blocking me for a long time. Kerberos passes normally at the point of job submission,
but a permission error is reported during Yarn resource localization, which I find hard to understand. Could anyone help with some ideas for narrowing it down?
1. The FlinkX job itself is submitted normally;
2. Another test environment, also CDH + Kerberos, submits FlinkX jobs normally;
3. Upgrading to FlinkX 1.10.1 + Flink 1.10.1 hits the same problem.

Submission command:
/opt/flinkx/bin/flinkx -mode yarnPer  -job
/tmp/flink_tmp_json/mysql2mysql.json  -pluginRoot /opt/flinkx/plugins
 -flinkconf /opt/flink-1.8.1/conf  -flinkLibJar /opt/flink-1.8.1/lib
 -yarnconf /etc/hadoop/conf  -queue root.liu  -jobid 10698

The error is as follows:
13:51:45.298 [main] INFO com.dtstack.flinkx.launcher.perjob.PerJobSubmitter
- start to submit per-job task, LauncherOptions =
com.dtstack.flinkx.options.Options@32eebfca
log4j:WARN No appenders could be found for logger
(org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
more info.
13:51:45.530 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: jobmanager.rpc.address, localhost
13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: jobmanager.rpc.port, 6124
13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: jobmanager.heap.mb, 1024
13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: jobmanager.web.port, 8082
13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: jobmanager.web.checkpoints.disable, true
13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: jobmanager.archive.fs.dir,
hdfs://xxx:8020/data/flink/flink-completed-jobs
13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: taskmanager.heap.mb, 2048
13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: taskmanager.numberOfTaskSlots, 4
13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: taskmanager.memory.preallocate, false
13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: io.tmp.dirs, /tmp/flink/taskmanager
13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: parallelism.default, 2
13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: yarn.per-job-cluster.include-user-jar,
last
13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: akka.lookup.timeout, 30 s
13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: web.checkpoints.history, 30
13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: fs.hdfs.hadoopconf, /etc/hadoop/conf
13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: state.backend, filesystem
13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: state.backend.fs.checkpointdir,
hdfs://xxx:8020/data/flink/flink-checkpoints
13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: state.checkpoints.dir,
hdfs://xxx:8020/data/flink/flink-checkpoints
13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: state.checkpoints.num-retained, 5
13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: state.savepoints.dir,
hdfs://xxx:8020/data/flink/flink-savepoints
13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: historyserver.archive.fs.dir,
hdfs://xxx:8020/data/flink/flink-completed-jobs
13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: historyserver.web.port, 16899
13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: yarn.application-attempts, 10
13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: restart-strategy.fixed-delay.attempts,
1
13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: restart-strategy.fixed-delay.delay, 30s
13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: security.kerberos.login.contexts, Client
13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: security.kerberos.login.keytab,
/root/keytab/hive.keytab
13:51:45.532 [main] 

Re: FlinkSQL 1.10 - event time declaration cannot contain system reserved words

2020-12-29, by zilong xiao
If I remember correctly this is a bug; a computed column containing a keyword throws an exception. You can look at this issue:
https://issues.apache.org/jira/browse/FLINK-16068

Robin Zhang wrote on Tue, Dec 29, 2020 at 6:56 PM:

> -- Defining a non-reserved word as the event-time field works fine
> create table events (
> process_time  bigint  comment '事件时间',
> event   string  comment '事件类型',
> ts AS TO_TIMESTAMP(FROM_UNIXTIME(process_time/1000, 'yyyy-MM-dd
> HH:mm:ss')),
> WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
> ) with (
>   ... ...
> );
>
> But when the field being defined is a system reserved word, it errors out:
> create table events (
> `time`  bigint  comment '事件时间',
>  eventstring  comment '事件类型',
> ts AS TO_TIMESTAMP(FROM_UNIXTIME(`time`/1000, 'yyyy-MM-dd HH:mm:ss')),
> WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
> ) with (
>   ... ...
> );
>
> The problem now is that the event-time field from Sensors Analytics tracking is named time, and writing a separate program just to convert the field feels rather clumsy.
> I'm not sure whether this is a bug; I haven't come up with a good solution yet.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Error writing complex data types read from hive into mysql

2020-12-29, by zhang hao
A question: hive reads out complex types; is there a way to store such data into mysql as strings? The error:
org.apache.flink.table.api.ValidationException: Field types of query
result and registered TableSink
default_catalog.default_database.0_es_hive_094679318_tmp do not match.

Query schema: [id: INT, name: VARCHAR(2147483647), hobby: ARRAY,
add: MAP]
Sink schema: [id: INT, name: VARCHAR(2147483647), hobby:
VARCHAR(2147483647), add: VARCHAR(2147483647)]
at 
org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:100)
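One possible approach (a sketch only, shown with a PyFlink UDF for illustration; the same idea works as a Java scalar function): serialize the ARRAY/MAP columns to a JSON string before inserting into the VARCHAR sink columns. Function and table names are placeholders.

import json

from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.udf import udf

@udf(result_type=DataTypes.STRING())
def to_json_str(value):
    # Arrays arrive as Python lists and maps as dicts; render both as JSON text.
    return json.dumps(value, ensure_ascii=False)

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_batch_mode().build())
t_env.create_temporary_function("to_json_str", to_json_str)
# The insert can then turn the complex columns into strings, e.g.:
# INSERT INTO mysql_sink SELECT id, name, to_json_str(hobby), to_json_str(`add`) FROM hive_src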


FlinkSQL 1.10 - event time declaration cannot contain system reserved words

2020-12-29, by Robin Zhang
-- Defining a non-reserved word as the event-time field works fine
create table events (
process_time  bigint  comment '事件时间',
event   string  comment '事件类型',
ts AS TO_TIMESTAMP(FROM_UNIXTIME(process_time/1000, 'yyyy-MM-dd
HH:mm:ss')),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) with (
  ... ...
);

But when the field being defined is a system reserved word, it errors out:
create table events (
`time`  bigint  comment '事件时间',
 eventstring  comment '事件类型',
ts AS TO_TIMESTAMP(FROM_UNIXTIME(`time`/1000, 'yyyy-MM-dd HH:mm:ss')),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) with (
  ... ...
);

The problem now is that the event-time field from Sensors Analytics tracking is named time, and writing a separate program just to convert the field feels rather clumsy.
I'm not sure whether this is a bug; I haven't come up with a good solution yet.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: jdbc sink cannot insert data

2020-12-29, by guoliubi...@foxmail.com
Found the cause: JdbcSink.sink was using the default JdbcExecutionOptions, whose default batchSize is 5000, meaning rows are only flushed to the database once that many have accumulated.
Because this job has a small data volume, it takes a long time to reach 5000 rows, which created the illusion that no data was being written.
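For the record, the DataStream JdbcSink takes an explicit JdbcExecutionOptions (JdbcExecutionOptions.builder().withBatchSize(...).withBatchIntervalMs(...)) so that small volumes also get flushed on a timer, and the SQL JDBC connector exposes the same batching knobs. A sketch of the latter, assuming the Flink 1.12 'connector' = 'jdbc' options (names and values below are placeholders):

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().build())
# sink.buffer-flush.max-rows / sink.buffer-flush.interval control how many rows
# are buffered and for how long before a flush, analogous to JdbcExecutionOptions.
t_env.execute_sql("""
    CREATE TABLE jdbc_sink (
        id BIGINT,
        val DOUBLE
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://localhost:3306/db',
        'table-name' = 'ratio_value',
        'username' = 'user',
        'password' = 'pass',
        'sink.buffer-flush.max-rows' = '100',
        'sink.buffer-flush.interval' = '1s'
    )
""")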



guoliubi...@foxmail.com
 
From: guoliubi...@foxmail.com
Sent: 2020-12-21 09:05
To: user-zh
Subject: Re: Re: jdbc sink cannot insert data
That indeed works, thanks for the pointer.
 
 
 
guoliubi...@foxmail.com
From: 赵一旦
Sent: 2020-12-20 23:24
To: user-zh
Subject: Re: jdbc sink cannot insert data
Hi, you're taking a rather roundabout route here.
A Flink job is just a DAG; a single DataStream can already fan out into multiple downstream flows, no side output needed.
SideOutput is for when you need to emit two different kinds of results according to some logic; here the same data goes to both kafka and mysql, so sideoutput is unnecessary. It only adds an extra step and costs performance.
Following your code, it should be written like this:
sideStream.addSink(new FlinkKafkaProducer<>(
"ratio_value",
new RatioValueSerializationSchema(suffix),
PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL),
tool.get(SCHEMA_REGISTRY_URL)),
FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
sideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool));
As above, just add the two sinks directly to sideStream.
r pp wrote on Sat, Dec 19, 2020 at 12:15 PM:
> A simple thought: how large is the data volume, and have you considered the write pressure on the database?
> Remove the kafka sink and look at the write behavior.
> Then compare with the behavior after adding kafka back.
>
> One channel connected to two sinks, one draining fast and one draining slow. The fast one keeps up easily; the slow one may not be able to control its rate and falls over, which then takes the whole channel down with it.
>
> guoliubi...@foxmail.com wrote on Fri, Dec 18, 2020 at 2:01 PM:
>
> > Hi,
> >
> > I'm using flink1.12. I have a job whose data ultimately needs to go to both Kafka and the database, so I added a side output in the last step; the code is as follows
> > .process(new ProcessFunction() {
> > @Override
> > public void processElement(RatioValue value, Context ctx,
> > Collector out) throws Exception {
> > out.collect(value);
> > ctx.output(ratioOutputTag, value);
> > }
> > });
> > sideStream.addSink(new FlinkKafkaProducer<>(
> > "ratio_value",
> > new RatioValueSerializationSchema(suffix),
> > PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL),
> > tool.get(SCHEMA_REGISTRY_URL)),
> > FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
> > DataStream ratioSideStream =
> > sideStream.getSideOutput(ratioOutputTag);
> >
> ratioSideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool));
> > In actual runs, the data lands in kafka correctly once produced, but the jdbc sink sometimes works after restarting the job and sometimes still doesn't.
> > When running in local environment mode with a breakpoint set in JdbcSink's sink method, the breakpoint is never hit, so it feels like JdbcSink is never reached.
> > Is there any way to troubleshoot this situation?
> >
> >
> > guoliubi...@foxmail.com
> >
>


Re: flink12 - using flink-connector-kafka and flink-sql-connector-kafka together causes class conflicts

2020-12-29, by felixzh

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-2.2_2.11</artifactId>
    <version>1.12.0</version>
    <scope>provided</scope>
</dependency>















On 2020-12-26 10:37:52, "site" wrote:
>When deploying a streaming program on yarn, maven uses the dependency
>
><dependency>
>    <groupId>org.apache.flink</groupId>
>    <artifactId>flink-connector-kafka-2.2_2.11</artifactId>
>    <version>1.12.0</version>
></dependency>
>
>flink's lib directory contains flink-sql-connector-kafka_2.11-1.12.0.jar. The class conflict causes the deployment on yarn to fail. Flink's default classloading order is child-first; changing it to parent-first makes no difference. For the class conflict see http://apache-flink.147419.n8.nabble.com/Kafka-Consumer-td3475.html
>org.apache.kafka.common.KafkaException: 
>org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
> is not an instance of org.apache.kafka.common.serialization.Deserializer
>
>
>I don't know how to file an ISSUE; could the community please fix this


Re: flink1.10.2 - reading from mysql and printing works locally but fails on yarn; what could be the cause?

2020-12-29, by felixzh



Connection refused: emr-worker-4.cluster/:40751
Is the hostname not being resolved?














At 2020-12-29 13:51:11, "xufengfeng" <503814...@qq.com> wrote:
>Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException:
>Could not complete the operation. Number of retries has been exhausted.
>   at
>org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:284)
>   at
>java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>   at
>java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>   at
>java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>   at
>java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
>   at
>org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:342)
>   at
>org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:500)
>   at
>org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:493)
>   at
>org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:472)
>   at
>org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:413)
>   at
>org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:538)
>   at
>org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:531)
>   at
>org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:111)
>   at
>org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:323)
>   at
>org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:339)
>   at
>org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:685)
>   at
>org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
>   at
>org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
>   at
>org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
>   at
>org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
>   at
>org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>   at java.lang.Thread.run(Thread.java:748)
>Caused by: java.util.concurrent.CompletionException:
>org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
>拒绝连接: emr-worker-4.cluster/:40751
>   at
>java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>   at
>java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>   at
>java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957)
>   at
>java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
>   ... 19 more
>Caused by:
>org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
>拒绝连接: emr-worker-4.cluster-1/xxx:40751
>Caused by: java.net.ConnectException: 拒绝连接
>   at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>   at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714)
>   at
>org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
>   at
>org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:336)
>   at
>org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:685)
>   at
>org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
>   at
>org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
>   at
>org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
>   at
>org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
>   at
>org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>   at java.lang.Thread.run(Thread.java:748)
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/