Re: Does Flink SQL plan to support a MySQL catalog?

2021-10-11 Thread Roc Marshal
Hi Xuchen,
This feature is already being worked on.
Reviews / discussion / improvements are welcome: https://github.com/apache/flink/pull/16962

Best.
Roc.



Sent from NetEase Mail Master




 Original message 
| From | 赵旭晨 |
| Date | 2021-10-12 10:17 |
| To | user-zh@flink.apache.org |
| Cc | |
| Subject | Does Flink SQL plan to support a MySQL catalog? |
Flink's JdbcCatalog currently only supports PostgreSQL. Are there plans to support MySQL? Our company stores all of its metadata in MySQL, and introducing PostgreSQL is not really an option. Or, to ask the question the other way around: why has the Flink community not supported a MySQL catalog so far? Are there specific concerns?

Re:Re: Flink Sql 1.13 UDF ERROR

2021-07-11 Thread Roc Marshal
Hi, Jingsong.


 The latest type inference is stricter than in previous versions, and the non-null constraints in the schema are validated in more detail.
 In the example mentioned earlier, primitive types were used as the UDF parameters,
which means the columns passed to those UDF parameters must be NOT NULL; when a view is created, however, every type is nullable by default, which is why the problem described earlier appears.
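
A minimal sketch of the workaround this implies (my own illustration, reusing the AccumulatorBean from the quoted code, not a confirmed fix from this thread): declaring the UDF parameters as boxed Double/Long makes the inferred argument types nullable, so they match the view's nullable columns.

```java
// Hypothetical variant of the quoted accumulate() method. Boxed parameter
// types are inferred as nullable DOUBLE / BIGINT, so calls from nullable
// view columns pass the stricter 1.13 operand check.
public void accumulate(AccumulatorBean acc, Double price, Long num) {
    if (price == null || num == null) {
        return; // with primitives, the planner would reject nullable inputs at validation time
    }
    acc.totalPrice += price * num;
    acc.totalNum += num;
}
```

Alternatively, the input columns can be declared non-null in the schema, e.g. `DataTypes.FIELD("price", DataTypes.DOUBLE().notNull())`.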







Best,
Roc.








On 2021-06-29 11:02:55, "Jingsong Li" wrote:
>Hi,
>
>You could create a JIRA ticket and have Timo take a look; UDAFs got new type inference, so there may be a problem there.
>
>Best,
>Jingsong
>
>On Tue, Jun 29, 2021 at 7:10 AM Roc Marshal  wrote:
>
>>
>>
>> Hi, All.
>>
>>
>> A question I ran into while investigating the upgrade to the latest 1.13.1 API; thanks, everyone:
>>
>>
>> Version: 1.13.1
>> Run mode: IDE-application
>> ---
>> about udf define...
>>
>>
>> public static class UDFAggregateFunction extends
>> AggregateFunction<Double, AccumulatorBean> {
>>
>>
>> // Return the final result
>> @Override
>> public Double getValue(AccumulatorBean acc) {
>> return acc.totalPrice / acc.totalNum;
>> }
>>
>>
>> // Create the object that holds intermediate results
>> @Override
>> public AccumulatorBean createAccumulator() {
>> return new AccumulatorBean();
>> }
>>
>>
>> // Subtract the value being retracted
>> public void retract(AccumulatorBean acc, double price, long num) {
>> acc.totalPrice -= price * num;
>> acc.totalNum -= num;
>> }
>>
>>
>> // Pull the data out of each partition and merge it
>> public void merge(AccumulatorBean acc, Iterable<AccumulatorBean>
>> it) {
>>
>>
>> Iterator<AccumulatorBean> iter = it.iterator();
>> while (iter.hasNext()) {
>> AccumulatorBean a = iter.next();
>> this.accumulate(acc, a.totalPrice, a.totalNum);
>> }
>> }
>>
>>
>> // Called when resetting the in-memory values
>> public void resetAccumulator(AccumulatorBean acc) {
>> acc.totalNum = 0;
>> acc.totalPrice = 0;
>> }
>>
>>
>> // Logic that folds the incoming data into the accumulator
>> public void accumulate(AccumulatorBean acc, double price, long
>> num) {
>> acc.totalPrice += price * num;
>> acc.totalNum += num;
>> }
>> }
>>
>>
>>
>> 
>> About main calling
>> //TODO unified stream/batch Table API
>> TableEnvironment tableEnvironment =
>> TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
>> List<Row> dataList = new ArrayList<>();
>> dataList.add(Row.of("张三", "可乐", 20.0D, 4L));
>> dataList.add(Row.of("张三", "果汁", 10.0D, 4L));
>> dataList.add(Row.of("李四", "咖啡", 10.0D, 2L));
>> Table table = tableEnvironment.fromValues(DataTypes.ROW(
>> DataTypes.FIELD("user", DataTypes.STRING()),
>> DataTypes.FIELD("name", DataTypes.STRING()),
>> DataTypes.FIELD("price", DataTypes.DOUBLE()),
>> DataTypes.FIELD("num", DataTypes.BIGINT())
>> ),
>> dataList);
>> tableEnvironment.createTemporaryView("orders", table);
>>
>>
>> tableEnvironment.createTemporaryFunction("c_agg", new
>> UDFAggregateFunction());
>>
>>
>> tableEnvironment.executeSql("select user, c_agg(price, num) as
>> udf_field from orders group by user").print();
>>
>>
>>
>>
>>
>>
>>
>> Exception stack trace -
>>
>>
>>
>>
>> default_catalog.default_database.c_agg(DOUBLE, BIGINT)
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:157)
>> at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201)
>> at
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
>> at
>> com.intsmaze.flink.table.udf.aggre.AggregateFunctionTemplate.main(AggregateFunctionTemplate.java:139)
>> Caused by: org.apache.flink.table.api.ValidationException: Invalid
>> function call:
>> default_c

Re: After the job runs for a while, physical memory is exceeded, the container is killed, and the job restarts

2021-07-07 Thread Roc Marshal
Hi,
You can start by checking the YARN threshold parameters for the ratio of a container's virtual memory to its physical memory (yarn-site.xml).

Best,
Roc.
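
For reference, a sketch of the yarn-site.xml properties meant above (these are standard YARN settings; the values shown are the Hadoop defaults, to be tuned per cluster):

```xml
<!-- How much virtual memory a container may use per unit of physical
     memory before the NodeManager kills it. -->
<property>
  <name>yarn.nodemanager.vmem-pmem-ratio</name>
  <value>2.1</value>
</property>
<!-- Whether the physical / virtual memory limits are enforced at all. -->
<property>
  <name>yarn.nodemanager.pmem-check-enabled</name>
  <value>true</value>
</property>
<property>
  <name>yarn.nodemanager.vmem-check-enabled</name>
  <value>true</value>
</property>
```

Note that the error quoted below is a *physical* memory kill, so besides these thresholds it is worth leaving more headroom between the Flink process memory and the container size (e.g. via the taskmanager.memory.jvm-overhead.* options in Flink 1.10+).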














On 2021-07-08 10:44:20, "黄志高" wrote:
>Flink environment: 1.11.0
>Deployment mode: yarn per-job
>State backend: env.setStateBackend(new FsStateBackend("ckPath"))
>Each taskManager gets 8g of memory and 2 slots
>A checkpoint every 10 minutes, each averaging about 400k
>Job logic: source(kafka)->keyBy->timeWindow->reduce count->redis
> source(kafka)->sink(s3 files)
>
>
>The problem is that every day a container gets killed, which restarts the job:
>Container [pid=26148,containerID=container_e02_1622516404559_0038_01_08] 
>is running beyond physical memory limits. Current usage: 8.0 GB of 8 GB 
>physical memory used; 9.8 GB of 40 GB virtual memory used. Killing container
>
>
>My understanding is that the buffered data shouldn't be that large, so how can it hit the physical memory limit? In my window operation each key should map to a single value, there aren't many keys, and the state should only hold that value. The window uses a reduce, processing records incrementally one at a time, rather than a processFunction that accumulates a batch and processes it at once.
>Could someone take a look? Thanks.
>
>
>


Flink SQL MySQL schema feature question

2021-07-07 Thread Roc Marshal
Hi,
   Does the current Flink SQL support automatically fetching and parsing all of a table's column information when creating a source table?

   Thanks.


Best, Roc.

Flink Sql 1.13 UDF ERROR

2021-06-28 Thread Roc Marshal


Hi, All.


A question I ran into while investigating the upgrade to the latest 1.13.1 API; thanks, everyone:


Version: 1.13.1
Run mode: IDE-application
---
about udf define...


public static class UDFAggregateFunction extends AggregateFunction<Double, AccumulatorBean> {


// Return the final result
@Override
public Double getValue(AccumulatorBean acc) {
return acc.totalPrice / acc.totalNum;
}


// Create the object that holds intermediate results
@Override
public AccumulatorBean createAccumulator() {
return new AccumulatorBean();
}


// Subtract the value being retracted
public void retract(AccumulatorBean acc, double price, long num) {
acc.totalPrice -= price * num;
acc.totalNum -= num;
}


// Pull the data out of each partition and merge it
public void merge(AccumulatorBean acc, Iterable<AccumulatorBean> it) {


Iterator<AccumulatorBean> iter = it.iterator();
while (iter.hasNext()) {
AccumulatorBean a = iter.next();
this.accumulate(acc, a.totalPrice, a.totalNum);
}
}


// Called when resetting the in-memory values
public void resetAccumulator(AccumulatorBean acc) {
acc.totalNum = 0;
acc.totalPrice = 0;
}


// Logic that folds the incoming data into the accumulator
public void accumulate(AccumulatorBean acc, double price, long num) {
acc.totalPrice += price * num;
acc.totalNum += num;
}
}



About main calling
//TODO unified stream/batch Table API
TableEnvironment tableEnvironment = 
TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
List<Row> dataList = new ArrayList<>();
dataList.add(Row.of("张三", "可乐", 20.0D, 4L));
dataList.add(Row.of("张三", "果汁", 10.0D, 4L));
dataList.add(Row.of("李四", "咖啡", 10.0D, 2L));
Table table = tableEnvironment.fromValues(DataTypes.ROW(
DataTypes.FIELD("user", DataTypes.STRING()),
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("price", DataTypes.DOUBLE()),
DataTypes.FIELD("num", DataTypes.BIGINT())
),
dataList);
tableEnvironment.createTemporaryView("orders", table);


tableEnvironment.createTemporaryFunction("c_agg", new 
UDFAggregateFunction());


tableEnvironment.executeSql("select user, c_agg(price, num) as 
udf_field from orders group by user").print();






Exception stack trace -




default_catalog.default_database.c_agg(DOUBLE, BIGINT)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:157)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
at 
com.intsmaze.flink.table.udf.aggre.AggregateFunctionTemplate.main(AggregateFunctionTemplate.java:139)
Caused by: org.apache.flink.table.api.ValidationException: Invalid function 
call:
default_catalog.default_database.c_agg(DOUBLE, BIGINT)
at 
org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException(TypeInferenceUtil.java:194)
at 
org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
at org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:679)
at org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:330)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1735)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1726)
at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1735)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1726)
at 


Re: Unsubscribe

2021-05-26 Thread Roc Marshal
Hi Zhang Bin: to unsubscribe, please send a message to user-zh-unsubscr...@flink.apache.org
and follow the prompts to complete the remaining steps. Best, flinker.
On 2021-05-26 17:05:59, "张斌" wrote:
>
>
>Unsubscribe
>| |
>张斌
>|
>|
>herobin1...@163.com
>|
>Signature customized by NetEase Mail Master
>


Re:Re: About Memory Spilling to Disk in Flink

2021-03-24 Thread Roc Marshal
Hi, Guowei Ma.
As far as I know, Flink writes some in-memory data to disk when memory is
running low. I noticed that Flink uses ExternalSorterBuilder for batch
operations in the org.apache.flink.runtime.operators.sort package, but I'm
curious whether this technique is also used in stream mode.
Thank you.


Best, 
Roc













At 2021-03-24 15:01:48, "Guowei Ma"  wrote:

Hi, Roc
Could you explain more about your question? 

Best,
Guowei




On Wed, Mar 24, 2021 at 2:47 PM Roc Marshal  wrote:

Hi, 


Can someone tell me where flink uses memory spilling to write to disk? 
Thank you.


Best, Roc.




 

About Memory Spilling to Disk in Flink

2021-03-24 Thread Roc Marshal
Hi, 


Can someone tell me where flink uses memory spilling to write to disk? 
Thank you.


Best, Roc.

Re: Unsubscribe

2021-01-27 Thread Roc Marshal
Hi, Tang.
Please send a message to user-zh-unsubscr...@flink.apache.org if you want
to unsubscribe from the mailing list.


Best, Roc.


| |
Roc Marshal
|
|
flin...@126.com
|
Signature customized by NetEase Mail Master


On 2021-01-27 16:41, 唐军亮 wrote:
Unsubscribe

FlinkSQL window usage question

2020-10-21 Thread Roc Marshal
Hi,




SELECT
  TUMBLE_START(ts, INTERVAL '1' DAY) AS window_start,
  TUMBLE_END(ts, INTERVAL '1' DAY) AS window_end,
  c1,
  SUM(c2) AS sc2
FROM sourcetable
GROUP BY TUMBLE(ts, INTERVAL '1' DAY), c1
ORDER BY window_start, sc2 DESC LIMIT 10


This SQL is meant to compute over a one-day (tumbling) window,
grouping by c1 and summing column c2 (as sc2), and then to order by sc2 within each window. But judging from the results, the sc2 values within a window are not actually ordered (neither descending nor ascending).
Based on my requirement and the way the SQL is written, could you diagnose where the problem lies, or offer advice so I can pinpoint my misunderstanding of flinksql?


Thanks!


Best Roc.
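
For reference, a sketch of the window Top-N pattern that is usually used for this kind of "top rows per window" requirement (my own illustration against the same sourcetable, not an answer from the thread; a plain ORDER BY ... LIMIT over a streaming group-window result does not guarantee per-window ordering):

```sql
-- Hypothetical rewrite: rank the rows inside each window with ROW_NUMBER()
-- and keep the top 10 per window.
SELECT window_start, window_end, c1, sc2
FROM (
  SELECT window_start, window_end, c1, sc2,
         ROW_NUMBER() OVER (PARTITION BY window_start ORDER BY sc2 DESC) AS rn
  FROM (
    SELECT TUMBLE_START(ts, INTERVAL '1' DAY) AS window_start,
           TUMBLE_END(ts, INTERVAL '1' DAY)   AS window_end,
           c1,
           SUM(c2) AS sc2
    FROM sourcetable
    GROUP BY TUMBLE(ts, INTERVAL '1' DAY), c1
  ) agg
) ranked
WHERE rn <= 10;
```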

Re: Re: Question about window computation with StreamSQL

2020-10-21 Thread Roc Marshal
SELECT
  TUMBLE_START(ts, INTERVAL '1' DAY) AS window_start,
  TUMBLE_END(ts, INTERVAL '1' DAY) AS window_end,
  c1,
  SUM(c2) AS sc2
FROM target_kafka_source_converted
GROUP BY TUMBLE(ts, INTERVAL '1' DAY), c1
ORDER BY window_start, sc2 DESC LIMIT 10


With this SQL I want to compute over a one-day window,
grouping by c1 and summing column c2 (as sc2), then ordering by sc2 within each window. But judging from the results, the sc2 values within a window are not actually ordered (neither descending nor ascending).
Based on my requirement and the way the SQL is written, could you diagnose where the problem lies, or offer advice so I can pinpoint my misunderstanding of flinksql?


Thank you!


Best Roc.



















On 2020-10-21 17:21:47, "hailongwang" <18868816...@163.com> wrote:
>Hi Roc,
>  SQL does not currently support specifying an offset; only a one-day window starting from midnight is possible.
>There is an issue tracking this problem:
>https://issues.apache.org/jira/projects/FLINK/issues/FLINK-17767?filter=allopenissues
>
>
>Best,
>Hailong Wang
>
On 2020-10-21 16:09:29, "Roc Marshal" wrote:
>>Hi,
>>
>>
>>For a tumbling window one day long that takes an aggregate of some column, how do I specify in SQL the time at which the window starts rolling? For example, I'd like the window to run from 2 a.m. of each day (as the window start) to 2 a.m. of the next day (as the window end). How is such syntax used?
>>
>>
>>Thanks.
>>
>>
>>Best Roc
>>
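
As a workaround sketch (my own suggestion, not from the thread): the DataStream API does take a window offset, so a daily window aligned to 02:00 can be expressed there. Assuming a stream of (c1, c2) pairs with event-time timestamps and watermarks already assigned:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Hypothetical: source is a DataStream<Tuple2<String, Long>> of (c1, c2)
// with timestamps/watermarks assigned upstream.
DataStream<Tuple2<String, Long>> daily =
    source
        .keyBy(t -> t.f0)
        // One-day tumbling window shifted by two hours: windows run 02:00 -> 02:00.
        .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(2)))
        .sum(1);
```

Note the offset is relative to UTC, so hitting a local 02:00 means folding the timezone into the offset.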


Question about window computation with StreamSQL

2020-10-21 Thread Roc Marshal
Hi,


For a tumbling window one day long that takes an aggregate of some column, how do I specify in SQL the time at which the window starts rolling? For example, I'd like the window to run from 2 a.m. of each day (as the window start) to 2 a.m. of the next day (as the window end). How is such syntax used?


Thanks.


Best Roc



Re: Re: Does flink sql ddl support mapping multi-level json?

2020-10-21 Thread Roc Marshal
If the depth is three levels or more, is it the same kind of nested syntax, or is there some other way to write it?


Thanks

Best Roc.





On 2020-09-24 20:53:12, "Benchao Li" wrote:
>This case is supported today; you can use something like this:
>```SQL
>CREATE TABLE MyTable (
>  a11 INT,
>  a12 VARCHAR,
>  a13 ROW<a21 INT, a22 INT, a23 VARCHAR>
>) WITH (...)
>```
>
>Roc Marshal wrote on Thu, Sep 24, 2020, 7:54 PM:
>
>> A question: when flink sql connects to kafka in stream mode and the message format is multi-level json, how do I map a field at a depth greater than 1?
>> {
>> "a11":1,
>> "a12":"1",
>> "a13":{
>> "a21":1,
>> "a22":1,
>> "a23":"1"}
>> }
>>
>>
>> For a format like this, how do I map the fields starting with a2? If the current version doesn't support this feature, could support for it be considered?
>>
>>
>> Thanks
>
>
>
>-- 
>
>Best,
>Benchao Li
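
On the depth-three question above, a sketch extrapolating from Benchao's answer (the field names a31/a32 are hypothetical): ROW types nest, so each additional JSON level is just another ROW inside the parent.

```sql
-- Hypothetical three-level schema: a13.a23.a31 is reached by nesting ROW types.
CREATE TABLE MyTable (
  a11 INT,
  a12 VARCHAR,
  a13 ROW<a21 INT, a22 INT, a23 ROW<a31 INT, a32 VARCHAR>>
) WITH (...)
```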


Question about Flink-SQL

2020-10-19 Thread Roc Marshal
Hello,


Does Flink-SQL support fetching MySQL meta information automatically in the
latest version? If not, could you consider adding this feature?
Thank you.


Best, Roc.

Does flink sql ddl support mapping multi-level json?

2020-09-24 Thread Roc Marshal
A question: when flink sql connects to kafka in stream mode and the message format is multi-level json, how do I map a field at a depth greater than 1?
{
"a11":1,
"a12":"1",
"a13":{
"a21":1,
"a22":1,
"a23":"1"}
}


For a format like this, how do I map the fields starting with a2? If the current version doesn't support this feature, could support for it be considered?


Thanks

flink-OOME_Java heap space

2020-08-06 Thread Roc Marshal
Hi, all.
A question, please.
Scenario: jdk-oracle-1.8, flink-release-1.10.0,
flink-on-yarn in session mode. Data is read from kafka and processed with SQL.
JVM Heap Size: 638 MB
Flink Managed Memory: 635 MB; the exception occurs as below.
  The statebackend is filesystem->hadoop.
   The task goes straight from DEPLOYING->FAILED.
 The remaining reference information was attached as images, which are not preserved in this archive.
 Could you give some advice?
 Thanks.





Flink-1.10 on yarn TaskManager startup parameter question

2020-07-27 Thread Roc Marshal
Hi, all.


 Is G1 the default JVM GC collector for TaskManagers started by Flink-1.10 on YARN?
 Basic cluster environment: hadoop-2.7.5, flink-1.10, jdk-1.8_61; none of the JVM-related parameters are explicitly set.
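
(A side note, not an answer from the thread: if no JVM options are set, a JDK 8 TaskManager JVM should fall back to the HotSpot default collector, i.e. ParallelGC rather than G1. A collector can be pinned explicitly in flink-conf.yaml, e.g.:)

```yaml
# Hypothetical flink-conf.yaml entry: force G1 and log GC activity
# for the TaskManager JVMs only.
env.java.opts.taskmanager: -XX:+UseG1GC -Xloggc:/tmp/tm-gc.log -XX:+PrintGCDetails
```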
 




Thanks.






Best,
Roc Marshal.

Re:Flink 1.11 submit job timed out

2020-07-15 Thread Roc Marshal
Hi, SmileSmile.
I've personally run into a similar host-resolution problem before; you can troubleshoot it from the angle of the k8s pod network/hostname mapping.
I hope this helps.


Best.
Roc Marshal











On 2020-07-15 17:04:18, "SmileSmile" wrote:
>
>Hi
>
>Flink 1.11, deployed as a kubernetes session. 30 TMs, 4 slots per TM, job
>parallelism 120. Submitting the job produces a flood of "No hostname could be resolved for the IP address", the JM times
>out, and the submission fails. The web UI also hangs and stops responding.
>
>With wordCount at parallelism 1, a few "no hostname" lines are printed and the job then submits normally; once the parallelism goes up, it times out.
>
>
>Some of the logs:
>
>2020-07-15 16:58:46,460 WARN  
>org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname 
>could be resolved for the IP address 10.32.160.7, using IP address as host 
>name. Local input split assignment (such as for HDFS files) may be impacted.
>2020-07-15 16:58:46,460 WARN  
>org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname 
>could be resolved for the IP address 10.44.224.7, using IP address as host 
>name. Local input split assignment (such as for HDFS files) may be impacted.
>2020-07-15 16:58:46,461 WARN  
>org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname 
>could be resolved for the IP address 10.40.32.9, using IP address as host 
>name. Local input split assignment (such as for HDFS files) may be impacted.
>
>2020-07-15 16:59:10,236 INFO  
>org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - The 
>heartbeat of JobManager with id 69a0d460de46a9f41c770d963c0a timed out.
>2020-07-15 16:59:10,236 INFO  
>org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
>Disconnect job manager 
>0...@akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_2
> for job e1554c737e37ed79688a15c746b6e9ef from the resource manager.
>
>
>how to deal with ?
>
>
>beset !
>
>| |
>a511955993
>|
>|
>Email: a511955...@163.com
>|
>
>Signature customized by NetEase Mail Master


Re: [Help] Flink Hadoop dependency problem

2020-07-15 Thread Roc Marshal



Hi, Z-Z,

You can try downloading the matching uber jar from
https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/
and placing the downloaded jar file under ${FLINK_HOME}/lib in the flink image, then start the composed containers.
Best.
Roc Marshal.
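
A minimal shell sketch of that step (the version below is only an example; pick the flink-shaded-hadoop-2-uber build matching your Hadoop line from the repository listing):

```sh
# Illustrative only: download one of the published uber jars and drop it
# into the image's lib/ directory before starting the containers.
wget https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
cp flink-shaded-hadoop-2-uber-2.8.3-10.0.jar "$FLINK_HOME/lib/"
```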











On 2020-07-15 10:47:39, "Z-Z" wrote:
>I'm using Flink 1.11.0, set up with docker-compose; the docker-compose file is as follows:
>version: "2.1"
>services:
> jobmanager:
>  image: flink:1.11.0-scala_2.12
>  expose:
>   - "6123"
>  ports:
>   - "8081:8081"
>  command: jobmanager
>  environment:
>   - JOB_MANAGER_RPC_ADDRESS=jobmanager
>   - 
>HADOOP_CLASSPATH=/data/hadoop-2.9.2/etc/hadoop:/data/hadoop-2.9.2/share/hadoop/common/lib/*:/data/hadoop-2.9.2/share/hadoop/common/*:/data/hadoop-2.9.2/share/hadoop/hdfs:/data/hadoop-2.9.2/share/hadoop/hdfs/lib/*:/data/hadoop-2.9.2/share/hadoop/hdfs/*:/data/hadoop-2.9.2/share/hadoop/yarn:/data/hadoop-2.9.2/share/hadoop/yarn/lib/*:/data/hadoop-2.9.2/share/hadoop/yarn/*:/data/hadoop-2.9.2/share/hadoop/mapreduce/lib/*:/data/hadoop-2.9.2/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar
>  volumes:
>   - ./jobmanager/conf:/opt/flink/conf
>   - ./data:/data
>
>
> taskmanager:
>  image: flink:1.11.0-scala_2.12
>  expose:
>   - "6121"
>   - "6122"
>  depends_on:
>   - jobmanager
>  command: taskmanager
>  links:
>   - "jobmanager:jobmanager"
>  environment:
>   - JOB_MANAGER_RPC_ADDRESS=jobmanager
>  volumes:
>   - ./taskmanager/conf:/opt/flink/conf
>networks:
> default:
>  external:
>   name: flink-network
>
>
>
>hadoop-2.9.2 is already in the data directory, and HADOOP_CLASSPATH has been added to the environment variables of both jobmanager and taskmanager, but whether submitting via the cli or the webui, the jobmanager still reports "Could
> not find a file system implementation for scheme 'hdfs'". Does anyone know what's going on?


Re: [Flink transformations]

2020-06-29 Thread Roc Marshal
Hi 忝忝向仧,
There is currently no such mapping table archived in the Flink documentation,
but the returned information can be observed at the API level.


Best,
Roc Marshal



On 2020-06-29 22:29:21, "忝忝向仧" <153488...@qq.com> wrote:
>Hi,all:
>
>
>A question: a Flink application is first translated into its logical mapping, i.e. transformations. I see 17 Transformation classes under the org.apache.flink.streaming.api.transformations package (SourceTransformation, SplitTransformation, TwoInputTransformation, etc.). Is there a mapping list showing which operators or operations in a program (such as map, flatmap, filter, connect, select) correspond to which Transformation class?
>
>
>Thanks.


Re:flink1.9 on yarn

2020-06-27 Thread Roc Marshal
Hi, guanyq.

On question 1 (how to keep the appid from incrementing when submitting ./bin/flink run -m yarn-cluster):
The appid increment policy is not generated by Flink; if necessary, you can research hadoop-yarn and draw your own conclusion there.



On question 2 (after submitting with ./bin/flink run -m yarn-cluster and cancelling the job, how to submit to the same appid again):
May I read this as a flink
yarn-session cluster being the better fit for your jobs? The submission mode mentioned in the question is per-job, and once the job ends, Flink shuts the cluster down.
See:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/yarn_setup.html#start-flink-session
Best,
Roc Marshal
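
A sketch of that yarn-session flow (flags as in the Flink 1.9/1.10 CLI; the application id below is the one from your question, used as a placeholder):

```sh
# Start a long-lived session cluster on YARN (detached). It keeps a single
# application id for its whole lifetime.
./bin/yarn-session.sh -d -jm 1024m -tm 4096m -s 4

# Jobs can then be submitted, cancelled, and re-submitted into that same
# session by passing its application id.
./bin/flink run -yid application_1567067657620_0254 ./examples/streaming/WordCount.jar
```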

On 2020-06-28 09:09:43, "guanyq" wrote:
>Question 1
>
>Every ./bin/flink run -m
>yarn-cluster submission produces a new APP-ID, e.g. application_1567067657620_0254.
>
>After running yarn application -kill application_1567067657620_0254,
>
>how do I keep the appid from incrementing on the next ./bin/flink run -m yarn-cluster submission?
>
>Question 2
>
>After submitting with ./bin/flink run -m yarn-cluster and cancelling the job, how do I submit to that appid again?
>
> 


Re: Re: Suspected Flink JOB_MANAGER_LEADER_PATH znode leak

2020-06-27 Thread Roc Marshal
Yes.


Best,
Roc Marshal.
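
If the stale znodes need to be removed by hand, a sketch with the ZooKeeper CLI (the path prefix is illustrative; it depends on your high-availability.zookeeper.path.root and cluster-id settings, and on ZooKeeper 3.5+ the command is deleteall rather than rmr):

```sh
# Hypothetical: drop the empty leader znode left behind by a cancelled job.
zkCli.sh -server zk-host:2181 rmr /flink/default/leader/<job_id>
```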

















On 2020-06-28 10:10:20, "林恬" wrote:
>So you mean these empty leader/${job_id} ZNodes left behind by cancelled jobs need to be cleaned up periodically by the user?
>
>
>
>
>
>
>
>--Original--
>From: "Roc Marshal"; Date: Sun, Jun 28, 2020 10:07 AM
>To: "FLINK中国"
>Subject: Re: Suspected Flink JOB_MANAGER_LEADER_PATH znode leak
>
>
>
>Hi, 林恬.
>First of all, thanks for the feedback.
>On cleaning up the information under the corresponding zk paths: you can understand it simply as Flink depending on the zk component only within the scope of the functionality it relies on. Flink does not maintain consistency between the whole cluster (or a given path) and Flink
>job information, i.e. it does not clean up stale entries, because in an HA setup the conditions for judging a path stale are far more complex.
>
>
>
>
>Best,
>Roc Marshal.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
On 2020-06-28 09:12:41, "林恬" wrote:
Hi all:
    I'm currently on Flink 1.9.2 with ZK for HA. I've noticed that the /leader/${job_id}
znode on ZK is not cleaned up even when a job is cancelled, so after running for a while there are many empty job_id ZNodes under /leader/. When is this area supposed to be cleaned up? Or is the lack of cleanup a bug in 1.9.2?


Re: Suspected Flink JOB_MANAGER_LEADER_PATH znode leak

2020-06-27 Thread Roc Marshal
Hi, 林恬.
First of all, thanks for the feedback.
On cleaning up the information under the corresponding zk paths: you can understand it simply as Flink depending on the zk component only within the scope of the functionality it relies on. Flink does not maintain consistency between the whole cluster (or a given path) and Flink
job information, i.e. it does not clean up stale entries, because in an HA setup the conditions for judging a path stale are far more complex.




Best,
Roc Marshal.

















On 2020-06-28 09:12:41, "林恬" wrote:
>Hi all:
>  I'm currently on Flink 1.9.2 with ZK for HA. I've noticed that the /leader/${job_id}
>znode on ZK is not cleaned up even when a job is cancelled, so after running for a while there are many empty job_id ZNodes under /leader/. When is this area supposed to be cleaned up? Or is the lack of cleanup a bug in 1.9.2?
>
>
>


Re: Why is the Checkpoint Duration (Async) phase of the flink checkpoint so slow?

2020-06-27 Thread Roc Marshal
Hi, Lizhi.
Could you provide some more information, such as the exception messages, so the background of this case can be understood in more depth?


Thanks.


Best,
Roc Marshal














在 2020-06-28 09:52:10,"张立志"  写道:
>flink version 1.8
>deployment cluster: yarn
>
>
>Configuration code:
>StreamExecutionEnvironment.stateBackend(new 
>FsStateBackend("hdfs://nsstreaming/streaming/flink_checkpoint/state").checkpointingInterval(1000*60*10).checkpointTimeout(1000*60*10).timeCharacteristic(TimeCharacteristic.IngestionTime).build();
>The business code is fairly simple, but memory usage is high.
>Errors start after 10 minutes; once the state reaches roughly 1.5 GB, checkpoint durations start growing.
>
>
>
>
>


Re: Re: flinksql query against hbase fails

2020-06-22 Thread Roc Marshal
Hi, MuChen.
1. On the HBase/zk side, the yarn-session log already shows: "org.apache.flink.shaded.curator.org.apache.curator.ConnectionState
 - Authentication failed ... JobManager Web Interface:
http://uhadoop-op3raf-core24:42976".
2. On the HBase side, the job fails with: "Caused by:
org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task
'Source: HBaseTableSource[schema=[key, cf1], projectFields=[0]] ;
SourceConversion(table=[default_catalog.default_database.hbase_video_pic_title_q70,
 source: [HBaseTableSource[schema=[key, cf1], projectFields=[0,
fields=[key]) ; SinkConversionToTuple2 ; Sink: SQL Client Stream Collect Sink':
Configuring the input format (null) failed: Cannot create connection to
HBase."
Both point at the connection between Flink and HBase itself, so please verify the HBase connection settings and that the HBase cluster is reachable from the Flink nodes.
Best, Roc
 Marshal.
On 2020-06-23 11:05:43, "MuChen" <9329...@qq.com> wrote:
>Hi,Roc Marshal:
>
>
>
>Best,
>MuChen.
>
>
>
>
>-- Original --
>From: "Roc Marshal"; Date: Tue, Jun 23, 2020 10:27 AM
>To: "user-zh"
>Subject: Re: flinksql query against hbase fails
>
>
>
>Hi MuChen. From the log, the connection from Flink to HBase does not come up; the
>hbase source locates the cluster through the configured zk quorum, so please first confirm those settings and the network path. Best, Roc Marshal.
>On 2020-06-23 10:17:35, "MuChen" <9329...@qq.com> wrote:
>Hi, All:
>
>Querying hbase from flinksql fails, as follows.
>
>A yarn-session is started on the hadoop master node:
>
>yarn-session:
>bin/yarn-session.sh -jm 1g -tm 4g -s 4 -qu root.flink -nm fsql-cli
>2>&1 & # the startup log contains
>[admin@uhadoop-op3raf-master2 flink10]$ 2020-06-23 09:30:56,402 ERROR
>org.apache.flink.shaded.curator.org.apache.curator.ConnectionState -
>Authentication failed 2020-06-23 09:30:56,515 ERROR
>org.apache.flink.shaded.curator.org.apache.curator.ConnectionState -
>Authentication failed JobManager Web Interface:
>http://uhadoop-op3raf-core24:42976
>sql-client:
>bin/sql-client.sh embedded
>An hbase table is created in flinksql:
># CREATE TABLE hbase_video_pic_title_q70 (
>  key string,
>  cf1 ROW<... string, q70 string>
>) WITH (
>  'connector.type' = 'hbase',
>  'connector.version' = '1.4.3',
>  'connector.table-name' = 'hbase_video_pic_title_q70',
>  'connector.zookeeper.quorum' = 'uhadoop-op3raf-master1:2181,uhadoop-op3raf-master2:2181,uhadoop-op3raf-core1:2181',
>  'connector.zookeeper.znode.parent' = '/hbase',
>  'connector.write.buffer-flush.max-size' = '10mb',
>  'connector.write.buffer-flush.max-rows' = '1000',
>  'connector.write.buffer-flush.interval' = '2s'
>);
>A query is executed:
>select key from hbase_video_pic_title_q70;
>and fails with the HBase error:
>[ERROR] Could not execute SQL statement. Reason: 
>org.apache.flink.runtime.rest.util.RestClientException: [Internal server 
>error., org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
>job. at 
>org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:336)
> at 
>java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
> at 
>java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
> at 
>java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at 
>akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at 
>akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> at 
>akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
>akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
>akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
>akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
>Caused by: java.lang.RuntimeException: 
>org.apache.flink.runtime.client.JobExecutionException: Could not set up 
>JobManager at 
>org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
> at 
>java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> ... 6 more Caused by: org.apache.flink.runtime.client.JobExecutionException: 
>Could not set up JobManager at 
>org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl. at 
>org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
> at 
>org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:379)
> at 
>org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
&

Re: flinksql query against hbase fails

2020-06-22 Thread Roc Marshal
Hi, MuChen.
From the log, the connection from Flink to HBase does not come up; the hbase source
locates the cluster (hbase:meta) through the configured zk quorum, so please first confirm the HBase zk settings and that the machines running Flink can reach the HBase cluster. Best, Roc
 Marshal.
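
As a quick sanity check (illustrative commands, not from the thread), the zk quorum and znode parent can be probed from a Flink host:

```sh
# ZooKeeper four-letter-word health probe: expect the reply "imok".
echo ruok | nc uhadoop-op3raf-master1 2181

# From an HBase client installation: the configured znode.parent should exist.
hbase zkcli ls /hbase
```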
On 2020-06-23 10:17:35, "MuChen" <9329...@qq.com> wrote:
>Hi, All:
>
>
>Querying hbase from flinksql fails, as follows.
>
>
>A yarn-session is started on the hadoop master node:
>
>yarn-session:
>bin/yarn-session.sh -jm 1g -tm 4g -s 4 -qu root.flink -nm fsql-cli 2>&1
> & # the startup log contains [admin@uhadoop-op3raf-master2
>flink10]$ 2020-06-23 09:30:56,402 ERROR
>org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  -
>Authentication failed 2020-06-23 09:30:56,515 ERROR
>org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  -
>Authentication failed JobManager Web Interface:
>http://uhadoop-op3raf-core24:42976
>sql-client:
>bin/sql-client.sh embedded
>An hbase table is created in flinksql:
>#  CREATE TABLE hbase_video_pic_title_q70 (
>  key string,
>  cf1 ROW<... string, q70 string>
>) WITH (
>  'connector.type' = 'hbase',
>  'connector.version' = '1.4.3',
>  'connector.table-name' = 'hbase_video_pic_title_q70',
>  'connector.zookeeper.quorum' = 'uhadoop-op3raf-master1:2181,uhadoop-op3raf-master2:2181,uhadoop-op3raf-core1:2181',
>  'connector.zookeeper.znode.parent' = '/hbase',
>  'connector.write.buffer-flush.max-size' = '10mb',
>  'connector.write.buffer-flush.max-rows' = '1000',
>  'connector.write.buffer-flush.interval' = '2s'
>);
>A query is executed:
>select key from hbase_video_pic_title_q70;
>and fails with the HBase error:
>[ERROR] Could not execute SQL statement. Reason: 
>org.apache.flink.runtime.rest.util.RestClientException: [Internal server 
>error., org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.  
>   at 
>org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:336)
> at 
>java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)   
>  at 
>java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
> at 
>java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)  
>   at 
>akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)  
>   at 
>akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)  
>   at 
>akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
>at 
>akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
>Caused by: java.lang.RuntimeException: 
>org.apache.flink.runtime.client.JobExecutionException: Could not set up 
>JobManager at 
>org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
> at 
>java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> ... 6 more Caused by: 
>org.apache.flink.runtime.client.JobExecutionException: Could not set up 
>JobManager at 
>org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl. at 
>org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
> at 
>org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:379)
> at 
>org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
> ... 7 more Caused by: 
>org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 
>'Source: HBaseTableSource[schema=[key, cf1], projectFields=[0]] - 
>SourceConversion(table=[default_catalog.default_database.hbase_video_pic_title_q70,
> source: [HBaseTableSource[schema=[key, cf1], projectFields=[0, 
>fields=[key]) - SinkConversionToTuple2 - Sink: SQL Client Stream 
>Collect Sink': Configuring the input format (null) failed: Cannot create 
>connection to HBase. at 
>org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
> at 
>org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:255)
> at 
>org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:227)
> at 
>org.apache.flink.runtime.scheduler.SchedulerBase.

the savepoint problem of upgrading job from blink-1.5 to flink-1.10

2020-05-15 Thread Roc Marshal
Hi, all.


When using a savepoint to upgrade a Flink job from blink-1.5 to flink-1.10,
the system reports that the blink savepoint V3 format is incompatible with the savepoint version in
Flink. Is there any solution?


Thank you so much.
















Sincerely,
Roc Marshal