Re: Flink程序连接Kafka类型不匹配问题

2020-10-29 文章 wch...@163.com
在使用java时,
 StreamExecutionEnvironment 使用 import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment这个完全限定名的类,
 你这里应该是导入了org.apache.flink.streaming.api.scala.StreamExecutionEnvironment 



wch...@163.com
 
发件人: Natasha
发送时间: 2020-10-30 10:19
收件人: user-zh@flink.apache.org
主题: Flink程序连接Kafka类型不匹配问题
Hi,社区~

我想把Flink连接Kafka封装成通用的方法,但是在使用java时,类型转换上遇到了问题,这个问题网上搜索到的资料很少,刚入门不久所以也不是很明白其中的原理,请各位同行指点我一下,不胜感激。
Best,
Nat
--> 


Re: TUMBLE函数不支持 回撤流

2020-10-29 文章 admin
Hi,
能贴一下完整的sql吗,数据源是CDC的数据吗?

> 2020年10月30日 下午2:48,夜思流年梦  写道:
> 
> 开发者你好:
> 现有此场景:
> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流
> select 
> 
>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime
> 
>> ,sum(amt) as paymoney_h  
> 
>> from 
> 
>> group by TUMBLE(write_time,interval '1' HOUR);
> 
> 
> 报错:
> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't 
> support consuming update and delete changes which is produced by node 
> TableSourceScan
> 
> 
> 
> 
> 发现把kafka建表语句改成 json格式就可以
> 
> 
> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊
> 
> 
> 
> 
> 
> 
> 
> 
> 



Re: flink任务挂掉后自动重启

2020-10-29 文章 bradyMk
谢谢您的解答~
重启策略确实可以解决任务故障重启,但是有的时候(例如集群资源不够),任务会直接被kill掉,我想问的是针对被kill掉的任务,有没有什么方法可以自动重启,特别是带有ck的任务,如果想要自动重启,如何在启动的时候自动去hdfs上获取最新的ck地址呢?



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复: 回复: flink实时流中如何实时获取当前时间

2020-10-29 文章 zjfpla...@hotmail.com
谢谢 我试试



zjfpla...@hotmail.com
 
发件人: 史 正超
发送时间: 2020-10-30 14:48
收件人: user-zh@flink.apache.org
主题: 回复: flink实时流中如何实时获取当前时间
在source表上加上 proctime AS PROCTIME()的字段 , 
下游取的时候就用proctime转,注意时差的问题。比如用FROM_UNIXTIME(CAST(proctime AS BIGINT), 
'-MM-dd HH:mm:ss') 函数时,在table config里设置时区
``` java
 
streamTableEnv.getConfig().setLocalTimeZone(ZoneOffset.ofHours(8));
 
``

发件人: zjfpla...@hotmail.com 
发送时间: 2020年10月30日 3:17
收件人: user-zh 
主题: flink实时流中如何实时获取当前时间
 
RT,sql方式中怎么实时取当前消息处理的时间,来插入数据库当作当前更新时间?现在用current_timestamp发现不会变,只是第一次的时间
 
 
 
zjfpla...@hotmail.com


回复: flink实时流中如何实时获取当前时间

2020-10-29 文章 史 正超
在source表上加上 proctime AS PROCTIME()的字段 , 
下游取的时候就用proctime转,注意时差的问题。比如用FROM_UNIXTIME(CAST(proctime AS BIGINT), 
'-MM-dd HH:mm:ss') 函数时,在table config里设置时区
``` java

streamTableEnv.getConfig().setLocalTimeZone(ZoneOffset.ofHours(8));

``

发件人: zjfpla...@hotmail.com 
发送时间: 2020年10月30日 3:17
收件人: user-zh 
主题: flink实时流中如何实时获取当前时间

RT,sql方式中怎么实时取当前消息处理的时间,来插入数据库当作当前更新时间?现在用current_timestamp发现不会变,只是第一次的时间



zjfpla...@hotmail.com


TUMBLE函数不支持 回撤流

2020-10-29 文章 夜思流年梦
开发者你好:
现有此场景:
求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流
select 

> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime

> ,sum(amt) as paymoney_h  

> from 

> group by TUMBLE(write_time,interval '1' HOUR);


报错:
org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't support 
consuming update and delete changes which is produced by node TableSourceScan




发现把kafka建表语句改成 json格式就可以


现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊











Re: 讨论分析:数据类型对于shuffle时数据传输IO速度的影响(数十倍的差距)

2020-10-29 文章 admin
你是用的Filesystem 
connector读写hdfs的吗?数据序列化和反序列化的时间也有差异,而且source和sink的并发度也有很大差异,为了控制小文件数量,减少了sink的并发度,那写入速度肯定也是有限的。
由于source和sink的并发已经确定了,中间不管哪个阶段进行shuffle,其实对首尾的处理速度应该影响不大。
以上是个人愚见,欢迎大佬指正。

> 2020年10月30日 下午2:30,Husky Zeng <568793...@qq.com> 写道:
> 
> 我们的场景是这样的:
> 
> 从hive读数据,计算后写回hive。
> 
> 从hive读数据,为了加快速度,使用了650个并发subTask。
> 
> 向hive写数据,为了减少小文件,需要控制并发subTask数量。
> 
> 因此需要找一个环节进行shuffle。
> 
> 所以有上面的疑问。
> 
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: 讨论分析:数据类型对于shuffle时数据传输IO速度的影响(数十倍的差距)

2020-10-29 文章 Husky Zeng
我们的场景是这样的:

从hive读数据,计算后写回hive。

从hive读数据,为了加快速度,使用了650个并发subTask。

向hive写数据,为了减少小文件,需要控制并发subTask数量。

因此需要找一个环节进行shuffle。

所以有上面的疑问。




--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink任务挂掉后自动重启

2020-10-29 文章 Congxian Qiu
Hi
1 Flink 的 RestartStrategy[1] 可以解决你的问题吗?
2 从 checkpoint 恢复 这个,可以尝试记录每个作业最新的 checkpoint 地址,也可以在启动的时候从 hdfs 获取一下

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/task_failure_recovery.html
Best,
Congxian


bradyMk  于2020年10月30日周五 上午11:51写道:

>
> flink任务一般都是7*24h在跑的,如果挂掉,有没有什么办法自动重启任务?之前都是任务挂掉然后手动再提交一次任务,但是不可能每次挂掉都可以手动重启;另外,如果对于没做checkpoints的任务,可以通过定时脚本监控yarn,如果任务不存在,则重新提交任务,但是,对于做了checkpoints的任务,我们提交的时候就需要指定ck的目录,这个目录都是在变的,那么又该如何让任务挂掉后能自动重启呢?希望能得到大佬们的指点~
>
>
>
> -
> Best Wishes
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: flink cep超时事件的问题

2020-10-29 文章 sunfulin



hi,
session window能处理这种超时事件么?不知道有没有例子可以参考参考哈。














在 2020-10-30 11:12:55,"naisili Yuan"  写道:
> 不知道理解错没有, 感觉你这个场景使用session windows能解决
>
>sunfulin  于2020年10月30日周五 上午11:01写道:
>
>> hi,community,
>> 我最近有一个业务场景,需要基于消息流和具体的业务逻辑判断生成超时事件,考虑通过flink
>> cep来实现。不过在这个场景中,需要针对输入的消息,判断如果一个小时内没有匹配的数据到来,就需要把该事件输出。
>> 目前的cep机制,应该需要下一个事件消息到来时才会输出事件。想请教下各位大神,针对这个诉求有没有啥好的方案。
>> 感谢。


flink任务挂掉后自动重启

2020-10-29 文章 bradyMk
flink任务一般都是7*24h在跑的,如果挂掉,有没有什么办法自动重启任务?之前都是任务挂掉然后手动再提交一次任务,但是不可能每次挂掉都可以手动重启;另外,如果对于没做checkpoints的任务,可以通过定时脚本监控yarn,如果任务不存在,则重新提交任务,但是,对于做了checkpoints的任务,我们提交的时候就需要指定ck的目录,这个目录都是在变的,那么又该如何让任务挂掉后能自动重启呢?希望能得到大佬们的指点~



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/

minibatch+????ttl??????????????????????

2020-10-29 文章 ????????
hi, all !
flink??1.9??
??select userId,sum(money) as result,ymd from (
select userId,order_id,money,DATE_FORMAT(trans_time,'MMdd') as 
ymd,row_number() over(partition by order_id order by last_modify_time desc) as 
rk from MyTable where type='1'
) t where t.rk = 1 group by userId,ymd;
??tableConfig.setIdleStateRetentionTime(Time.milliseconds(360), 
Time.milliseconds(39)); --1??
??checkpointttl
jira??1??


??1??https://issues.apache.org/jira/browse/FLINK-17096

Re: 讨论分析:数据类型对于shuffle时数据传输IO速度的影响(数十倍的差距)

2020-10-29 文章 Husky Zeng
我把operator chain和streaming
dataflow的概念弄混了,不好意思。我想表达的是在整个任务流程中,选择shuffle的位置对于性能的影响。 



--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复: Flink程序连接Kafka类型不匹配问题

2020-10-29 文章 Natasha
Hi Admin,
你说的没错,我错误地import了scala的DataStream,问题已解决!感谢!






Best,
Nat
在2020年10月30日 11:06,admin<17626017...@163.com> 写道:
Hi,
怀疑你import了scala的包,把import部分也贴出来看看呢

2020年10月30日 上午10:19,Natasha <13631230...@163.com> 写道:

Hi,社区~
我想把Flink连接Kafka封装成通用的方法,但是在使用java时,类型转换上遇到了问题,这个问题网上搜索到的资料很少,刚入门不久所以也不是很明白其中的原理,请各位同行指点我一下,不胜感激。

Best,
Nat



flink实时流中如何实时获取当前时间

2020-10-29 文章 zjfpla...@hotmail.com
RT,sql方式中怎么实时取当前消息处理的时间,来插入数据库当作当前更新时间?现在用current_timestamp发现不会变,只是第一次的时间



zjfpla...@hotmail.com


Re: flink cep超时事件的问题

2020-10-29 文章 naisili Yuan
 不知道理解错没有, 感觉你这个场景使用session windows能解决

sunfulin  于2020年10月30日周五 上午11:01写道:

> hi,community,
> 我最近有一个业务场景,需要基于消息流和具体的业务逻辑判断生成超时事件,考虑通过flink
> cep来实现。不过在这个场景中,需要针对输入的消息,判断如果一个小时内没有匹配的数据到来,就需要把该事件输出。
> 目前的cep机制,应该需要下一个事件消息到来时才会输出事件。想请教下各位大神,针对这个诉求有没有啥好的方案。
> 感谢。


Re: Flink程序连接Kafka类型不匹配问题

2020-10-29 文章 admin
Hi,
怀疑你import了scala的包,把import部分也贴出来看看呢

> 2020年10月30日 上午10:19,Natasha <13631230...@163.com> 写道:
> 
> Hi,社区~
> 
> 我想把Flink连接Kafka封装成通用的方法,但是在使用java时,类型转换上遇到了问题,这个问题网上搜索到的资料很少,刚入门不久所以也不是很明白其中的原理,请各位同行指点我一下,不胜感激。
> 
> Best,
> Nat



flink cep超时事件的问题

2020-10-29 文章 sunfulin
hi,community,
我最近有一个业务场景,需要基于消息流和具体的业务逻辑判断生成超时事件,考虑通过flink 
cep来实现。不过在这个场景中,需要针对输入的消息,判断如果一个小时内没有匹配的数据到来,就需要把该事件输出。
目前的cep机制,应该需要下一个事件消息到来时才会输出事件。想请教下各位大神,针对这个诉求有没有啥好的方案。
感谢。

Re: 讨论分析:数据类型对于shuffle时数据传输IO速度的影响(数十倍的差距)

2020-10-29 文章 admin
HI,
operator chain的作用不就是避免shuffle,减少网络间的传输吗?你为什么要手动shuffle呢?

> 2020年10月30日 上午10:24,Husky Zeng <568793...@qq.com> 写道:
> 
> 补充一个细节:
> 
> 
> 当我把shuffle加到cal和sort中间时,
> 
> source-->cal-- (rebalance)->sort--->SinkConversionToRow--->sink
> 
> shuffle的数据传输IO速度是3G/s,需要传输的文件大小是370G。
> 
> 当我把shuffle加到SinkConversionToRow和sink中间时,
> 
> source-->cal-- ->sort--->SinkConversionToRow--(rebalance)-->sink
> 
> shuffle的数据传输IO速度是0.1G/s,需要传输的文件大小是250G。
> 
> 
> 文件大小也是有区别的。
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: 讨论分析:数据类型对于shuffle时数据传输IO速度的影响(数十倍的差距)

2020-10-29 文章 Husky Zeng
补充一个细节:


当我把shuffle加到cal和sort中间时,

source-->cal-- (rebalance)->sort--->SinkConversionToRow--->sink

shuffle的数据传输IO速度是3G/s,需要传输的文件大小是370G。

当我把shuffle加到SinkConversionToRow和sink中间时,

source-->cal-- ->sort--->SinkConversionToRow--(rebalance)-->sink

shuffle的数据传输IO速度是0.1G/s,需要传输的文件大小是250G。


文件大小也是有区别的。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


讨论分析:数据类型对于shuffle时数据传输IO速度的影响(数十倍的差距)

2020-10-29 文章 Husky Zeng
Hi all,

在使用flink的shuffle功能时,我发现在operator chain中不同的位置进行shuffle,IO速度有非常明显的差距。

比如我的这个例子:

source-->cal--->sort--->SinkConversionToRow--->sink

从hive读数据,计算,排序,转化为外部类型行,写入hive。

当我把shuffle加到cal和sort中间时,

source-->cal-- (rebalance)->sort--->SinkConversionToRow--->sink

shuffle的数据传输IO速度是3G/s

当我把shuffle加到SinkConversionToRow和sink中间时,

source-->cal-- ->sort--->SinkConversionToRow--(rebalance)-->sink

shuffle的数据传输IO速度是0.1G/s


足足差了30倍!


我猜测这是由于SinkConversionToRow将数据转化为了外部格式,外部格式传输速度慢,内部格式传输速度快。

但是为什么差距这么大?  内部格式如何做到传输速度这么快,外部格式又为什么传输速度这么慢?

SinkConversionToRow代码位置:
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink#translateToTransformation



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re:Re: sql-client 连接hive报错 TTransportException

2020-10-29 文章 RS
Hi, 

谢谢,应该是HMS的问题, 原来是需要配置remote的HMS,之前都是local模式
我执行了一下流程:
1. 清理了旧的数据库和数据目录
2. 重新初始化 schematool -dbType mysql -initSchema
3. 启动hive --service metastore, 成功监听端口9083端口
4. 启动hiveserver2, hiveserver2一直在重试,没有监听1端口


然后hiveserver2启动失败, hive版本3.1.2, 请问下这个问题如何解决呢?


2020-10-29T18:53:35,602  WARN [main] server.HiveServer2: Error starting 
HiveServer2 on attempt 1, will retry in 6ms
java.lang.RuntimeException: Error initializing notification event poll
at org.apache.hive.service.server.HiveServer2.init(HiveServer2.java:275) 
~[hive-service-3.1.2.jar:3.1.2]
at 
org.apache.hive.service.server.HiveServer2.startHiveServer2(HiveServer2.java:1036)
 [hive-service-3.1.
2.jar:3.1.2]
at 
org.apache.hive.service.server.HiveServer2.access$1600(HiveServer2.java:140) 
[hive-service-3.1.2.jar:
3.1.2]
at 
org.apache.hive.service.server.HiveServer2$StartOptionExecutor.execute(HiveServer2.java:1305)
 [hive-s
ervice-3.1.2.jar:3.1.2]
at 
org.apache.hive.service.server.HiveServer2.main(HiveServer2.java:1149) 
[hive-service-3.1.2.jar:3.1.2]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_261]at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_261]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_261]at java.lang.reflect.Method.invoke(Method.java:498) 
~[?:1.8.0_261]
at org.apache.hadoop.util.RunJar.run(RunJar.java:323) 
[hadoop-common-3.3.0.jar:?]at 
org.apache.hadoop.util.RunJar.main(RunJar.java:236) [hadoop-common-3.3.0.jar:?]
Caused by: java.io.IOException: org.apache.thrift.TApplicationException: 
Internal error processing get_current_notificationEventId
at 
org.apache.hadoop.hive.metastore.messaging.EventUtils$MSClientNotificationFetcher.getCurrentNotificat
ionEventId(EventUtils.java:75) ~[hive-exec-3.1.2.jar:3.1.2]
at 
org.apache.hadoop.hive.ql.metadata.events.NotificationEventPoll.(NotificationEventPoll.java:103
) ~[hive-exec-3.1.2.jar:3.1.2]
at 
org.apache.hadoop.hive.ql.metadata.events.NotificationEventPoll.initialize(NotificationEventPoll.java
:59) ~[hive-exec-3.1.2.jar:3.1.2]
at 
org.apache.hive.service.server.HiveServer2.init(HiveServer2.java:273) 
~[hive-service-3.1.2.jar:3.1.2]
... 10 more
Caused by: org.apache.thrift.TApplicationException: Internal error processing 
get_current_notificationEventIdat 
org.apache.thrift.TApplicationException.read(TApplicationException.java:111) 
~[hive-exec-3.1.2.jar:3.
1.2]at 
org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:79) 
~[hive-exec-3.1.2.jar:3.1.2]
at 
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_current_notificationEventId(
ThriftHiveMetastore.java:5575) ~[hive-exec-3.1.2.jar:3.1.2]
at 
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_current_notificationEventId(Thrif
tHiveMetastore.java:5563) ~[hive-exec-3.1.2.jar:3.1.2]
at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getCurrentNotificationEventId(HiveMetaStoreClient.java:2723)
 ~[hive-exec-3.1.2.jar:3.1.2]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_261]
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_261]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_261]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_261]
at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:212)
 ~[h








在 2020-10-27 19:58:32,"Rui Li"  写道:
>你好,我看log里连接的是1端口,这个是HS2的端口吧?Flink的HiveCatalog需要连接的是HMS,可以启动一个HMS再试试哈。
>
>On Tue, Oct 27, 2020 at 9:57 AM RS  wrote:
>
>> Hi, 请教下
>> 我尝试使用sql-client连接hive,  hive正常, 使用beeline -u jdbc:hive2://x.x.x.x:1
>> 可以正常连接
>>
>>
>> sql-client-defaults.yaml配置内容:
>> tables: []
>> functions: []
>> catalogs:
>> - name: myhive
>>   type: hive
>>   hive-conf-dir: /home/hive/flink-1.11.1/conf
>>   default-database: default
>> execution:
>>   planner: blink
>>   type: streaming
>>   time-characteristic: event-time
>>   periodic-watermarks-interval: 200
>>   result-mode: table
>>   max-table-result-rows: 100
>>   parallelism: 1
>>   max-parallelism: 128
>>   min-idle-state-retention: 0
>>   max-idle-state-retention: 0
>>   restart-strategy:
>> type: fallback
>> deployment:
>>   response-timeout: 5000
>>   gateway-address: ""
>>   gateway-port: 0
>>
>>
>> 然后启动sql-client报错
>> $./bin/sql-client.sh embedded
>>
>>
>> 最后的报错信息:
>> Exception in thread "main"
>> org.apache.flink.table.client.SqlClientException: Unexpected exception.
>> This is a bug. Please consider filing an issue.
>> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
>> Caused by: org.apache.flink.table.client.gateway.SqlExecutionException:
>> Could not create ex

回复:Flink程序连接Kafka类型不匹配问题

2020-10-29 文章 Natasha
hi,社区~

我想把Flink连接Kafka封装成通用的方法,但是在使用java时,类型转换上遇到了问题,这个问题网上搜索到的资料很少,刚入门不久所以也不是很明白其中的原理,请各位同行指点我一下,不胜感激





谢谢!
在2020年10月29日 19:29,hailongwang<18868816...@163.com> 写道:
Hi Natasha,
没看到你上传的附件图呢,重新贴下不?


Best,
Hailong Wang




在 2020-10-29 16:52:00,"Natasha" <13631230...@163.com> 写道:



hi,社区~
我想把Flink连接Kafka封装成通用的方法,但是在使用java时,类型转换上遇到了问题(附件图),这个问题网上搜索到的资料很少,刚入门不久所以也不是很明白其中的原理,请各位同行指点我一下,不胜感激


谢谢!

Re:Flink程序连接Kafka类型不匹配问题

2020-10-29 文章 hailongwang
Hi Natasha,
没看到你上传的附件图呢,重新贴下不?


Best,
Hailong Wang




在 2020-10-29 16:52:00,"Natasha" <13631230...@163.com> 写道:



hi,社区~

我想把Flink连接Kafka封装成通用的方法,但是在使用java时,类型转换上遇到了问题(附件图),这个问题网上搜索到的资料很少,刚入门不久所以也不是很明白其中的原理,请各位同行指点我一下,不胜感激


谢谢!

??????JM??????????????????

2020-10-29 文章 void
hi all
      flinkdatasetapi 
??10jm??, 
jmcli ?? rest 


Flink程序连接Kafka类型不匹配问题

2020-10-29 文章 Natasha


hi,社区~

我想把Flink连接Kafka封装成通用的方法,但是在使用java时,类型转换上遇到了问题(附件图),这个问题网上搜索到的资料很少,刚入门不久所以也不是很明白其中的原理,请各位同行指点我一下,不胜感激


谢谢!

Re: Checkpoint size的问题

2020-10-29 文章 Yun Tang
Hi

web UI显示的是增量上传数据量,包括各个task上传的数据,而_metadata 
只是一个元数据,是由JM上传的,所以不能将_metadata与checkpoint UI显示的数据量划等号。

祝好
唐云

From: gsralex 
Sent: Wednesday, October 28, 2020 19:17
To: user-zh@flink.apache.org 
Subject: Checkpoint size的问题

Hi, All
Checkpoint 一般Web UI显示的是400MB左右,但是查看HDFS实际的大小,不到1MB(_metadata) 
,想问下这之间size的偏差为什么这么大?


Re: JDBC 并发写入量大时挂掉

2020-10-29 文章 Jark Wu
LEFT JOIN 是会有 delete (retraction)发生的。

On Thu, 29 Oct 2020 at 16:36, LittleFall <1578166...@qq.com> wrote:

> 操作中没有 DELETE 语句也会导致这个问题吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: JDBC 并发写入量大时挂掉

2020-10-29 文章 LittleFall
操作中没有 DELETE 语句也会导致这个问题吗?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: JDBC 并发写入量大时挂掉

2020-10-29 文章 Jark Wu
看起来是这个bug,已经在1.11.3上修复,你可以自己 build 下 release-1.11 分支。
https://issues.apache.org/jira/browse/FLINK-19423

Best,
Jark

On Thu, 29 Oct 2020 at 16:18, LittleFall <1578166...@qq.com> wrote:

> 测试发了10个线程,每个线程1000次,一共1万条记录
>
> 会在写入几千条的时候挂掉
>
> 2020-10-29 12:04:55,573 WARN  org.apache.flink.runtime.taskmanager.Task
>
> [] - Join(joinType=[LeftOuterJoin], where=[(ID = ID1)], select=[ID,
> PRODUCT_SERVICE, CUSTOMER_NO, CUSTOMER_NAME, CUSTOMER_REQUEST_NO, EXTE
> RNAL_NO, STATUS, ORDER_DATE, CREATE_TIME, COUPON_AMOUNT, ID0,
> CHANNEL_RET_CODE, CHANNEL_RET_MSG, STATUS0, CARD_NO, BANK_PAY_WAY,
> CREATE_TIME0, UPDATE_TIME0, PAY_AMOUNT, PAYER_FEE, CNET_BIND_CARD_ID,
> PAYER_CUSTOMER_REQUEST_NO, OPE  RATOR_NAME, CARD_HOLDER_NAME, ID1,
> CUSTOMER_BIZ_REQUEST_NO, GOODS_NAME, GOODS_CAT, GOODS_DESC, GOODS_EXT_INFO,
> MEMO, EXTEND_INFO], leftInputSpec=[HasUniqueKey],
> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[ID AS id,
> ID0 AS op_id, ORDER_DATE AS order_date, UPDATE_TIME0 AS complete_date,
> PAYER_CUSTOMER_REQUEST_NO AS payer_customer_request_no, CREATE_TIME0 AS
> pay_time, CUSTOMER_REQUEST_NO AS customer_request_no, EXTERNAL_NO AS
> external_no, STA  TUS0 AS pay_status, STATUS AS order_status,
> PAY_AMOUNT
> AS pay_amount, ABS(PAYER_FEE) AS payer_fee, BANK_PAY_WAY AS bank_pay_way,
> GOODS_CAT AS goods_cat, GOODS_NAME AS goods_name, GOODS_DESC AS
> productdesc,
> GOODS_DESC AS goods_des  c, CUSTOMER_BIZ_REQUEST_NO AS
> customer_biz_request_no, GOODS_EXT_INFO AS goods_ext_info, MEMO AS memo,
> EXTEND_INFO AS extend_info, CHANNEL_RET_CODE AS channel_ret_code,
> CHANNEL_RET_MSG AS channel_ret_msg, OPERATOR_NAME AS operato  r,
> CUSTOMER_NO AS customer_no, CUSTOMER_NAME AS customer_name, PRODUCT_SERVICE
> AS extend, CREATE_TIME0 AS payercreatetime, UPDATE_TIME0 AS
> payerupdatetime,
> CARD_NO AS card_no, CARD_HOLDER_NAME AS card_holder_name, CREATE_TIME AS
>
> create_time, CNET_BIND_CARD_ID AS cnetbindcarid, COUPON_AMOUNT AS
> coupon_amount]) -> Sink:
> Sink(table=[default_catalog.default_database.wide_table_1], fields=[id,
> op_id, order_date, complete_date, payer_customer_request_no, pay_t
> ime,
> customer_request_no, external_no, pay_status, order_status, pay_amount,
> payer_fee, bank_pay_way, goods_cat, goods_name, productdesc, goods_desc,
> customer_biz_request_no, goods_ext_info, memo, extend_info,
> channel_ret_code, c  hannel_ret_msg, operator, customer_no,
> customer_name, extend, payercreatetime, payerupdatetime, card_no,
> card_holder_name, create_time, cnetbindcarid, coupon_amount]) (1/1)
> (14a0d11067e4779e13ad3e500f2ab29d) switched from RUNNING   to FAILED.
> java.io.IOException: Writing records to JDBC failed.
> at
>
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:157)
> ~[flink-connector-jdbc_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:86)
> ~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at StreamExecCalc$147.processElement(Unknown Source) ~[?:?]
> at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>   

JDBC 并发写入量大时挂掉

2020-10-29 文章 LittleFall
测试发了10个线程,每个线程1000次,一共1万条记录

会在写入几千条的时候挂掉

2020-10-29 12:04:55,573 WARN  org.apache.flink.runtime.taskmanager.Task 
  
[] - Join(joinType=[LeftOuterJoin], where=[(ID = ID1)], select=[ID,
PRODUCT_SERVICE, CUSTOMER_NO, CUSTOMER_NAME, CUSTOMER_REQUEST_NO, EXTE 
RNAL_NO, STATUS, ORDER_DATE, CREATE_TIME, COUPON_AMOUNT, ID0,
CHANNEL_RET_CODE, CHANNEL_RET_MSG, STATUS0, CARD_NO, BANK_PAY_WAY,
CREATE_TIME0, UPDATE_TIME0, PAY_AMOUNT, PAYER_FEE, CNET_BIND_CARD_ID,
PAYER_CUSTOMER_REQUEST_NO, OPE  RATOR_NAME, CARD_HOLDER_NAME, ID1,
CUSTOMER_BIZ_REQUEST_NO, GOODS_NAME, GOODS_CAT, GOODS_DESC, GOODS_EXT_INFO,
MEMO, EXTEND_INFO], leftInputSpec=[HasUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[ID AS id,  
ID0 AS op_id, ORDER_DATE AS order_date, UPDATE_TIME0 AS complete_date,
PAYER_CUSTOMER_REQUEST_NO AS payer_customer_request_no, CREATE_TIME0 AS
pay_time, CUSTOMER_REQUEST_NO AS customer_request_no, EXTERNAL_NO AS
external_no, STA  TUS0 AS pay_status, STATUS AS order_status, PAY_AMOUNT
AS pay_amount, ABS(PAYER_FEE) AS payer_fee, BANK_PAY_WAY AS bank_pay_way,
GOODS_CAT AS goods_cat, GOODS_NAME AS goods_name, GOODS_DESC AS productdesc,
GOODS_DESC AS goods_des  c, CUSTOMER_BIZ_REQUEST_NO AS
customer_biz_request_no, GOODS_EXT_INFO AS goods_ext_info, MEMO AS memo,
EXTEND_INFO AS extend_info, CHANNEL_RET_CODE AS channel_ret_code,
CHANNEL_RET_MSG AS channel_ret_msg, OPERATOR_NAME AS operato  r,
CUSTOMER_NO AS customer_no, CUSTOMER_NAME AS customer_name, PRODUCT_SERVICE
AS extend, CREATE_TIME0 AS payercreatetime, UPDATE_TIME0 AS payerupdatetime,
CARD_NO AS card_no, CARD_HOLDER_NAME AS card_holder_name, CREATE_TIME AS  
create_time, CNET_BIND_CARD_ID AS cnetbindcarid, COUPON_AMOUNT AS
coupon_amount]) -> Sink:
Sink(table=[default_catalog.default_database.wide_table_1], fields=[id,
op_id, order_date, complete_date, payer_customer_request_no, pay_t  ime,
customer_request_no, external_no, pay_status, order_status, pay_amount,
payer_fee, bank_pay_way, goods_cat, goods_name, productdesc, goods_desc,
customer_biz_request_no, goods_ext_info, memo, extend_info,
channel_ret_code, c  hannel_ret_msg, operator, customer_no,
customer_name, extend, payercreatetime, payerupdatetime, card_no,
card_holder_name, create_time, cnetbindcarid, coupon_amount]) (1/1)
(14a0d11067e4779e13ad3e500f2ab29d) switched from RUNNING   to FAILED.
java.io.IOException: Writing records to JDBC failed.
at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:157)
~[flink-connector-jdbc_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:86)
~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at StreamExecCalc$147.processElement(Unknown Source) ~[?:?]
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.output(StreamingJoinOperator.java:305)
~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.table.run

Re: flink1.11 elasticsearch connector

2020-10-29 文章 Yangze Guo
1.11版本中尚不支持username和password的设置,这两个配置在1.12中加入了新的es connector[1]

[1] https://issues.apache.org/jira/browse/FLINK-18361

Best,
Yangze Guo

On Thu, Oct 29, 2020 at 3:37 PM 赵帅  wrote:
>
> elasticsearch7.6有账号认证,目前flink1.11 elasticsearch connector sql api如何加入账号认证?


Flink消费LDAP Kafka

2020-10-29 文章 hua mulan
Flink kafka connector可以消费开了LDAP的Kafka吗

来自 Outlook


flink1.11 elasticsearch connector

2020-10-29 文章 赵帅
elasticsearch7.6有账号认证,目前flink1.11 elasticsearch connector sql api如何加入账号认证?