Re: flink row 类型

2020-07-22 文章 Dream-底限
hi、xiao cai

可以说一下思路吗,我没太懂
》》可以考虑把字段索引值保存下来再获取

Dream-底限  于2020年7月23日周四 下午2:56写道:

> hi、Jingsong Li
> 我查看了对应的api,并运行了demo测试,通过CallContext我可以拿到对应的字段类型,但是无法拿到对应的字段名称
>
> >>在TypeInference中有input的type,这个type应该是包含字段信息的。
>
> xiao cai  于2020年7月23日周四 下午2:19写道:
>
>> 可以考虑把字段索引值保存下来再获取
>>
>>
>>  原始邮件
>> 发件人: Dream-底限
>> 收件人: user-zh
>> 发送时间: 2020年7月23日(周四) 14:08
>> 主题: Re: flink row 类型
>>
>>
>> hi 是的,我们的数据场景比较尴尬,那我想其他方式实现一下 Benchao Li 
>> 于2020年7月23日周四 下午12:55写道: > 这个应该是做不到的。name只是SQL plan过程的东西,在运行时它就没有什么实际意义了。 >
>> 你是想在udf里面获取row里面每个字段的名字是吧?如果是的话,我理解现在应该是做不到的。 > > Dream-底限 <
>> zhan...@akulaku.com> 于2020年7月22日周三 下午7:22写道: > > > hi、 > >
>> 我这面定义row数据,类型为ROW,可以通过 > >
>> row.getField(i)获取到对应的值,但是我想获取对应的row_name名称要怎么操作,貌似没有报漏获取名称的接口 > > > >
>> rule_key 转换为rule_key1,rulekey2 > > 1 > > 2 > > > > > -- > > Best, > Benchao
>> Li >
>
>


Re: flink row 类型

2020-07-22 文章 Dream-底限
hi、Jingsong Li
我查看了对应的api,并运行了demo测试,通过CallContext我可以拿到对应的字段类型,但是无法拿到对应的字段名称

>>在TypeInference中有input的type,这个type应该是包含字段信息的。

xiao cai  于2020年7月23日周四 下午2:19写道:

> 可以考虑把字段索引值保存下来再获取
>
>
>  原始邮件
> 发件人: Dream-底限
> 收件人: user-zh
> 发送时间: 2020年7月23日(周四) 14:08
> 主题: Re: flink row 类型
>
>
> hi 是的,我们的数据场景比较尴尬,那我想其他方式实现一下 Benchao Li 
> 于2020年7月23日周四 下午12:55写道: > 这个应该是做不到的。name只是SQL plan过程的东西,在运行时它就没有什么实际意义了。 >
> 你是想在udf里面获取row里面每个字段的名字是吧?如果是的话,我理解现在应该是做不到的。 > > Dream-底限 <
> zhan...@akulaku.com> 于2020年7月22日周三 下午7:22写道: > > > hi、 > >
> 我这面定义row数据,类型为ROW,可以通过 > >
> row.getField(i)获取到对应的值,但是我想获取对应的row_name名称要怎么操作,貌似没有报漏获取名称的接口 > > > >
> rule_key 转换为rule_key1,rulekey2 > > 1 > > 2 > > > > > -- > > Best, > Benchao
> Li >


Re: Flink 1.11 submit job timed out

2020-07-22 文章 SmileSmile
Hi Yang Wang

先分享下我这边的环境版本


kubernetes:1.17.4.   CNI: weave  


1 2 3 是我的一些疑惑

4 是JM日志


1. 去掉taskmanager-query-state-service.yaml后确实不行  nslookup

kubectl exec -it busybox2 -- /bin/sh
/ # nslookup 10.47.96.2
Server:  10.96.0.10
Address: 10.96.0.10:53

** server can't find 2.96.47.10.in-addr.arpa: NXDOMAIN



2. Flink1.11和Flink1.10

detail subtasks taskmanagers xxx x 这行  
1.11变成了172-20-0-50。1.10是flink-taskmanager-7b5d6958b6-sfzlk:36459。这块的改动是?(目前这个集群跑着1.10和1.11,1.10可以正常运行,如果coredns有问题,1.10版本的flink应该也有一样的情况吧?)

3. coredns是否特殊配置?

在容器中解析域名是正常的,只是反向解析没有service才会有问题。coredns是否有什么需要配置?


4. time out时候的JM日志如下:



2020-07-23 13:53:00,228 INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
ResourceManager 
akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0 was granted 
leadership with fencing token 
2020-07-23 13:53:00,232 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Starting RPC endpoint for 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher at 
akka://flink/user/rpc/dispatcher_1 .
2020-07-23 13:53:00,233 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl [] - 
Starting the SlotManager.
2020-07-23 13:53:03,472 INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
Registering TaskManager with ResourceID 1f9ae0cd95a28943a73be26323588696 
(akka.tcp://flink@10.34.128.9:6122/user/rpc/taskmanager_0) at ResourceManager
2020-07-23 13:53:03,777 INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
Registering TaskManager with ResourceID cac09e751264e61615329c20713a84b4 
(akka.tcp://flink@10.32.160.6:6122/user/rpc/taskmanager_0) at ResourceManager
2020-07-23 13:53:03,787 INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
Registering TaskManager with ResourceID 93c72d01d09f9ae427c5fc980ed4c1e4 
(akka.tcp://flink@10.39.0.8:6122/user/rpc/taskmanager_0) at ResourceManager
2020-07-23 13:53:04,044 INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
Registering TaskManager with ResourceID 8adf2f8e81b77a16d5418a9e252c61e2 
(akka.tcp://flink@10.38.64.7:6122/user/rpc/taskmanager_0) at ResourceManager
2020-07-23 13:53:04,099 INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
Registering TaskManager with ResourceID 23e9d2358f6eb76b9ae718d879d4f330 
(akka.tcp://flink@10.42.160.6:6122/user/rpc/taskmanager_0) at ResourceManager
2020-07-23 13:53:04,146 INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
Registering TaskManager with ResourceID 092f8dee299e32df13db3111662b61f8 
(akka.tcp://flink@10.33.192.14:6122/user/rpc/taskmanager_0) at ResourceManager


2020-07-23 13:55:44,220 INFO  
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received 
JobGraph submission 99a030d0e3f428490a501c0132f27a56 (JobTest).
2020-07-23 13:55:44,222 INFO  
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Submitting 
job 99a030d0e3f428490a501c0132f27a56 (JobTest).
2020-07-23 13:55:44,251 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Starting RPC endpoint for 
org.apache.flink.runtime.jobmaster.JobMaster at 
akka://flink/user/rpc/jobmanager_2 .
2020-07-23 13:55:44,260 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Initializing job JobTest (99a030d0e3f428490a501c0132f27a56).
2020-07-23 13:55:44,278 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Using restart back off time strategy 
NoRestartBackoffTimeStrategy for JobTest (99a030d0e3f428490a501c0132f27a56).
2020-07-23 13:55:44,319 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Running initialization on master for job JobTest 
(99a030d0e3f428490a501c0132f27a56).
2020-07-23 13:55:44,319 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Successfully ran initialization on master in 0 ms.
2020-07-23 13:55:44,428 INFO  
org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 
1 pipelined regions in 25 ms
2020-07-23 13:55:44,437 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Loading state backend via factory 
org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
2020-07-23 13:55:44,456 INFO  
org.apache.flink.contrib.streaming.state.RocksDBStateBackend [] - Using 
predefined options: DEFAULT.
2020-07-23 13:55:44,457 INFO  
org.apache.flink.contrib.streaming.state.RocksDBStateBackend [] - Using default 
options factory: DefaultConfigurableOptionsFactory{configuredOptions={}}.
2020-07-23 13:55:44,466 WARN  org.apache.flink.runtime.util.HadoopUtils 
   [] - Could not find Hadoop configuration via any of the supported 
methods (Flink configuration, environment variables).
2020-07-23 13:55:45,276 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Using failov

Re: Is it possible to do state migration with checkpoints?

2020-07-22 文章 Sivaprasanna
+user-zh@flink.apache.org 

A follow up question. I tried taking a savepoint but the job failed
immediately. It happens everytime I take a savepoint. The job is running on
a Yarn cluster so it fails with "container running out of memory". The
state size averages around 1.2G but also peaks to ~4.5 GB sometimes (please
refer to the screenshot below). The job is running with 2GB task manager
heap & 2GB task manager managed memory. I increased the managed memory to
6GB assuming the failure has something to do with RocksDB but it failed
even with 6GB managed memory. I guess I am missing on some configurations.
Can you folks please help me with this?

[image: Screenshot 2020-07-23 at 10.34.29 AM.png]

On Wed, Jul 22, 2020 at 7:32 PM Sivaprasanna 
wrote:

> Hi,
>
> We are trying out state schema migration for one of our stateful
> pipelines. We use few Avro type states. Changes made to the job:
> 1. Updated the schema for one of the states (added a new 'boolean'
> field with default value).
> 2. Modified the code by removing a couple of ValueStates.
>
> To push these changes, I stopped the live job and resubmitted the new jar
> with the latest *checkpoint* path. However, the job failed with the
> following error:
>
> java.lang.RuntimeException: Error while getting state
> at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
> at
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
> ...
> ...
> Caused by: org.apache.flink.util.StateMigrationException: The new state
> serializer cannot be incompatible.
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
>
> I was going through the state schema evolution doc. The document mentions
> that we need to take a *savepoint* and restart the job with the savepoint
> path. We are using RocksDB backend with incremental checkpoint enabled. Can
> we not use the latest checkpoint available when we are dealing with state
> schema changes?
>
> Complete stacktrace is attached with this mail.
>
> -
> Sivaprasanna
>


Re: flink row 类型

2020-07-22 文章 xiao cai
可以考虑把字段索引值保存下来再获取


 原始邮件 
发件人: Dream-底限
收件人: user-zh
发送时间: 2020年7月23日(周四) 14:08
主题: Re: flink row 类型


hi 是的,我们的数据场景比较尴尬,那我想其他方式实现一下 Benchao Li  于2020年7月23日周四 
下午12:55写道: > 这个应该是做不到的。name只是SQL plan过程的东西,在运行时它就没有什么实际意义了。 > 
你是想在udf里面获取row里面每个字段的名字是吧?如果是的话,我理解现在应该是做不到的。 > > Dream-底限 
 于2020年7月22日周三 下午7:22写道: > > > hi、 > > 
我这面定义row数据,类型为ROW,可以通过 > > 
row.getField(i)获取到对应的值,但是我想获取对应的row_name名称要怎么操作,貌似没有报漏获取名称的接口 > > > > rule_key 
转换为rule_key1,rulekey2 > > 1 > > 2 > > > > > -- > > Best, > Benchao Li >

Re: flink row 类型

2020-07-22 文章 Jingsong Li
可以看下Flink 1.11的UDF type inference.

在TypeInference中有input的type,这个type应该是包含字段信息的。

Best,
Jingsong

On Thu, Jul 23, 2020 at 2:09 PM Dream-底限  wrote:

> hi
> 是的,我们的数据场景比较尴尬,那我想其他方式实现一下
>
> Benchao Li  于2020年7月23日周四 下午12:55写道:
>
> > 这个应该是做不到的。name只是SQL plan过程的东西,在运行时它就没有什么实际意义了。
> > 你是想在udf里面获取row里面每个字段的名字是吧?如果是的话,我理解现在应该是做不到的。
> >
> > Dream-底限  于2020年7月22日周三 下午7:22写道:
> >
> > > hi、
> > > 我这面定义row数据,类型为ROW,可以通过
> > > row.getField(i)获取到对应的值,但是我想获取对应的row_name名称要怎么操作,貌似没有报漏获取名称的接口
> > >
> > > rule_key  转换为rule_key1,rulekey2
> > > 1
> > > 2
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


-- 
Best, Jingsong Lee


Re: flink row 类型

2020-07-22 文章 Dream-底限
hi
是的,我们的数据场景比较尴尬,那我想其他方式实现一下

Benchao Li  于2020年7月23日周四 下午12:55写道:

> 这个应该是做不到的。name只是SQL plan过程的东西,在运行时它就没有什么实际意义了。
> 你是想在udf里面获取row里面每个字段的名字是吧?如果是的话,我理解现在应该是做不到的。
>
> Dream-底限  于2020年7月22日周三 下午7:22写道:
>
> > hi、
> > 我这面定义row数据,类型为ROW,可以通过
> > row.getField(i)获取到对应的值,但是我想获取对应的row_name名称要怎么操作,貌似没有报漏获取名称的接口
> >
> > rule_key  转换为rule_key1,rulekey2
> > 1
> > 2
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: flink1.11 web ui没有DAG

2020-07-22 文章 Congxian Qiu
Hi
   你的图片我这边显示不出来,能否把图片放到某个图床,然后把链接发过来呢?这样大家能更好的查看图片
Best,
Congxian


小学生 <201782...@qq.com> 于2020年7月22日周三 下午3:49写道:

>
>
>
>


Re: flink stream如何为每条数据生成自增主键

2020-07-22 文章 Congxian Qiu
Hi
   你是希望每条数据有一个 id,这个 id 是随着数据递增的是啊?或许你可以使用 RichMapFunction[1] 来做这个事情,在每次
mapFunction 中把自定的 id 加进去,然后这个 id 还可以保存到 state 中,这样就算作业 failover 了,自增 id
也不会有问题。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/user_defined_functions.html#rich-functions
Best,
Congxian


Michael Ran  于2020年7月22日周三 下午4:17写道:

> id 生成器吧
> 在 2020-07-22 15:51:44,"tiantingting5...@163.com" 
> 写道:
> >
> >flink stream如何为每条数据生成自增主键??时间戳貌似不行,同一时间戳可能会产生多条数据,无法区分数据的现后顺序。
> >
> >
> >tiantingting5...@163.com
>


Re: Flink 1.11 submit job timed out

2020-07-22 文章 Yang Wang
很高兴你的问题解决了,但我觉得根本原因应该不是加上了taskmanager-query-state-service.yaml的关系。
我这边不创建这个服务也是正常的,而且nslookup {tm_ip_address}是可以正常反解析到hostname的。

注意这里不是解析hostname,而是通过ip地址来反解析进行验证


回答你说的两个问题:
1. 不是必须的,我这边验证不需要创建,集群也是可以正常运行任务的。Rest
service的暴露方式是ClusterIP、NodePort、LoadBalancer都正常
2. 如果没有配置taskmanager.bind-host,
[Flink-15911][Flink-15154]这两个JIRA并不会影响TM向RM注册时候的使用的地址

如果你想找到根本原因,那可能需要你这边提供JM/TM的完整log,这样方便分析


Best,
Yang

SmileSmile  于2020年7月23日周四 上午11:30写道:

>
> Hi Yang Wang
>
> 刚刚在测试环境测试了一下,taskManager没有办法nslookup出来,JM可以nslookup,这两者的差别在于是否有service。
>
> 解决方案:我这边给集群加上了taskmanager-query-state-service.yaml(按照官网上是可选服务)。就不会刷No
> hostname could be resolved for ip
> address,将NodePort改为ClusterIp,作业就可以成功提交,不会出现time out的问题了,问题得到了解决。
>
>
> 1. 如果按照上面的情况,那么这个配置文件是必须配置的?
>
> 2. 在1.11的更新中,发现有 [Flink-15911][Flink-15154]
> 支持分别配置用于本地监听绑定的网络接口和外部访问的地址和端口。是否是这块的改动,
> 需要JM去通过TM上报的ip反向解析出service?
>
>
> Bset!
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html
>
> a511955993
> 邮箱:a511955...@163.com
>
> 
>
> 签名由 网易邮箱大师  定制
>
> On 07/23/2020 10:11, Yang Wang  wrote:
> 我的意思就是你在Flink任务运行的过程中,然后下面的命令在集群里面起一个busybox的pod,
> 在里面执行 nslookup {ip_address},看看是否能够正常解析到。如果不能应该就是coredns的
> 问题了
>
> kubectl run -i -t busybox --image=busybox --restart=Never
>
> 你需要确认下集群的coredns pod是否正常,一般是部署在kube-system这个namespace下的
>
>
>
> Best,
> Yang
>
>
> SmileSmile  于2020年7月22日周三 下午7:57写道:
>
> >
> > Hi,Yang Wang!
> >
> > 很开心可以收到你的回复,你的回复帮助很大,让我知道了问题的方向。我再补充些信息,希望可以帮我进一步判断一下问题根源。
> >
> > 在JM报错的地方,No hostname could be resolved for ip address x
> > ,报出来的ip是k8s分配给flink pod的内网ip,不是宿主机的ip。请问这个问题可能出在哪里呢
> >
> > Best!
> >
> >
> > a511955993
> > 邮箱:a511955...@163.com
> >
> > <
> https://maas.mail.163.com/dashi-web-extend/html/proSignature.html?ftlId=1&name=a511955993&uid=a511955993%40163.com&iconUrl=https%3A%2F%2Fmail-online.nosdn.127.net%2Fqiyelogo%2FdefaultAvatar.png&items=%5B%22%E9%82%AE%E7%AE%B1%EF%BC%9Aa511955993%40163.com%22%5D>;
>
> >
> > 签名由 网易邮箱大师 ; 定制
> >
> > On 07/22/2020 18:18, Yang Wang  wrote:
> > 如果你的日志里面一直在刷No hostname could be resolved for the IP
> address,应该是集群的coredns
> > 有问题,由ip地址反查hostname查不到。你可以起一个busybox验证一下是不是这个ip就解析不了,有
> > 可能是coredns有问题
> >
> >
> > Best,
> > Yang
> >
> > Congxian Qiu  于2020年7月21日周二 下午7:29写道:
> >
> > > Hi
> > >不确定 k8s 环境中能否看到 pod 的完整日志?类似 Yarn 的 NM 日志一样,如果有的话,可以尝试看一下这个 pod
> > > 的完整日志有没有什么发现
> > > Best,
> > > Congxian
> > >
> > >
> > > SmileSmile  于2020年7月21日周二 下午3:19写道:
> > >
> > > > Hi,Congxian
> > > >
> > > > 因为是测试环境,没有配置HA,目前看到的信息,就是JM刷出来大量的no hostname could be
> > > > resolved,jm失联,作业提交失败。
> > > > 将jm内存配置为10g也是一样的情况(jobmanager.memory.pprocesa.size:10240m)。
> > > >
> > > > 在同一个环境将版本回退到1.10没有出现该问题,也不会刷如上报错。
> > > >
> > > >
> > > > 是否有其他排查思路?
> > > >
> > > > Best!
> > > >
> > > >
> > > >
> > > >
> > > > | |
> > > > a511955993
> > > > |
> > > > |
> > > > 邮箱:a511955...@163.com
> > > > |
> > > >
> > > > 签名由 网易邮箱大师 定制
> > > >
> > > > On 07/16/2020 13:17, Congxian Qiu wrote:
> > > > Hi
> > > >   如果没有异常,GC 情况也正常的话,或许可以看一下 pod 的相关日志,如果开启了 HA 也可以看一下 zk
> 的日志。之前遇到过一次在
> > > Yarn
> > > > 环境中类似的现象是由于其他原因导致的,通过看 NM 日志以及 zk 日志发现的原因。
> > > >
> > > > Best,
> > > > Congxian
> > > >
> > > >
> > > > SmileSmile  于2020年7月15日周三 下午5:20写道:
> > > >
> > > > > Hi Roc
> > > > >
> > > > > 该现象在1.10.1版本没有,在1.11版本才出现。请问这个该如何查比较合适
> > > > >
> > > > >
> > > > >
> > > > > | |
> > > > > a511955993
> > > > > |
> > > > > |
> > > > > 邮箱:a511955...@163.com
> > > > > |
> > > > >
> > > > > 签名由 网易邮箱大师 定制
> > > > >
> > > > > On 07/15/2020 17:16, Roc Marshal wrote:
> > > > > Hi,SmileSmile.
> > > > > 个人之前有遇到过 类似 的host解析问题,可以从k8s的pod节点网络映射角度排查一下。
> > > > > 希望这对你有帮助。
> > > > >
> > > > >
> > > > > 祝好。
> > > > > Roc Marshal
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > 在 2020-07-15 17:04:18,"SmileSmile"  写道:
> > > > > >
> > > > > >Hi
> > > > > >
> > > > > >使用版本Flink 1.11,部署方式 kubernetes session。 TM个数30个,每个TM 4个slot。 job
> > > > > 并行度120.提交作业的时候出现大量的No hostname could be resolved for the IP
> > address,JM
> > > > time
> > > > > out,作业提交失败。web ui也会卡主无响应。
> > > > > >
> > > > > >用wordCount,并行度只有1提交也会刷,no hostname的日志会刷个几条,然后正常提交,如果并行度一上去,就会超时。
> > > > > >
> > > > > >
> > > > > >部分日志如下:
> > > > > >
> > > > > >2020-07-15 16:58:46,460 WARN
> > > > > org.apache.flink.runtime.taskmanager.TaskManagerLocation [] -
> No
> > > > > hostname could be resolved for the IP address 10.32.160.7, using
> IP
> > > > address
> > > > > as host name. Local input split assignment (such as for HDFS
> files)
> > may
> > > > be
> > > > > impacted.
> > > > > >2020-07-15 16:58:46,460 WARN
> > > > > org.apache.flink.runtime.taskmana

Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-22 文章 Congxian Qiu
Hi Peihui
   不确定是什么原因我这边暂时没看到附件,我再私聊你要一下具体的 log 然后看看

Best,
Congxian


Peihui He  于2020年7月23日周四 上午8:57写道:

> Hi Congxian,
>
> 这个问题有结论没呢?
>
> Best wishes.
>
> Peihui He  于2020年7月17日周五 下午4:21写道:
>
>> Hi Congxian,
>>
>> [image: Snipaste_2020-07-17_16-20-06.png]
>>
>> 我这边通过chrome 浏览器看到是上传了的,并且可以下载的。
>>
>> Best wishes.
>>
>> Congxian Qiu  于2020年7月17日周五 下午1:31写道:
>>
>>> Hi  Peihui
>>>
>>> 感谢你的回复,我这边没有看到附件,你那边能否确认下呢?
>>>
>>> Best,
>>> Congxian
>>>
>>>
>>> Peihui He  于2020年7月17日周五 上午10:13写道:
>>>
>>> > Hi Congxian
>>> >
>>> > 见附件。
>>> >
>>> > Best wishes.
>>> >
>>> > Congxian Qiu  于2020年7月16日周四 下午8:24写道:
>>> >
>>> >> Hi Peihui
>>> >>
>>> >> 感谢你的回信。能否帮忙用 1.10.0 复现一次,然后把相关的日志(JM log 和 TM Log,方便的话,也开启一下 debug
>>> >> 日志)分享一下呢?如果日志太大的话,可以尝试贴待 gist[1] 然后邮件列表回复一个地址即可,
>>> >> 非常感谢~
>>> >>
>>> >> [1] https://gist.github.com/
>>> >>
>>> >> Best,
>>> >> Congxian
>>> >>
>>> >>
>>> >> Peihui He  于2020年7月16日周四 下午5:54写道:
>>> >>
>>> >> > Hi Yun,
>>> >> >
>>> >> > 我这边测试需要在集群上跑的,本地idea跑是没有问题的。
>>> >> > flink 1.10.1 的flink-conf.yaml 是cope flink 1.10.0 的,但是1.10.0 就是报错。
>>> >> >
>>> >> > 附件就是源码job。如果你要的跑需要改下socket host的。只要socket 中输入hepeihui 就会抛异常的。
>>> >> >
>>> >> > Peihui He  于2020年7月16日周四 下午5:26写道:
>>> >> >
>>> >> >> Hi Yun,
>>> >> >>
>>> >> >> 作业没有开启local recovery, 我这边测试1.10.0是必现的。
>>> >> >>
>>> >> >> Best wishes.
>>> >> >>
>>> >> >> Yun Tang  于2020年7月16日周四 下午5:04写道:
>>> >> >>
>>> >> >>> Hi Peihui
>>> >> >>>
>>> >> >>> Flink-1.10.1
>>> >> >>>
>>> 里面涉及到相关代码的改动就是更改了restore时path的类[1],但是你们的操作系统并不是windows,按道理应该是没有关系的。
>>> >> >>> 另外,这个问题在你遇到failover时候是必现的么?从文件路径看,作业也没有开启local recovery是吧?
>>> >> >>>
>>> >> >>>
>>> >> >>> [1]
>>> >> >>>
>>> >>
>>> https://github.com/apache/flink/commit/399329275e5e2baca9ed9494cce97ff732ac077a
>>> >> >>> 祝好
>>> >> >>> 唐云
>>> >> >>> 
>>> >> >>> From: Peihui He 
>>> >> >>> Sent: Thursday, July 16, 2020 16:15
>>> >> >>> To: user-zh@flink.apache.org 
>>> >> >>> Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>>> >> >>>
>>> >> >>> Hi Yun,
>>> >> >>>
>>> >> >>> 不好意思这么久回复,是@Congxian 描述的第2种情况。异常就是我通过socket
>>> >> >>> 输入的特定的word抛出runtimeexception 使task
>>> >> >>> 失败,然后job会尝试从checkpoint中恢复,但是恢复的过程中就报
>>> >> >>>
>>> >> >>> Caused by: java.nio.file.NoSuchFileException:
>>> >> >>>
>>> >> >>>
>>> >>
>>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
>>> >> >>> ->
>>> >> >>>
>>> >>
>>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
>>> >> >>>
>>> >> >>> 情况和@chenxyz 类似。
>>> >> >>>
>>> >> >>>
>>> >>
>>> http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html
>>> >> >>>
>>> >> >>> 换成1.10.1 就可以了
>>> >> >>>
>>> >> >>> Best wishes.
>>> >> >>>
>>> >> >>> Yun Tang  于2020年7月15日周三 下午4:35写道:
>>> >> >>>
>>> >> >>> > Hi Robin
>>> >> >>> >
>>> >> >>> > 其实你的说法不是很准确,社区是明文保证savepoint的兼容性
>>> >> >>> >
>>> >> >>>
>>> >>
>>> [1],但是并不意味着跨大版本时无法从checkpoint恢复,社区不承诺主要还是维护其太耗费精力,但是实际从代码角度来说,在合理使用state
>>> >> >>> > schema evolution [2]的前提下,目前跨版本checkpoint恢复基本都是兼容的.
>>> >> >>> >
>>> >> >>> > 另外 @Peihui 也请麻烦对你的异常描述清晰一些,我的第一次回复已经推测该异常不是root
>>> >> >>> cause,还请在日志中找一下无法恢复的root
>>> >> >>> > cause,如果不知道怎么从日志里面找,可以把相关日志分享出来。
>>> >> >>> >
>>> >> >>> >
>>> >> >>> > [1]
>>> >> >>> >
>>> >> >>>
>>> >>
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table
>>> >> >>> > [2]
>>> >> >>> >
>>> >> >>>
>>> >>
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html
>>> >> >>> >
>>> >> >>> > 祝好
>>> >> >>> > 唐云
>>> >> >>> >
>>> >> >>> >
>>> >> >>> > 
>>> >> >>> > From: Robin Zhang 
>>> >> >>> > Sent: Wednesday, July 15, 2020 16:23
>>> >> >>> > To: user-zh@flink.apache.org 
>>> >> >>> > Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>>> >> >>> >
>>> >> >>> > 据我所知,跨大版本的不能直接从checkoint恢复,只能放弃状态重新跑
>>> >> >>> >
>>> >> >>> > Best
>>> >> >>> > Robin Zhang
>>> >> >>> > 
>>> >> >>> > From: Peihui He <[hidden email]>
>>> >> >>> > Sent: Tuesday, July 14, 2020 10:42
>>> >> >>> > To: [hidden email] <[hidden email]>
>>> >> >>> > Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>>> >> >>> >
>>> >> >>> > hello,
>>> >> >>> >
>>> >> >>> > 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示
>>> >> >>> >
>>> >> >>> >
>>> >> >>> > Caused by: java.nio.file.NoSuchFileException:
>>> >> >>> >
>>> >> >>> >
>>> >> >>>
>>> >>
>>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-

Re: [ANNOUNCE] Apache Flink 1.11.1 released

2020-07-22 文章 Congxian Qiu
Thanks Dian for the great work and thanks to everyone who makes this
release possible!

Best,
Congxian


Rui Li  于2020年7月23日周四 上午10:48写道:

> Thanks Dian for the great work!
>
> On Thu, Jul 23, 2020 at 10:22 AM Jingsong Li 
> wrote:
>
> > Thanks for being the release manager for the 1.11.1 release, Dian.
> >
> > Best,
> > Jingsong
> >
> > On Thu, Jul 23, 2020 at 10:12 AM Zhijiang 
> > wrote:
> >
> >> Thanks for being the release manager and the efficient work, Dian!
> >>
> >> Best,
> >> Zhijiang
> >>
> >> --
> >> From:Konstantin Knauf 
> >> Send Time:2020年7月22日(星期三) 19:55
> >> To:Till Rohrmann 
> >> Cc:dev ; Yangze Guo ; Dian
> Fu <
> >> dia...@apache.org>; user ; user-zh <
> >> user-zh@flink.apache.org>
> >> Subject:Re: [ANNOUNCE] Apache Flink 1.11.1 released
> >>
> >> Thank you for managing the quick follow up release. I think this was
> very
> >> important for Table & SQL users.
> >>
> >> On Wed, Jul 22, 2020 at 1:45 PM Till Rohrmann 
> >> wrote:
> >> Thanks for being the release manager for the 1.11.1 release, Dian.
> Thanks
> >> a lot to everyone who contributed to this release.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Wed, Jul 22, 2020 at 11:38 AM Hequn Cheng  wrote:
> >> Thanks Dian for the great work and thanks to everyone who makes this
> >> release possible!
> >>
> >> Best, Hequn
> >>
> >> On Wed, Jul 22, 2020 at 4:40 PM Jark Wu  wrote:
> >>
> >> > Congratulations! Thanks Dian for the great work and to be the release
> >> > manager!
> >> >
> >> > Best,
> >> > Jark
> >> >
> >> > On Wed, 22 Jul 2020 at 15:45, Yangze Guo  wrote:
> >> >
> >> > > Congrats!
> >> > >
> >> > > Thanks Dian Fu for being release manager, and everyone involved!
> >> > >
> >> > > Best,
> >> > > Yangze Guo
> >> > >
> >> > > On Wed, Jul 22, 2020 at 3:14 PM Wei Zhong 
> >> > wrote:
> >> > > >
> >> > > > Congratulations! Thanks Dian for the great work!
> >> > > >
> >> > > > Best,
> >> > > > Wei
> >> > > >
> >> > > > > 在 2020年7月22日,15:09,Leonard Xu  写道:
> >> > > > >
> >> > > > > Congratulations!
> >> > > > >
> >> > > > > Thanks Dian Fu for the great work as release manager, and thanks
> >> > > everyone involved!
> >> > > > >
> >> > > > > Best
> >> > > > > Leonard Xu
> >> > > > >
> >> > > > >> 在 2020年7月22日,14:52,Dian Fu  写道:
> >> > > > >>
> >> > > > >> The Apache Flink community is very happy to announce the
> release
> >> of
> >> > > Apache Flink 1.11.1, which is the first bugfix release for the
> Apache
> >> > Flink
> >> > > 1.11 series.
> >> > > > >>
> >> > > > >> Apache Flink® is an open-source stream processing framework for
> >> > > distributed, high-performing, always-available, and accurate data
> >> > streaming
> >> > > applications.
> >> > > > >>
> >> > > > >> The release is available for download at:
> >> > > > >> https://flink.apache.org/downloads.html
> >> > > > >>
> >> > > > >> Please check out the release blog post for an overview of the
> >> > > improvements for this bugfix release:
> >> > > > >> https://flink.apache.org/news/2020/07/21/release-1.11.1.html
> >> > > > >>
> >> > > > >> The full release notes are available in Jira:
> >> > > > >>
> >> > >
> >> >
> >>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348323
> >> > > > >>
> >> > > > >> We would like to thank all contributors of the Apache Flink
> >> > community
> >> > > who made this release possible!
> >> > > > >>
> >> > > > >> Regards,
> >> > > > >> Dian
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >>
> >> --
> >>
> >> Konstantin Knauf
> >>
> >> https://twitter.com/snntrable
> >>
> >> https://github.com/knaufk
> >>
> >>
> >>
> >
> > --
> > Best, Jingsong Lee
> >
>
>
> --
> Best regards!
> Rui Li
>


Re: flink1.11 tablefunction

2020-07-22 文章 Benchao Li
现在有一个work around,就是你可以用子查询先把row展开,比如:
select ...
from (
  select data.rule_results as rule_results, ...
) cross join unnest(rule_results) as t(...)

Benchao Li  于2020年7月23日周四 下午12:44写道:

> 我感觉这可能是calcite的bug,CC Danny老师
>
> Dream-底限  于2020年7月22日周三 下午5:46写道:
>
>> hi 、Benchao Li
>> 我尝试了将数组打散的方式,但是报了一个莫名其妙的错误,可以帮忙看看嘛
>>
>> tableEnv.executeSql("CREATE TABLE parser_data_test (\n" +
>> "  data ROW> STRING,path STRING,country_id INT,create_time BIGINT," +
>> "spent_time DECIMAL(10,2),features
>> ROW<`user_ic_no_aku_uid.pdl_cdpd`
>> STRING,`user_ic_no_aku_uid.pdl_current_unpay` INT," +
>> "`user_ic_no_aku_uid.current_overdue_collection`
>> INT>,rule_results ARRAY> "rule_type_name STRING,`result` INT,in_path BOOLEAN>>>,\n" +
>> "  createTime BIGINT,\n" +
>> "  tindex INT\n" +
>> ") WITH (\n" +
>> " 'connector' = 'kafka-0.11',\n" +
>> " 'topic' = 'parser_data_test',\n" +
>> " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
>> " 'properties.group.id' = 'testGroup',\n" +
>> " 'scan.startup.mode' = 'earliest-offset',\n" +
>> " 'format' = 'json',\n" +
>> " 'json.fail-on-missing-field' = 'false',\n" +
>> " 'json.ignore-parse-errors' = 'true'\n" +
>> ")");
>>
>> Table table = tableEnv.sqlQuery("select
>>
>> data.flow_task_id,data.features.`user_ic_no_aku_uid.pdl_current_unpay`,rule_id,tindex
>> from parser_data_test CROSS JOIN UNNEST(data.rule_results) AS t
>> (rule_id,rule_name,rule_type_name,`result`,in_path)");
>>
>> table.printSchema();
>> tableEnv.toAppendStream(table,
>>
>> Types.ROW(TypeConversions.fromDataTypeToLegacyInfo(table.getSchema().getFieldDataTypes(.print();
>>
>>
>> 异常信息:
>>
>> rg.apache.flink.table.api.ValidationException: SQL validation failed.
>> From line 0, column 0 to line 1, column 139: Column 'data.data' not
>> found in table 'parser_data_test'
>>
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
>> at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
>> at
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
>> at
>> com.akulaku.data.flink.ParserDataTest.parserDataTest(ParserDataTest.java:63)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>> at
>> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>> at
>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>> at
>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>> at
>> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>> at
>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>> at
>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>> at
>> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>> at
>> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>> at
>> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>> at
>> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>> at
>> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>> at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
>> Caused by: org.apache.calcite.runtime.CalciteContextException: From
>> line 0, column 0 to line 1, column 139: Column 'data.data' not found
>> in table 'parser_data_test'
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>> Method)
>> at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(Native

Re: flink row 类型

2020-07-22 文章 Benchao Li
这个应该是做不到的。name只是SQL plan过程的东西,在运行时它就没有什么实际意义了。
你是想在udf里面获取row里面每个字段的名字是吧?如果是的话,我理解现在应该是做不到的。

Dream-底限  于2020年7月22日周三 下午7:22写道:

> hi、
> 我这面定义row数据,类型为ROW,可以通过
> row.getField(i)获取到对应的值,但是我想获取对应的row_name名称要怎么操作,貌似没有报漏获取名称的接口
>
> rule_key  转换为rule_key1,rulekey2
> 1
> 2
>


-- 

Best,
Benchao Li


Re: flink 1.11 使用sql写入hdfs无法自动提交分区

2020-07-22 文章 Jun Zhang
hi,jinsong
我们生产环境hdfs是cdh 2.6的,我换了一个hadoop 3 版本的hdfs,还真没问题了,不知道是哪里出问题了。

Jingsong Li  于2020年7月23日周四 上午11:45写道:

> 相同操作我也没有复现。。是可以成功执行的
>
> 你的HDFS是什么版本?是否可以考虑换个来测试下
>
> On Thu, Jul 23, 2020 at 11:34 AM Jun Zhang 
> wrote:
>
>> hi,jinsong:
>>
>> 这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。
>>
>> Jun Zhang  于2020年7月23日周四 上午11:15写道:
>>
>>> hi,夏帅:
>>>
>>> 抱歉,这几天没搞这个,我这个问题是文件是正常写入hdfs了,但是没有自动提交,也没有错误日志,就是如果写入的是文件系统,没有SUCCESS文件,写入hive的话,没有自动更新分区。
>>>
>>> 你测试没有问题的情况并行度是 1 吗?写入hdfs?
>>>
>>> 夏帅  于2020年7月10日周五 下午5:39写道:
>>>
 你好,
 我这边同样的代码,并没有出现类似的问题
 是本地跑么,可以提供下日志信息么?


>
> --
> Best, Jingsong Lee
>


Re: flink1.11 tablefunction

2020-07-22 文章 Benchao Li
我感觉这可能是calcite的bug,CC Danny老师

Dream-底限  于2020年7月22日周三 下午5:46写道:

> hi 、Benchao Li
> 我尝试了将数组打散的方式,但是报了一个莫名其妙的错误,可以帮忙看看嘛
>
> tableEnv.executeSql("CREATE TABLE parser_data_test (\n" +
> "  data ROW STRING,path STRING,country_id INT,create_time BIGINT," +
> "spent_time DECIMAL(10,2),features
> ROW<`user_ic_no_aku_uid.pdl_cdpd`
> STRING,`user_ic_no_aku_uid.pdl_current_unpay` INT," +
> "`user_ic_no_aku_uid.current_overdue_collection`
> INT>,rule_results ARRAY "rule_type_name STRING,`result` INT,in_path BOOLEAN>>>,\n" +
> "  createTime BIGINT,\n" +
> "  tindex INT\n" +
> ") WITH (\n" +
> " 'connector' = 'kafka-0.11',\n" +
> " 'topic' = 'parser_data_test',\n" +
> " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
> " 'properties.group.id' = 'testGroup',\n" +
> " 'scan.startup.mode' = 'earliest-offset',\n" +
> " 'format' = 'json',\n" +
> " 'json.fail-on-missing-field' = 'false',\n" +
> " 'json.ignore-parse-errors' = 'true'\n" +
> ")");
>
> Table table = tableEnv.sqlQuery("select
>
> data.flow_task_id,data.features.`user_ic_no_aku_uid.pdl_current_unpay`,rule_id,tindex
> from parser_data_test CROSS JOIN UNNEST(data.rule_results) AS t
> (rule_id,rule_name,rule_type_name,`result`,in_path)");
>
> table.printSchema();
> tableEnv.toAppendStream(table,
>
> Types.ROW(TypeConversions.fromDataTypeToLegacyInfo(table.getSchema().getFieldDataTypes(.print();
>
>
> 异常信息:
>
> rg.apache.flink.table.api.ValidationException: SQL validation failed.
> From line 0, column 0 to line 1, column 139: Column 'data.data' not
> found in table 'parser_data_test'
>
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
> at
> com.akulaku.data.flink.ParserDataTest.parserDataTest(ParserDataTest.java:63)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> at
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
> at
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
> at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From
> line 0, column 0 to line 1, column 139: Column 'data.data' not found
> in table 'parser_data_test'
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.

Re: flink 1.11 使用sql写入hdfs无法自动提交分区

2020-07-22 文章 Jingsong Li
相同操作我也没有复现。。是可以成功执行的

你的HDFS是什么版本?是否可以考虑换个来测试下

On Thu, Jul 23, 2020 at 11:34 AM Jun Zhang 
wrote:

> hi,jinsong:
>
> 这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。
>
> Jun Zhang  于2020年7月23日周四 上午11:15写道:
>
>> hi,夏帅:
>>
>> 抱歉,这几天没搞这个,我这个问题是文件是正常写入hdfs了,但是没有自动提交,也没有错误日志,就是如果写入的是文件系统,没有SUCCESS文件,写入hive的话,没有自动更新分区。
>>
>> 你测试没有问题的情况并行度是 1 吗?写入hdfs?
>>
>> 夏帅  于2020年7月10日周五 下午5:39写道:
>>
>>> 你好,
>>> 我这边同样的代码,并没有出现类似的问题
>>> 是本地跑么,可以提供下日志信息么?
>>>
>>>

-- 
Best, Jingsong Lee


Re: flink 1.11 使用sql写入hdfs无法自动提交分区

2020-07-22 文章 Jun Zhang
hi,jinsong:
这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。

Jun Zhang  于2020年7月23日周四 上午11:34写道:

> hi,jinsong:
>
> 这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。
>
> Jun Zhang  于2020年7月23日周四 上午11:15写道:
>
>> hi,夏帅:
>>
>> 抱歉,这几天没搞这个,我这个问题是文件是正常写入hdfs了,但是没有自动提交,也没有错误日志,就是如果写入的是文件系统,没有SUCCESS文件,写入hive的话,没有自动更新分区。
>>
>> 你测试没有问题的情况并行度是 1 吗?写入hdfs?
>>
>> 夏帅  于2020年7月10日周五 下午5:39写道:
>>
>>> 你好,
>>> 我这边同样的代码,并没有出现类似的问题
>>> 是本地跑么,可以提供下日志信息么?
>>>
>>>


Re: flink 1.11 使用sql写入hdfs无法自动提交分区

2020-07-22 文章 Jun Zhang
hi,jinsong:
这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。

Jun Zhang  于2020年7月23日周四 上午11:15写道:

> hi,夏帅:
>
> 抱歉,这几天没搞这个,我这个问题是文件是正常写入hdfs了,但是没有自动提交,也没有错误日志,就是如果写入的是文件系统,没有SUCCESS文件,写入hive的话,没有自动更新分区。
>
> 你测试没有问题的情况并行度是 1 吗?写入hdfs?
>
> 夏帅  于2020年7月10日周五 下午5:39写道:
>
>> 你好,
>> 我这边同样的代码,并没有出现类似的问题
>> 是本地跑么,可以提供下日志信息么?
>>
>>


Re: Flink 1.11 submit job timed out

2020-07-22 文章 SmileSmile

Hi Yang Wang

刚刚在测试环境测试了一下,taskManager没有办法nslookup出来,JM可以nslookup,这两者的差别在于是否有service。

解决方案:我这边给集群加上了taskmanager-query-state-service.yaml(按照官网上是可选服务)。就不会刷No hostname 
could be resolved for ip address,将NodePort改为ClusterIp,作业就可以成功提交,不会出现time 
out的问题了,问题得到了解决。


1. 如果按照上面的情况,那么这个配置文件是必须配置的?

2. 在1.11的更新中,发现有 [Flink-15911][Flink-15154] 
支持分别配置用于本地监听绑定的网络接口和外部访问的地址和端口。是否是这块的改动,
需要JM去通过TM上报的ip反向解析出service?


Bset!


[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html


| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

On 07/23/2020 10:11, Yang Wang wrote:
我的意思就是你在Flink任务运行的过程中,然后下面的命令在集群里面起一个busybox的pod,
在里面执行 nslookup {ip_address},看看是否能够正常解析到。如果不能应该就是coredns的
问题了

kubectl run -i -t busybox --image=busybox --restart=Never

你需要确认下集群的coredns pod是否正常,一般是部署在kube-system这个namespace下的



Best,
Yang


SmileSmile  于2020年7月22日周三 下午7:57写道:

>
> Hi,Yang Wang!
>
> 很开心可以收到你的回复,你的回复帮助很大,让我知道了问题的方向。我再补充些信息,希望可以帮我进一步判断一下问题根源。
>
> 在JM报错的地方,No hostname could be resolved for ip address x
> ,报出来的ip是k8s分配给flink pod的内网ip,不是宿主机的ip。请问这个问题可能出在哪里呢
>
> Best!
>
>
> a511955993
> 邮箱:a511955...@163.com
>
> ;
>
> 签名由 网易邮箱大师 ; 定制
>
> On 07/22/2020 18:18, Yang Wang  wrote:
> 如果你的日志里面一直在刷No hostname could be resolved for the IP address,应该是集群的coredns
> 有问题,由ip地址反查hostname查不到。你可以起一个busybox验证一下是不是这个ip就解析不了,有
> 可能是coredns有问题
>
>
> Best,
> Yang
>
> Congxian Qiu  于2020年7月21日周二 下午7:29写道:
>
> > Hi
> >不确定 k8s 环境中能否看到 pod 的完整日志?类似 Yarn 的 NM 日志一样,如果有的话,可以尝试看一下这个 pod
> > 的完整日志有没有什么发现
> > Best,
> > Congxian
> >
> >
> > SmileSmile  于2020年7月21日周二 下午3:19写道:
> >
> > > Hi,Congxian
> > >
> > > 因为是测试环境,没有配置HA,目前看到的信息,就是JM刷出来大量的no hostname could be
> > > resolved,jm失联,作业提交失败。
> > > 将jm内存配置为10g也是一样的情况(jobmanager.memory.pprocesa.size:10240m)。
> > >
> > > 在同一个环境将版本回退到1.10没有出现该问题,也不会刷如上报错。
> > >
> > >
> > > 是否有其他排查思路?
> > >
> > > Best!
> > >
> > >
> > >
> > >
> > > | |
> > > a511955993
> > > |
> > > |
> > > 邮箱:a511955...@163.com
> > > |
> > >
> > > 签名由 网易邮箱大师 定制
> > >
> > > On 07/16/2020 13:17, Congxian Qiu wrote:
> > > Hi
> > >   如果没有异常,GC 情况也正常的话,或许可以看一下 pod 的相关日志,如果开启了 HA 也可以看一下 zk 的日志。之前遇到过一次在
> > Yarn
> > > 环境中类似的现象是由于其他原因导致的,通过看 NM 日志以及 zk 日志发现的原因。
> > >
> > > Best,
> > > Congxian
> > >
> > >
> > > SmileSmile  于2020年7月15日周三 下午5:20写道:
> > >
> > > > Hi Roc
> > > >
> > > > 该现象在1.10.1版本没有,在1.11版本才出现。请问这个该如何查比较合适
> > > >
> > > >
> > > >
> > > > | |
> > > > a511955993
> > > > |
> > > > |
> > > > 邮箱:a511955...@163.com
> > > > |
> > > >
> > > > 签名由 网易邮箱大师 定制
> > > >
> > > > On 07/15/2020 17:16, Roc Marshal wrote:
> > > > Hi,SmileSmile.
> > > > 个人之前有遇到过 类似 的host解析问题,可以从k8s的pod节点网络映射角度排查一下。
> > > > 希望这对你有帮助。
> > > >
> > > >
> > > > 祝好。
> > > > Roc Marshal
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > 在 2020-07-15 17:04:18,"SmileSmile"  写道:
> > > > >
> > > > >Hi
> > > > >
> > > > >使用版本Flink 1.11,部署方式 kubernetes session。 TM个数30个,每个TM 4个slot。 job
> > > > 并行度120.提交作业的时候出现大量的No hostname could be resolved for the IP
> address,JM
> > > time
> > > > out,作业提交失败。web ui也会卡主无响应。
> > > > >
> > > > >用wordCount,并行度只有1提交也会刷,no hostname的日志会刷个几条,然后正常提交,如果并行度一上去,就会超时。
> > > > >
> > > > >
> > > > >部分日志如下:
> > > > >
> > > > >2020-07-15 16:58:46,460 WARN
> > > > org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No
> > > > hostname could be resolved for the IP address 10.32.160.7, using IP
> > > address
> > > > as host name. Local input split assignment (such as for HDFS files)
> may
> > > be
> > > > impacted.
> > > > >2020-07-15 16:58:46,460 WARN
> > > > org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No
> > > > hostname could be resolved for the IP address 10.44.224.7, using IP
> > > address
> > > > as host name. Local input split assignment (such as for HDFS files)
> may
> > > be
> > > > impacted.
> > > > >2020-07-15 16:58:46,461 WARN
> > > > org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No
> > > > hostname could be resolved for the IP address 10.40.32.9, using IP
> > > address
> > > > as host name. Local input split assignment (such as for HDFS files)
> may
> > > be
> > > > impacted.
> > > > >
> > > > >2020-07-15 16:59:10,236 INFO
> > > > org.apache.flink.runtime.resourcemanager.StandaloneResourceManager
> [] -
> > > The
> > > > heartbeat of JobManager with id 69a0d460de46a9f41c770d963c0a
> timed
> > > out.
> > > > >2020-07-15 16:59:10,236 INFO
> > > > org.apache.flink.runtime.resourcemanager.StandaloneResourceManager
> [] -
> > > > Disconnect job manager 
> > > > @akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_2 for
> job
> > > > e1554c737e37ed79688a15c746b6e9ef from the resourc

Re: flink 1.11 使用sql写入hdfs无法自动提交分区

2020-07-22 文章 Jun Zhang
hi,夏帅:
抱歉,这几天没搞这个,我这个问题是文件是正常写入hdfs了,但是没有自动提交,也没有错误日志,就是如果写入的是文件系统,没有SUCCESS文件,写入hive的话,没有自动更新分区。

你测试没有问题的情况并行度是 1 吗?写入hdfs?

夏帅  于2020年7月10日周五 下午5:39写道:

> 你好,
> 我这边同样的代码,并没有出现类似的问题
> 是本地跑么,可以提供下日志信息么?
>
>


?????? flinksql1.11????????????????

2020-07-22 文章 ????
HI??





--  --
??: 
   ""   
 <1129656...@qq.com>;
: 2020??7??23??(??) 9:35
??: "user-zh"https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins
 
;

> ?? 2020??7??2309:06?? <1129656...@qq.com> ??
> 
> 
> HI??
> 

>>>    
>>> id    type   
>>> 2  err
>>> 1  err
>>> 
>>> 
> 1  err 20200723085754
> 2  err 20200723085755
> 3  err 20200723085756
> 4  err 20200723085757
> 
> 
2s??
>>>    
>>> id    type   
>>> 2  acc
>>> 1  acc
>>> 
> 
> 94 err 20200723084455
> 95 err 20200723084456
> 96 err 20200723084457
> 97 err 20200723084458
> 98 err 20200723084459
> 99 err 20200723084500
> 100err 20200723084501
> 
> 

> 
> 
> 
> 
> ??
> 
> 
>>> 
>>> 
> 
> 
> --  --
> ??: "Leonard Xu" mailto:xbjt...@gmail.com>>; ??
>> 
>> HI,
>> ??
>> 
>> 
>>> ?? 2020??7??2216:27?? <1129656...@qq.com 
>; ??
>>> 
>>> 
>>> HI
>>> 
HELP
>>> 
>>> 
>>> --  --
>>> ??: "" <1129656...@qq.com 
>;;
>>> : 2020??7??22??(??) 3:17
>>> ??: "user-zh"mailto:user-zh@flink.apache.org>>;;
>>> : : Re: flinksql1.11
>>> 
>>> ??
>>> 
1.11.0??TIDB??demo
>>> 
>>> ??kafka
>>>  topic = 'tp1'
>>>     for i  in  range(1,1) :
>>>     
stime=datetime.datetime.now().strftime('%Y%m%d%H%M%S')
>>>     msg = {}
>>>     msg['id']= i
>>>     msg['time1']= stime
>>>     msg['type']=1
>>>     print(msg)
>>>     send_msg(topic, msg)
>>>     time.sleep(1)
>>> 
>>> {'id': 1, 'time1': '20200722140624', 'type': 1}
>>> {'id': 2, 'time1': '20200722140625', 'type': 1}
>>> {'id': 3, 'time1': '20200722140626', 'type': 1}
>>> {'id': 4, 'time1': '20200722140627', 'type': 1}
>>> {'id': 5, 'time1': '20200722140628', 'type': 1}
>>> {'id': 6, 'time1': '20200722140629', 'type': 1}
>>> {'id': 7, 'time1': '20200722140631', 'type': 1}
>>> {'id': 8, 'time1': '20200722140632', 'type': 1}
>>> 
>>> 
>>> id    type
>>> 2  err
>>> 1  err
>>> 
>>> 
??
>>> 
>>> 
>>> from pyflink.datastream import StreamExecutionEnvironment, 
TimeCharacteristic
>>> from pyflink.table import StreamTableEnvironment, DataTypes, 
EnvironmentSettings,DataTypes, CsvTableSource, CsvTableSink
>>> from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime
>>> from pyflink.table.window import Tumble
>>> 
>>> 
>>> def from_kafka_to_kafka_demo():
>>> 
>>>     # use blink table planner
>>>     env = 
StreamExecutionEnvironment.get_execution_environment()
>>>     
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>>     env_settings = 
EnvironmentSettings.Builder().use_blink_planner().build()
>>>     st_env = 
StreamTableEnvironment.create(stream_execution_environment=env,environment_settings=env_settings)
>>> 
>>>     # register source and sink
>>>     register_rides_source(st_env)
>>>     register_rides_sink(st_env)
>>>     register_mysql_source(st_env)
>>>  
>>> 
>>>     st_env.sql_update("insert into flink_result 
select  cast(t1.id ; as int) as id,cast(t2.type as 
varchar),cast( t1.time1 as bigint) as rowtime from source1 t1 left join 
dim_mysql t2 on t1.type=cast(t2.id ; as varchar) ")
>>>     st_env.execute("2-from_kafka_to_kafka")
>>>    
>>> 
>>> 
>>> def register_rides_source(st_env):
>>>     source_ddl = \
>>>     """
>>>     create table source1(
>>>      id int,
>>>      time1 varchar ,
>>>      type string
>>>      ) with (
>>>     'connector' = 'kafka',
>>>     'topic' = 'tp1',
>>>     'scan.startup.mode' = 'latest-offset',
>>>     'properties.bootstrap.serv

Re: [ANNOUNCE] Apache Flink 1.11.1 released

2020-07-22 文章 Rui Li
Thanks Dian for the great work!

On Thu, Jul 23, 2020 at 10:22 AM Jingsong Li  wrote:

> Thanks for being the release manager for the 1.11.1 release, Dian.
>
> Best,
> Jingsong
>
> On Thu, Jul 23, 2020 at 10:12 AM Zhijiang 
> wrote:
>
>> Thanks for being the release manager and the efficient work, Dian!
>>
>> Best,
>> Zhijiang
>>
>> --
>> From:Konstantin Knauf 
>> Send Time:2020年7月22日(星期三) 19:55
>> To:Till Rohrmann 
>> Cc:dev ; Yangze Guo ; Dian Fu <
>> dia...@apache.org>; user ; user-zh <
>> user-zh@flink.apache.org>
>> Subject:Re: [ANNOUNCE] Apache Flink 1.11.1 released
>>
>> Thank you for managing the quick follow up release. I think this was very
>> important for Table & SQL users.
>>
>> On Wed, Jul 22, 2020 at 1:45 PM Till Rohrmann 
>> wrote:
>> Thanks for being the release manager for the 1.11.1 release, Dian. Thanks
>> a lot to everyone who contributed to this release.
>>
>> Cheers,
>> Till
>>
>> On Wed, Jul 22, 2020 at 11:38 AM Hequn Cheng  wrote:
>> Thanks Dian for the great work and thanks to everyone who makes this
>> release possible!
>>
>> Best, Hequn
>>
>> On Wed, Jul 22, 2020 at 4:40 PM Jark Wu  wrote:
>>
>> > Congratulations! Thanks Dian for the great work and to be the release
>> > manager!
>> >
>> > Best,
>> > Jark
>> >
>> > On Wed, 22 Jul 2020 at 15:45, Yangze Guo  wrote:
>> >
>> > > Congrats!
>> > >
>> > > Thanks Dian Fu for being release manager, and everyone involved!
>> > >
>> > > Best,
>> > > Yangze Guo
>> > >
>> > > On Wed, Jul 22, 2020 at 3:14 PM Wei Zhong 
>> > wrote:
>> > > >
>> > > > Congratulations! Thanks Dian for the great work!
>> > > >
>> > > > Best,
>> > > > Wei
>> > > >
>> > > > > 在 2020年7月22日,15:09,Leonard Xu  写道:
>> > > > >
>> > > > > Congratulations!
>> > > > >
>> > > > > Thanks Dian Fu for the great work as release manager, and thanks
>> > > everyone involved!
>> > > > >
>> > > > > Best
>> > > > > Leonard Xu
>> > > > >
>> > > > >> 在 2020年7月22日,14:52,Dian Fu  写道:
>> > > > >>
>> > > > >> The Apache Flink community is very happy to announce the release
>> of
>> > > Apache Flink 1.11.1, which is the first bugfix release for the Apache
>> > Flink
>> > > 1.11 series.
>> > > > >>
>> > > > >> Apache Flink® is an open-source stream processing framework for
>> > > distributed, high-performing, always-available, and accurate data
>> > streaming
>> > > applications.
>> > > > >>
>> > > > >> The release is available for download at:
>> > > > >> https://flink.apache.org/downloads.html
>> > > > >>
>> > > > >> Please check out the release blog post for an overview of the
>> > > improvements for this bugfix release:
>> > > > >> https://flink.apache.org/news/2020/07/21/release-1.11.1.html
>> > > > >>
>> > > > >> The full release notes are available in Jira:
>> > > > >>
>> > >
>> >
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348323
>> > > > >>
>> > > > >> We would like to thank all contributors of the Apache Flink
>> > community
>> > > who made this release possible!
>> > > > >>
>> > > > >> Regards,
>> > > > >> Dian
>> > > > >
>> > > >
>> > >
>> >
>>
>>
>> --
>>
>> Konstantin Knauf
>>
>> https://twitter.com/snntrable
>>
>> https://github.com/knaufk
>>
>>
>>
>
> --
> Best, Jingsong Lee
>


-- 
Best regards!
Rui Li


Re: flink1.11 sql

2020-07-22 文章 Rui Li
支持的,也是需要配合HiveCatalog一起使用,你在hive那边创建的函数在flink里就能调用了

On Wed, Jul 22, 2020 at 12:14 PM Dream-底限  wrote:

> hi
> flink支持配置hive方言,那么flink可以直接使用hive内自定义的udf、udtf函数吗
>


-- 
Best regards!
Rui Li


Re: flink 1.11 on kubernetes 构建失败

2020-07-22 文章 Yang Wang
这个地方是没有变化的,你可以看TaskManagerRunner的代码,一直是使用ip地址来向JM注册的

你需要确认coredns解析这个IP到底是否可以成功,另外我验证了一下,你说的detail subtasks taskmanagers xxx x 这行
显示的其实目前也是hostname,是解析ip之后得到的,例如我这边看到的是172-20-0-50,是因为我执行nslookup查询的结果是
kubectl run -i -t busybox --image=busybox --restart=Never
/ # nslookup 172.20.0.50
Server: 172.21.0.10
Address: 172.21.0.10:53

50.0.20.172.in-addr.arpa name =
172-20-0-50.flink-jobmanager.default.svc.cluster.local


你最好先确认下你这边K8s集群的变更以及coredns的问题吧

Best,
Yang

SmileSmile  于2020年7月22日周三 下午7:59写道:

> Hi Yang Wang!
>
> 你提到了Flink里面用的是InetAddress#getHostFromNameService来跟进IP地址获取FQDN的。
>
> 这个在1.10和1.11版本是否有发生变化?这段报错只在1.11才出现,1.10不存在。如果core dns有问题,应该两个版本都有有异常
>
> Best!
>
>
> a511955993
> 邮箱:a511955...@163.com
>
> 
>
> 签名由 网易邮箱大师  定制
>
> 在2020年07月22日 18:18,Yang Wang  写道:
> 抱歉回复晚了
>
> 我这边也验证了一下,在你所说的地方确实是ip:port,但是提交任务都是正常的
>
> 如果你的日志里面一直在刷No hostname could be resolved for the IP address,应该是集群的coredns
> 有问题,由ip地址反查hostname查不到。你可以起一个busybox验证一下是不是这个ip就解析不了,有
> 可能是coredns有问题
>
>
> Flink里面用的是InetAddress#getHostFromNameService来跟进IP地址获取FQDN的
>
>
> Best,
> Yang
>
> SmileSmile  于2020年7月10日周五 下午1:10写道:
>
> > hi Yang
> >
> > 在1.10版本,running的作业点击拓普图中随便一个operation,有detail subtasks taskmanagers xxx
> x
> > 这行,taskmanagers这栏里的host,显示的是 podname:端口
> >
> > 在1.11变成ip:端口
> >
> > 目前我这边遇到的情况是,构建了一个有120slot的集群,作业并行度是120。 提交到jm后jm就失联了,jm
> timeout。观察jm日志,疯狂在刷
> >
> >
> > No hostname could be resolved for the IP address 10.35.160.5, using IP
> > address as host name. Local input split assignment (such as for HDFS
> files)
> > may be impacted
> >
> >
> > 目前观察到的改变主要是这块podname和ip的区别,其他不确定
> >
> >
> > | |
> > a511955993
> > |
> > |
> > 邮箱:a511955...@163.com
> > |
> >
> > 签名由 网易邮箱大师 定制
> >
> > 在2020年07月10日 12:13,Yang Wang 写道:
> > 我记得1.11里面对host这个地方应该是没有改动,taskmanager.network.bind-policy的
> > 默认值一会都是ip。所以你说的UI上是podname,这个是哪里的?正常TM列表akka地址
> > 都是ip地址的
> >
> >
> > Best,
> > Yang
> >
> > SmileSmile  于2020年7月10日周五 上午10:42写道:
> >
> > > hi yang wang
> > >
> > > 1.11版本的on kubernetes在hostname上有做什么变化吗?
> > >
> > > 作业运行的时候 flink ui上 tm变成ip:端口
> > > ,在1.10版本,ui上是 podname:端口。
> > >
> > > 作业启动的时候,jm日志一直在刷
> > >
> > > No hostname could be resolved for the IP address 10.35.160.5, using IP
> > > address as host name. Local input split assignment (such as for HDFS
> > files)
> > > may be impacted
> > >
> > >
> > >
> > >
> > > | |
> > > a511955993
> > > |
> > > |
> > > 邮箱:a511955...@163.com
> > > |
> > >
> > > 签名由 网易邮箱大师 定制
> > >
> > > 在2020年07月09日 20:02,Yang Wang 写道:
> > > sed替换报错应该不是Pod启动失败的根本原因,因为目前的docker-entrypoint.sh做了修改
> > > 才会这样[1]
> > >
> > > 你这个报错看着是执行bash-java-utils.jar报的错,确认你用的是社区的yaml文件[2],我运行是没有问题的。
> > > 如果不是,需要你把你的yaml发出来
> > >
> > >
> > > [1].
> > >
> >
> https://github.com/apache/flink-docker/blob/dev-master/docker-entrypoint.sh
> > > [2].
> > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html
> > >
> > >
> > > Best,
> > > Yang
> > >
> > > SmileSmile  于2020年7月9日周四 下午1:40写道:
> > >
> > > > hi
> > > >
> > > > 按照新版本的部署文件[1],会部署失败.如果将部署文件改用1.10版本,只是修改镜像文件和log4j文件,可以成功构建[2]。
> > > >
> > > >
> > > > 目前看差别在于1.11启动jm和tm是通过args:
> > > >
> > >
> >
> ["jobmanager"]的方法,通过docker-entrypoint.sh[3]看到调用set_common_options方法的时候会sed
> > > > 本地挂载的flink-configuration-configmap.yaml导致失败。
> > > >
> > > >
> > > > 1.10 版本是通过$FLINK_HOME/bin/jobmanager.sh启动。
> > > >
> > > > command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
> > > >  while :;
> > > >  do
> > > >if [[ -f $(find log -name '*jobmanager*.log' -print
> -quit)
> > ]];
> > > >  then tail -f -n +1 log/*jobmanager*.log;
> > > >fi;
> > > >  done"]
> > > >
> > > >
> > > > 如果遇到该问题的,沿用1.10版本的部署方式部署1.11镜像可以成功。  1.11 版本的部署方式如果有大佬可以走通的,求分享。
> > > >
> > > >
> > > >
> > > > [1]
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#session-cluster-resource-definitions
> > > > [2]
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/kubernetes.html#session-cluster-resource-definitions
> > > > [3]
> > > >
> > >
> >
> https://github.com/apache/flink-docker/blob/master/1.11/scala_2.11-debian/docker-entrypoint.sh
> > > >
> > > >
> > > >
> > > > | |
> > > > a511955993
> > > > |
> > > > |
> > > > 邮箱:a511955...@163.com
> > > > |
> > > >
> > > > 签名由 网易邮箱大师 定制
> > > >
> > > > 在2020年07月08日 16:38,SmileSmile 写道:
> > > > hi yun tang!
> > > >
> > > > 没有对 /opt/flink/config 目录下的文件做写操作。 只是按照官网上的配置文件进行部署,镜像用的也是社区的镜像。
> > > > best!
> > > >
> > > >
> > > >
> > > >
> > > > | |
> > > > a511955993
> > > > |
> > > > |
> > > > 邮箱:a511955...@163.com
> > > > |
> > > >
> > > > 签名由 网易邮箱大师 定制
> > > >
> > > > 在2020年07月08日 16:29

Re: [ANNOUNCE] Apache Flink 1.11.1 released

2020-07-22 文章 Jingsong Li
Thanks for being the release manager for the 1.11.1 release, Dian.

Best,
Jingsong

On Thu, Jul 23, 2020 at 10:12 AM Zhijiang 
wrote:

> Thanks for being the release manager and the efficient work, Dian!
>
> Best,
> Zhijiang
>
> --
> From:Konstantin Knauf 
> Send Time:2020年7月22日(星期三) 19:55
> To:Till Rohrmann 
> Cc:dev ; Yangze Guo ; Dian Fu <
> dia...@apache.org>; user ; user-zh <
> user-zh@flink.apache.org>
> Subject:Re: [ANNOUNCE] Apache Flink 1.11.1 released
>
> Thank you for managing the quick follow up release. I think this was very
> important for Table & SQL users.
>
> On Wed, Jul 22, 2020 at 1:45 PM Till Rohrmann 
> wrote:
> Thanks for being the release manager for the 1.11.1 release, Dian. Thanks
> a lot to everyone who contributed to this release.
>
> Cheers,
> Till
>
> On Wed, Jul 22, 2020 at 11:38 AM Hequn Cheng  wrote:
> Thanks Dian for the great work and thanks to everyone who makes this
> release possible!
>
> Best, Hequn
>
> On Wed, Jul 22, 2020 at 4:40 PM Jark Wu  wrote:
>
> > Congratulations! Thanks Dian for the great work and to be the release
> > manager!
> >
> > Best,
> > Jark
> >
> > On Wed, 22 Jul 2020 at 15:45, Yangze Guo  wrote:
> >
> > > Congrats!
> > >
> > > Thanks Dian Fu for being release manager, and everyone involved!
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Wed, Jul 22, 2020 at 3:14 PM Wei Zhong 
> > wrote:
> > > >
> > > > Congratulations! Thanks Dian for the great work!
> > > >
> > > > Best,
> > > > Wei
> > > >
> > > > > 在 2020年7月22日,15:09,Leonard Xu  写道:
> > > > >
> > > > > Congratulations!
> > > > >
> > > > > Thanks Dian Fu for the great work as release manager, and thanks
> > > everyone involved!
> > > > >
> > > > > Best
> > > > > Leonard Xu
> > > > >
> > > > >> 在 2020年7月22日,14:52,Dian Fu  写道:
> > > > >>
> > > > >> The Apache Flink community is very happy to announce the release
> of
> > > Apache Flink 1.11.1, which is the first bugfix release for the Apache
> > Flink
> > > 1.11 series.
> > > > >>
> > > > >> Apache Flink® is an open-source stream processing framework for
> > > distributed, high-performing, always-available, and accurate data
> > streaming
> > > applications.
> > > > >>
> > > > >> The release is available for download at:
> > > > >> https://flink.apache.org/downloads.html
> > > > >>
> > > > >> Please check out the release blog post for an overview of the
> > > improvements for this bugfix release:
> > > > >> https://flink.apache.org/news/2020/07/21/release-1.11.1.html
> > > > >>
> > > > >> The full release notes are available in Jira:
> > > > >>
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348323
> > > > >>
> > > > >> We would like to thank all contributors of the Apache Flink
> > community
> > > who made this release possible!
> > > > >>
> > > > >> Regards,
> > > > >> Dian
> > > > >
> > > >
> > >
> >
>
>
> --
>
> Konstantin Knauf
>
> https://twitter.com/snntrable
>
> https://github.com/knaufk
>
>
>

-- 
Best, Jingsong Lee


Re: Flink 1.11 submit job timed out

2020-07-22 文章 Yang Wang
我的意思就是你在Flink任务运行的过程中,然后下面的命令在集群里面起一个busybox的pod,
在里面执行 nslookup {ip_address},看看是否能够正常解析到。如果不能应该就是coredns的
问题了

kubectl run -i -t busybox --image=busybox --restart=Never

你需要确认下集群的coredns pod是否正常,一般是部署在kube-system这个namespace下的



Best,
Yang


SmileSmile  于2020年7月22日周三 下午7:57写道:

>
> Hi,Yang Wang!
>
> 很开心可以收到你的回复,你的回复帮助很大,让我知道了问题的方向。我再补充些信息,希望可以帮我进一步判断一下问题根源。
>
> 在JM报错的地方,No hostname could be resolved for ip address x
> ,报出来的ip是k8s分配给flink pod的内网ip,不是宿主机的ip。请问这个问题可能出在哪里呢
>
> Best!
>
>
> a511955993
> 邮箱:a511955...@163.com
>
> 
>
> 签名由 网易邮箱大师  定制
>
> On 07/22/2020 18:18, Yang Wang  wrote:
> 如果你的日志里面一直在刷No hostname could be resolved for the IP address,应该是集群的coredns
> 有问题,由ip地址反查hostname查不到。你可以起一个busybox验证一下是不是这个ip就解析不了,有
> 可能是coredns有问题
>
>
> Best,
> Yang
>
> Congxian Qiu  于2020年7月21日周二 下午7:29写道:
>
> > Hi
> >不确定 k8s 环境中能否看到 pod 的完整日志?类似 Yarn 的 NM 日志一样,如果有的话,可以尝试看一下这个 pod
> > 的完整日志有没有什么发现
> > Best,
> > Congxian
> >
> >
> > SmileSmile  于2020年7月21日周二 下午3:19写道:
> >
> > > Hi,Congxian
> > >
> > > 因为是测试环境,没有配置HA,目前看到的信息,就是JM刷出来大量的no hostname could be
> > > resolved,jm失联,作业提交失败。
> > > 将jm内存配置为10g也是一样的情况(jobmanager.memory.pprocesa.size:10240m)。
> > >
> > > 在同一个环境将版本回退到1.10没有出现该问题,也不会刷如上报错。
> > >
> > >
> > > 是否有其他排查思路?
> > >
> > > Best!
> > >
> > >
> > >
> > >
> > > | |
> > > a511955993
> > > |
> > > |
> > > 邮箱:a511955...@163.com
> > > |
> > >
> > > 签名由 网易邮箱大师 定制
> > >
> > > On 07/16/2020 13:17, Congxian Qiu wrote:
> > > Hi
> > >   如果没有异常,GC 情况也正常的话,或许可以看一下 pod 的相关日志,如果开启了 HA 也可以看一下 zk 的日志。之前遇到过一次在
> > Yarn
> > > 环境中类似的现象是由于其他原因导致的,通过看 NM 日志以及 zk 日志发现的原因。
> > >
> > > Best,
> > > Congxian
> > >
> > >
> > > SmileSmile  于2020年7月15日周三 下午5:20写道:
> > >
> > > > Hi Roc
> > > >
> > > > 该现象在1.10.1版本没有,在1.11版本才出现。请问这个该如何查比较合适
> > > >
> > > >
> > > >
> > > > | |
> > > > a511955993
> > > > |
> > > > |
> > > > 邮箱:a511955...@163.com
> > > > |
> > > >
> > > > 签名由 网易邮箱大师 定制
> > > >
> > > > On 07/15/2020 17:16, Roc Marshal wrote:
> > > > Hi,SmileSmile.
> > > > 个人之前有遇到过 类似 的host解析问题,可以从k8s的pod节点网络映射角度排查一下。
> > > > 希望这对你有帮助。
> > > >
> > > >
> > > > 祝好。
> > > > Roc Marshal
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > 在 2020-07-15 17:04:18,"SmileSmile"  写道:
> > > > >
> > > > >Hi
> > > > >
> > > > >使用版本Flink 1.11,部署方式 kubernetes session。 TM个数30个,每个TM 4个slot。 job
> > > > 并行度120.提交作业的时候出现大量的No hostname could be resolved for the IP
> address,JM
> > > time
> > > > out,作业提交失败。web ui也会卡主无响应。
> > > > >
> > > > >用wordCount,并行度只有1提交也会刷,no hostname的日志会刷个几条,然后正常提交,如果并行度一上去,就会超时。
> > > > >
> > > > >
> > > > >部分日志如下:
> > > > >
> > > > >2020-07-15 16:58:46,460 WARN
> > > > org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No
> > > > hostname could be resolved for the IP address 10.32.160.7, using IP
> > > address
> > > > as host name. Local input split assignment (such as for HDFS files)
> may
> > > be
> > > > impacted.
> > > > >2020-07-15 16:58:46,460 WARN
> > > > org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No
> > > > hostname could be resolved for the IP address 10.44.224.7, using IP
> > > address
> > > > as host name. Local input split assignment (such as for HDFS files)
> may
> > > be
> > > > impacted.
> > > > >2020-07-15 16:58:46,461 WARN
> > > > org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No
> > > > hostname could be resolved for the IP address 10.40.32.9, using IP
> > > address
> > > > as host name. Local input split assignment (such as for HDFS files)
> may
> > > be
> > > > impacted.
> > > > >
> > > > >2020-07-15 16:59:10,236 INFO
> > > > org.apache.flink.runtime.resourcemanager.StandaloneResourceManager
> [] -
> > > The
> > > > heartbeat of JobManager with id 69a0d460de46a9f41c770d963c0a
> timed
> > > out.
> > > > >2020-07-15 16:59:10,236 INFO
> > > > org.apache.flink.runtime.resourcemanager.StandaloneResourceManager
> [] -
> > > > Disconnect job manager 
> > > > @akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_2 for
> job
> > > > e1554c737e37ed79688a15c746b6e9ef from the resource manager.
> > > > >
> > > > >
> > > > >how to deal with ?
> > > > >
> > > > >
> > > > >beset !
> > > > >
> > > > >| |
> > > > >a511955993
> > > > >|
> > > > >|
> > > > >邮箱:a511955...@163.com
> > > > >|
> > > > >
> > > > >签名由 网易邮箱大师 定制
> > > >
> > >
> >
>
>


Re: [ANNOUNCE] Apache Flink 1.11.1 released

2020-07-22 文章 Zhijiang
Thanks for being the release manager and the efficient work, Dian!

Best,
Zhijiang


--
From:Konstantin Knauf 
Send Time:2020年7月22日(星期三) 19:55
To:Till Rohrmann 
Cc:dev ; Yangze Guo ; Dian Fu 
; user ; user-zh 

Subject:Re: [ANNOUNCE] Apache Flink 1.11.1 released

Thank you for managing the quick follow up release. I think this was very 
important for Table & SQL users.
On Wed, Jul 22, 2020 at 1:45 PM Till Rohrmann  wrote:

Thanks for being the release manager for the 1.11.1 release, Dian. Thanks a lot 
to everyone who contributed to this release.

Cheers,
Till
On Wed, Jul 22, 2020 at 11:38 AM Hequn Cheng  wrote:
Thanks Dian for the great work and thanks to everyone who makes this
 release possible!

 Best, Hequn

 On Wed, Jul 22, 2020 at 4:40 PM Jark Wu  wrote:

 > Congratulations! Thanks Dian for the great work and to be the release
 > manager!
 >
 > Best,
 > Jark
 >
 > On Wed, 22 Jul 2020 at 15:45, Yangze Guo  wrote:
 >
 > > Congrats!
 > >
 > > Thanks Dian Fu for being release manager, and everyone involved!
 > >
 > > Best,
 > > Yangze Guo
 > >
 > > On Wed, Jul 22, 2020 at 3:14 PM Wei Zhong 
 > wrote:
 > > >
 > > > Congratulations! Thanks Dian for the great work!
 > > >
 > > > Best,
 > > > Wei
 > > >
 > > > > 在 2020年7月22日,15:09,Leonard Xu  写道:
 > > > >
 > > > > Congratulations!
 > > > >
 > > > > Thanks Dian Fu for the great work as release manager, and thanks
 > > everyone involved!
 > > > >
 > > > > Best
 > > > > Leonard Xu
 > > > >
 > > > >> 在 2020年7月22日,14:52,Dian Fu  写道:
 > > > >>
 > > > >> The Apache Flink community is very happy to announce the release of
 > > Apache Flink 1.11.1, which is the first bugfix release for the Apache
 > Flink
 > > 1.11 series.
 > > > >>
 > > > >> Apache Flink(r) is an open-source stream processing framework for
 > > distributed, high-performing, always-available, and accurate data
 > streaming
 > > applications.
 > > > >>
 > > > >> The release is available for download at:
 > > > >> https://flink.apache.org/downloads.html
 > > > >>
 > > > >> Please check out the release blog post for an overview of the
 > > improvements for this bugfix release:
 > > > >> https://flink.apache.org/news/2020/07/21/release-1.11.1.html
 > > > >>
 > > > >> The full release notes are available in Jira:
 > > > >>
 > >
 > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348323
 > > > >>
 > > > >> We would like to thank all contributors of the Apache Flink
 > community
 > > who made this release possible!
 > > > >>
 > > > >> Regards,
 > > > >> Dian
 > > > >
 > > >
 > >
 >


-- 
Konstantin Knauf 
https://twitter.com/snntrable
https://github.com/knaufk 



Re: flink 1.11 ddl sql 添加PROCTIME()列,读取csv错误

2020-07-22 文章 Leonard Xu
Hi, Asahi

这是一个已知bug[1],filesystem connector上处理计算列有点问题,已经有PR了,会在1.11.2和1.12版本上修复


Best
Leonard Xu
[1] https://issues.apache.org/jira/browse/FLINK-18665 


> 在 2020年7月23日,00:07,Asahi Lee <978466...@qq.com> 写道:
> 
> 1. 程序
> StreamExecutionEnvironment bsEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>         EnvironmentSettings bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>         StreamTableEnvironment bsTableEnv = 
> StreamTableEnvironment.create(bsEnv, bsSettings);
> 
> 
>         String sourceTableDDL = "CREATE TABLE fs_table (" 
> +
>                 "  user_id 
> STRING," +
>                 "  order_amount 
> DOUBLE," +
>                 "  dt 
> TIMESTAMP(3)," +
>                 "  pt AS 
> PROCTIME() " +
>                 " ) WITH (" +
>                 "  
> 'connector'='filesystem'," +
>                 "  
> 'path'='D:\\Program 
> Files\\JetBrains\\workspace\\table-walkthrough\\src\\main\\resources\\csv\\order.csv',"
>  +
>                 "  
> 'format'='csv'" +
>                 " )";
> 
> 
>         bsTableEnv.executeSql(sourceTableDDL);
>         bsTableEnv.executeSql("select * from 
> fs_table").print();
> 2. csv文件
> order.csv
> zhangsan,12.34,2020-08-03 12:23:50
> lisi,234.67,2020-08-03 12:25:50
> wangwu,57.6,2020-08-03 12:25:50
> zhaoliu,345,2020-08-03 12:28:50
> 
> 
> 
> 3. 错误
>  - Source: FileSystemTableSource(user_id, order_amount, dt, pt) -> 
> Calc(select=[user_id, order_amount, dt, PROCTIME_MATERIALIZE(()) AS pt]) 
> -> SinkConversionToRow (4/6) (9ee0383d676a190b0a62d206039db26c) switched 
> from RUNNING to FAILED.
> java.io.IOException: Failed to deserialize CSV row.
>   at 
> org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:299)
>   at 
> org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:210)
>   at 
> org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> Caused by: java.lang.RuntimeException: Row length mismatch. 4 fields expected 
> but was 3.
>   at 
> org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.validateArity(CsvRowDataDeserializationSchema.java:441)
>   at 
> org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.lambda$createRowConverter$1ca9c073$1(CsvRowDataDeserializationSchema.java:244)
>   at 
> org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:293)
>   ... 5 more



?????? flinksql1.11????????????????

2020-07-22 文章 ????
??




--  --
??: 
   "user-zh"

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins
 
;

> ?? 2020??7??2309:06?? <1129656...@qq.com> ??
> 
> 
> HI??
> 

>>>    
>>> id    type   
>>> 2  err
>>> 1  err
>>> 
>>> 
> 1  err 20200723085754
> 2  err 20200723085755
> 3  err 20200723085756
> 4  err 20200723085757
> 
> 
2s??
>>>    
>>> id    type   
>>> 2  acc
>>> 1  acc
>>> 
> 
> 94 err 20200723084455
> 95 err 20200723084456
> 96 err 20200723084457
> 97 err 20200723084458
> 98 err 20200723084459
> 99 err 20200723084500
> 100err 20200723084501
> 
> 

> 
> 
> 
> 
> ??
> 
> 
>>> 
>>> 
> 
> 
> --  --
> ??: "Leonard Xu" mailto:xbjt...@gmail.com>>; ??
>> 
>> HI,
>> ??
>> 
>> 
>>> ?? 2020??7??2216:27?? <1129656...@qq.com 
>; ??
>>> 
>>> 
>>> HI
>>> 
HELP
>>> 
>>> 
>>> --  --
>>> ??: "" <1129656...@qq.com 
>;;
>>> : 2020??7??22??(??) 3:17
>>> ??: "user-zh"mailto:user-zh@flink.apache.org>>;;
>>> : : Re: flinksql1.11
>>> 
>>> ??
>>> 
1.11.0??TIDB??demo
>>> 
>>> ??kafka
>>>  topic = 'tp1'
>>> for i  in  range(1,1) :
>>> 
stime=datetime.datetime.now().strftime('%Y%m%d%H%M%S')
>>> msg = {}
>>> msg['id']= i
>>> msg['time1']= stime
>>> msg['type']=1
>>> print(msg)
>>> send_msg(topic, 
msg)
>>> time.sleep(1)
>>> 
>>> {'id': 1, 'time1': '20200722140624', 'type': 1}
>>> {'id': 2, 'time1': '20200722140625', 'type': 1}
>>> {'id': 3, 'time1': '20200722140626', 'type': 1}
>>> {'id': 4, 'time1': '20200722140627', 'type': 1}
>>> {'id': 5, 'time1': '20200722140628', 'type': 1}
>>> {'id': 6, 'time1': '20200722140629', 'type': 1}
>>> {'id': 7, 'time1': '20200722140631', 'type': 1}
>>> {'id': 8, 'time1': '20200722140632', 'type': 1}
>>> 
>>> 
>>> id    type
>>> 2  err
>>> 1  err
>>> 
>>> 
??
>>> 
>>> 
>>> from pyflink.datastream import StreamExecutionEnvironment, 
TimeCharacteristic
>>> from pyflink.table import StreamTableEnvironment, DataTypes, 
EnvironmentSettings,DataTypes, CsvTableSource, CsvTableSink
>>> from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime
>>> from pyflink.table.window import Tumble
>>> 
>>> 
>>> def from_kafka_to_kafka_demo():
>>> 
>>> # use blink table planner
>>> env = 
StreamExecutionEnvironment.get_execution_environment()
>>> 
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>> env_settings = 
EnvironmentSettings.Builder().use_blink_planner().build()
>>> st_env = 
StreamTableEnvironment.create(stream_execution_environment=env,environment_settings=env_settings)
>>> 
>>> # register source and sink
>>> register_rides_source(st_env)
>>> register_rides_sink(st_env)
>>> register_mysql_source(st_env)
>>>  
>>> 
>>> st_env.sql_update("insert into 
flink_result select  cast(t1.id ; as int) as 
id,cast(t2.type as varchar),cast( t1.time1 as bigint) as rowtime from source1 
t1 left join dim_mysql t2 on t1.type=cast(t2.id ; as varchar) 
")
>>> st_env.execute("2-from_kafka_to_kafka")
>>>    
>>> 
>>> 
>>> def register_rides_source(st_env):
>>> source_ddl = \
>>> """
>>> create table source1(
>>>  id int,
>>>  time1 varchar ,
>>>  type string
>>>  ) with (
>>> 'connector' = 'kafka',
>>> 'topic' = 'tp1',
>>> 'scan.startup.mode' = 'latest-offset',
>>> 'properties.bootstrap.servers' = 
'localhost:9092',
>>> 'format' = 'json'
>>>  )
>>> """
>>> st_en

Re: flinksql1.11中主键声明的问题

2020-07-22 文章 Leonard Xu
Hi,

看了下query,你没有使用维表join语法 FOR SYSTEM_TIME AS OF ,这样直接做的regular 
join,mysql表是bounded的,第一次读完就不会再读了,所以不会更新。
维表join才会按照你设置的时间去look up 最新的数据,维表是我们常说的temporal table(时态表)的一种,参考[1] 中的 temporal 
table join


祝好
Leonard Xu
[1]  
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins
 


> 在 2020年7月23日,09:06,琴师 <1129656...@qq.com> 写道:
> 
> 
> HI:
> 我是使用场景是这样的,首先开启计算脚本,开启流输入,此时的数据中维表数据如下,此时输出的计算结果如下
>>> 维表   
>>> idtype   
>>> 2   err
>>> 1   err
>>> 结果
>>> 
> 1 err 20200723085754
> 2 err 20200723085755
> 3 err 20200723085756
> 4 err 20200723085757
> 
> 然后我更新了数据库维表数据,新的数据库维表数据如下,此时输出的结果并没有随着维表的改变而改变,时间已经超过了缓存刷新时间2s:
>>> 维表   
>>> idtype   
>>> 2   acc
>>> 1   acc
>>> 结果
> 
> 94err 20200723084455
> 95err 20200723084456
> 96err 20200723084457
> 97err 20200723084458
> 98err 20200723084459
> 99err 20200723084500
> 100   err 20200723084501
> 
> 然后我断开了流输入,间隔时间大于缓存刷新时间,然后重新输入流,但是我的新输出结果仅仅是更新了与流有关的时间字段,与维表相关的字段仍没有得到更新。
> 
> 请问我的使用过程哪里不对么,请帮我指出来,万分感谢!
> 
> 
> 谢谢!
> 
> 
>>> 
>>> 
> 
> 
> -- 原始邮件 --
> 发件人: "Leonard Xu" ;
> 发送时间: 2020年7月22日(星期三) 晚上9:39
> 收件人: "琴师"<1129656...@qq.com>;
> 主题: Re: flinksql1.11中主键声明的问题
> 
> <018a8...@6d818d22.7ee2185f.jpg>
> 
> 代码应该没问题的,我源码和本地都复现了下,你检查下你使用方式
> 
> 祝好
> 
>> 在 2020年7月22日,16:39,Leonard Xu mailto:xbjt...@gmail.com>> 
>> 写道:
>> 
>> HI,
>> 我看了维表这块的代码,应该没啥问题的,晚点我本地环境复现确认下哈。
>> 
>> 
>>> 在 2020年7月22日,16:27,琴师 <1129656...@qq.com > 写道:
>>> 
>>> 
>>> HI
>>> 我附录了我的代码,现在基本上测通了流程,卡在维表刷新这里,不能刷新的话很受打击。HELP!!
>>> 谢谢
>>> 
>>> -- 原始邮件 --
>>> 发件人: "琴师" <1129656...@qq.com >;
>>> 发送时间: 2020年7月22日(星期三) 下午3:17
>>> 收件人: "user-zh"mailto:user-zh@flink.apache.org>>;
>>> 主题: 回复: Re: flinksql1.11中主键声明的问题
>>> 
>>> 你好:
>>> 下面是我的代码,我用的版本是1.11.0,数据库是TIDB,我跑的是demo数据,维表只有两行。
>>> 
>>> 我的输入流如下,每秒新增一条写入到kafka
>>>  topic = 'tp1'
>>> for i  in  range(1,1) :
>>> stime=datetime.datetime.now().strftime('%Y%m%d%H%M%S')
>>> msg = {}
>>> msg['id']= i
>>> msg['time1']= stime
>>> msg['type']=1
>>> print(msg)
>>> send_msg(topic, msg)
>>> time.sleep(1)
>>> 
>>> {'id': 1, 'time1': '20200722140624', 'type': 1}
>>> {'id': 2, 'time1': '20200722140625', 'type': 1}
>>> {'id': 3, 'time1': '20200722140626', 'type': 1}
>>> {'id': 4, 'time1': '20200722140627', 'type': 1}
>>> {'id': 5, 'time1': '20200722140628', 'type': 1}
>>> {'id': 6, 'time1': '20200722140629', 'type': 1}
>>> {'id': 7, 'time1': '20200722140631', 'type': 1}
>>> {'id': 8, 'time1': '20200722140632', 'type': 1}
>>> 
>>> 维表数据如下
>>> idtype
>>> 2   err
>>> 1   err
>>> 
>>> 我在程序正常期间更新了维表,但是后续输出的结果显示维表还是之前的缓存数据,事实上已经远远大于超时时间了,甚至我停下输入流,直到达到超时时间后再次输入,新的结果还是输出旧的维表数据
>>> 
>>> 
>>> from pyflink.datastream import StreamExecutionEnvironment, 
>>> TimeCharacteristic
>>> from pyflink.table import StreamTableEnvironment, DataTypes, 
>>> EnvironmentSettings,DataTypes, CsvTableSource, CsvTableSink
>>> from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime
>>> from pyflink.table.window import Tumble
>>> 
>>> 
>>> def from_kafka_to_kafka_demo():
>>> 
>>> # use blink table planner
>>> env = StreamExecutionEnvironment.get_execution_environment()
>>> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>> env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
>>> st_env = 
>>> StreamTableEnvironment.create(stream_execution_environment=env,environment_settings=env_settings)
>>> 
>>> # register source and sink
>>> register_rides_source(st_env)
>>> register_rides_sink(st_env)
>>> register_mysql_source(st_env)
>>>  
>>> 
>>> st_env.sql_update("insert into flink_result select  cast(t1.id 
>>>  as int) as id,cast(t2.type as varchar),cast( t1.time1 as 
>>> bigint) as rowtime from source1 t1 left join dim_mysql t2 on 
>>> t1.type=cast(t2.id  as varchar) ")
>>> st_env.execute("2-from_kafka_to_kafka")
>>>
>>> 
>>> 
>>> def register_rides_source(st_env):
>>> source_ddl = \
>>> """
>>> create table source1(
>>>  id int,
>>>  time1 varchar ,
>>>  type string
>>>  ) with (
>>> 'connector' = 'kafka',
>>> 'topic' = 'tp1',
>>> 'scan.startup.mode' = 'latest-offset',
>>> 'properties.bootstrap.servers' = 'localhost:9092',
>>> 'format' = 'json'
>>>  )
>>> """
>>> st_env.sql_update(source_ddl)
>>> 
>>> def register_mysql_source(st_env):
>>> source_ddl = \
>>> """
>>> CREATE TABLE dim_mysql (
>>> id int,  --
>>> type varchar --
>>> ) WITH (
>>> 'connector' = 'jdbc',
>>> 'url' = 'jdbc:mysql://localhost:3390/test' <>,
>>> 'table-name' = 'flink_test',
>>> 'driver' = 'com.mysql.cj

Re: Re: flink 1.11 cdc相关问题

2020-07-22 文章 amen...@163.com
感谢二位大佬@Leonard, @Jark的解答!



amen...@163.com
 
发件人: Jark Wu
发送时间: 2020-07-22 23:56
收件人: user-zh
主题: Re: flink 1.11 cdc相关问题
Hi,
 
这是个已知问题,目前 debezium 同步不同数据库并没有保证一模一样地数据格式,比如同步 PG 的UPDATE消息时候,before 和
after 字段就不是全的。
这个问题会在后面地版本中解决。
 
Best,
Jark
 
On Wed, 22 Jul 2020 at 21:07, Leonard Xu  wrote:
 
> Hello,
>
> 代码在为before这条数据设置rowKind时抛了一个NPE,before正常应该是不为null的。
> 看起来是你的数据问题,一条 update 的changelog, before 为null,
> 这是不合理的,没有before的数据,是无法处理after的数据的。
> 如果确认是脏数据,可以开启ignore-parse-errors跳过[1]
>
> 祝好
> Leonard
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
> <
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
> >
>
> {
> "payload": {
> "before": null,
> "after": {
> "id": 2,
> "name": "liushimin",
> "age": "24",
> "sex": "man",
> "phone": "1"
> },
> "source": {
> "version": "1.2.0.Final",
> "connector": "postgresql",
> "name": "postgres",
> "ts_ms": 1595409754151,
> "snapshot": "false",
> "db": "postgres",
> "schema": "public",
> "table": "person",
> "txId": 569,
> "lsn": 23632344,
> "xmin": null
> },
> "op": "u",
> "ts_ms": 1595409754270,
> "transaction": null
> }
> }
>
> > 在 2020年7月22日,17:34,amen...@163.com 写道:
> >
> >
> {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"postgres.public.person.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"liushimin","age":"24","sex":"man","phone":"1"},"source":{"version":"1.2.0.Final","connector":"postgresql","name":"postgres","ts_ms":1595409754151,"snapshot":"false","db":"postgres","schema":"public","table":"person","txId":569,"lsn":23632344,"xmin":null},"op":"u","ts_ms":1595409754270,"transaction":null}}
>
>


Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-22 文章 Peihui He
Hi Congxian,

这个问题有结论没呢?

Best wishes.

Peihui He  于2020年7月17日周五 下午4:21写道:

> Hi Congxian,
>
> [image: Snipaste_2020-07-17_16-20-06.png]
>
> 我这边通过chrome 浏览器看到是上传了的,并且可以下载的。
>
> Best wishes.
>
> Congxian Qiu  于2020年7月17日周五 下午1:31写道:
>
>> Hi  Peihui
>>
>> 感谢你的回复,我这边没有看到附件,你那边能否确认下呢?
>>
>> Best,
>> Congxian
>>
>>
>> Peihui He  于2020年7月17日周五 上午10:13写道:
>>
>> > Hi Congxian
>> >
>> > 见附件。
>> >
>> > Best wishes.
>> >
>> > Congxian Qiu  于2020年7月16日周四 下午8:24写道:
>> >
>> >> Hi Peihui
>> >>
>> >> 感谢你的回信。能否帮忙用 1.10.0 复现一次,然后把相关的日志(JM log 和 TM Log,方便的话,也开启一下 debug
>> >> 日志)分享一下呢?如果日志太大的话,可以尝试贴待 gist[1] 然后邮件列表回复一个地址即可,
>> >> 非常感谢~
>> >>
>> >> [1] https://gist.github.com/
>> >>
>> >> Best,
>> >> Congxian
>> >>
>> >>
>> >> Peihui He  于2020年7月16日周四 下午5:54写道:
>> >>
>> >> > Hi Yun,
>> >> >
>> >> > 我这边测试需要在集群上跑的,本地idea跑是没有问题的。
>> >> > flink 1.10.1 的flink-conf.yaml 是cope flink 1.10.0 的,但是1.10.0 就是报错。
>> >> >
>> >> > 附件就是源码job。如果你要的跑需要改下socket host的。只要socket 中输入hepeihui 就会抛异常的。
>> >> >
>> >> > Peihui He  于2020年7月16日周四 下午5:26写道:
>> >> >
>> >> >> Hi Yun,
>> >> >>
>> >> >> 作业没有开启local recovery, 我这边测试1.10.0是必现的。
>> >> >>
>> >> >> Best wishes.
>> >> >>
>> >> >> Yun Tang  于2020年7月16日周四 下午5:04写道:
>> >> >>
>> >> >>> Hi Peihui
>> >> >>>
>> >> >>> Flink-1.10.1
>> >> >>> 里面涉及到相关代码的改动就是更改了restore时path的类[1],但是你们的操作系统并不是windows,按道理应该是没有关系的。
>> >> >>> 另外,这个问题在你遇到failover时候是必现的么?从文件路径看,作业也没有开启local recovery是吧?
>> >> >>>
>> >> >>>
>> >> >>> [1]
>> >> >>>
>> >>
>> https://github.com/apache/flink/commit/399329275e5e2baca9ed9494cce97ff732ac077a
>> >> >>> 祝好
>> >> >>> 唐云
>> >> >>> 
>> >> >>> From: Peihui He 
>> >> >>> Sent: Thursday, July 16, 2020 16:15
>> >> >>> To: user-zh@flink.apache.org 
>> >> >>> Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>> >> >>>
>> >> >>> Hi Yun,
>> >> >>>
>> >> >>> 不好意思这么久回复,是@Congxian 描述的第2种情况。异常就是我通过socket
>> >> >>> 输入的特定的word抛出runtimeexception 使task
>> >> >>> 失败,然后job会尝试从checkpoint中恢复,但是恢复的过程中就报
>> >> >>>
>> >> >>> Caused by: java.nio.file.NoSuchFileException:
>> >> >>>
>> >> >>>
>> >>
>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
>> >> >>> ->
>> >> >>>
>> >>
>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
>> >> >>>
>> >> >>> 情况和@chenxyz 类似。
>> >> >>>
>> >> >>>
>> >>
>> http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html
>> >> >>>
>> >> >>> 换成1.10.1 就可以了
>> >> >>>
>> >> >>> Best wishes.
>> >> >>>
>> >> >>> Yun Tang  于2020年7月15日周三 下午4:35写道:
>> >> >>>
>> >> >>> > Hi Robin
>> >> >>> >
>> >> >>> > 其实你的说法不是很准确,社区是明文保证savepoint的兼容性
>> >> >>> >
>> >> >>>
>> >>
>> [1],但是并不意味着跨大版本时无法从checkpoint恢复,社区不承诺主要还是维护其太耗费精力,但是实际从代码角度来说,在合理使用state
>> >> >>> > schema evolution [2]的前提下,目前跨版本checkpoint恢复基本都是兼容的.
>> >> >>> >
>> >> >>> > 另外 @Peihui 也请麻烦对你的异常描述清晰一些,我的第一次回复已经推测该异常不是root
>> >> >>> cause,还请在日志中找一下无法恢复的root
>> >> >>> > cause,如果不知道怎么从日志里面找,可以把相关日志分享出来。
>> >> >>> >
>> >> >>> >
>> >> >>> > [1]
>> >> >>> >
>> >> >>>
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table
>> >> >>> > [2]
>> >> >>> >
>> >> >>>
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html
>> >> >>> >
>> >> >>> > 祝好
>> >> >>> > 唐云
>> >> >>> >
>> >> >>> >
>> >> >>> > 
>> >> >>> > From: Robin Zhang 
>> >> >>> > Sent: Wednesday, July 15, 2020 16:23
>> >> >>> > To: user-zh@flink.apache.org 
>> >> >>> > Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>> >> >>> >
>> >> >>> > 据我所知,跨大版本的不能直接从checkoint恢复,只能放弃状态重新跑
>> >> >>> >
>> >> >>> > Best
>> >> >>> > Robin Zhang
>> >> >>> > 
>> >> >>> > From: Peihui He <[hidden email]>
>> >> >>> > Sent: Tuesday, July 14, 2020 10:42
>> >> >>> > To: [hidden email] <[hidden email]>
>> >> >>> > Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>> >> >>> >
>> >> >>> > hello,
>> >> >>> >
>> >> >>> > 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示
>> >> >>> >
>> >> >>> >
>> >> >>> > Caused by: java.nio.file.NoSuchFileException:
>> >> >>> >
>> >> >>> >
>> >> >>>
>> >>
>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
>> >> >>> > ->
>> >> >>> >
>> >> >>> >
>> >> >>>
>> >>
>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application

flink sink kafka Error while confirming checkpoint

2020-07-22 文章 Peihui He
Hello,

flink 1.10.1
kafka 2.12-1.1.0

运行一段时间后会出现一下错误,不知道有遇到过没?
java.lang.RuntimeException: Error while confirming checkpoint
at
org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:935)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$7(StreamTask.java:907)
at
org.apache.flink.util.function.FunctionUtils.lambda$asCallable$5(FunctionUtils.java:125)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
at
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of
transactions failed, logging first encountered failure
at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:302)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointComplete$8(StreamTask.java:919)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:101)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:913)
... 12 more
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer
attempted an operation with an old epoch. Either there is a newer producer
with the same transactionalId, or the producer's transaction has been
expired by the broker.

Best wishes.


回复:(无主题)

2020-07-22 文章 罗显宴
感谢shizk233大佬,我这个问题终于得到解决,我主要是通过全窗口加mapstate实现的
best
shizk233


| |
罗显宴
|
|
邮箱:15927482...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月21日 15:04,罗显宴 写道:
hi,我想到解决办法了,可以用全局window,我一直以为是要分区在做窗口运算其实可以直接用timewindowAll来算,然后用状态保存就够了
val result = num.timeWindowAll(Time.seconds(20))
//.trigger(ContinuousEventTimeTrigger.of(Time.seconds(20)))
.process(new 
ProcessAllWindowFunction[IncreaseNumPerHour,IncreasePerHour,TimeWindow] {

private var itemState: MapState[String,Int] = _

override def open(parameters: Configuration): Unit = {
itemState = getRuntimeContext.getMapState(new 
MapStateDescriptor[String,Int]("item-state",TypeInformation.of(classOf[String]),TypeInformation.of(classOf[Int])))
 }

override def process(context: Context, elements: Iterable[IncreaseNumPerHour], 
out: Collector[IncreasePerHour]): Unit = {
var timestamp:Long = 0L
elements.foreach(kv => {
itemState.put(kv.category, 1)
timestamp = (kv.timestamp/2000+1)*2000
})
import scala.collection.JavaConversions._
   out.collect(IncreasePerHour(new Timestamp( timestamp - 1 
).toString,itemState.keys().size))
 }
   })


| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月21日 14:15,罗显宴<15927482...@163.com> 写道:


hi,
我觉得你说的是对的,我刚才没有理解trigger,我以为trigger当成一个1小时窗口的20分钟的小窗口了,其实我要的结果就是每20分钟有多少个窗口比如当前20分钟有A类型、B类型和C类型三个窗口,那么输出就是3,后来20分钟有A类型、B类型和D类型的结果,那么A类型和B类型是重复的只有D不是重复的,结果为4
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月21日 13:58,shizk233 写道:
Hi,

首先统一一下表述,当前只有1小时的滚动窗口,不存在20分钟的窗口,trigger只是输出结果,trigger的间隔和窗口大小无关。

按目前的设计,在11-12点的窗口里,输入x条类型A的数据,agg都记为1条,但1小时窗口会触发3次20s的trigger,
也就是会最多输出3条(A类型,12点)的数据(同一个1小时窗口,WindowEnd都是12点)。
这3条数据进入MapState后,写下了((A类型,12点),1)的记录并都注册了12点+1的timer,
那么timer在12点的时候会输出的结果就是(12点,1)。

如果12-13点里,A类型做相同的输入,MapState新增一条((A类型,13点),1)的记录,在13点得到最终结果(13点,2)。

而这个结果和我理解的你的需求不太一样,我理解的情况应该是12点输出(12点,1),13点输出(13点,1),因为目前只有A类型的数据。
期望的输出应该是无限时间上的去重类型统计,每隔1小时输出(几点,当前所有的类型总数)。

我觉得目前的设计可能和描述的需求不太一致?你看看是不是这么回事儿


Best,
shizk233


罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:53写道:

好的,
输入:
心功能不全和心律失常用药,1,时间戳
心功能不全和心律失常用药,1,时间戳
抗利尿剂,1,时间戳
血管收缩剂,1,时间戳
血管紧张素II受体拮抗剂,1,时间戳


这里的时间戳就是eventtime了
比如前三条是在一个20秒窗口中,所以应该分为两个窗口:
心功能不全和心律失常用药和抗利尿剂,但是我是计数药物的种类的
所以不管窗口有多少元素我还是置为1,所以输出的接口就是窗口之和,即为2
接下来20秒都多了2个窗口而且和前面的医药种类不一样,所以在原来基础上再加2
输出4


即输出:
2020-7-20 19:00:00,2
2020-7-20 19:00:20,4




| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月21日 11:37,shizk233 写道:
Hi,

有点不太理解,能举个例子说明一下result那部分你预期的输入(几条sample 数据就行)和期望的输出结果吗

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:29写道:

hi,


CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗


| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月21日 11:10,shizk233 写道:
Hi,


我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END,
而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。


你可以让acc做个累加,然后结果输出里把acc的值带上看看。


Best,
shizk233


罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午8:44写道:



大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 20:38,罗显宴<15927482...@163.com> 写道:
不好意思,刚才发的快,没来得及解释,


这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增


| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道:


不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 11:47,shizk233 写道:
Hi,

累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道:




大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
api,希望看到的大佬能帮我解惑一下,谢谢啦

| |
罗显宴
|
|
邮箱:15927482...@163.com
|

签名由 网易邮箱大师 定制




flink 1.11 ddl sql ????PROCTIME()????????csv????

2020-07-22 文章 Asahi Lee
1. 
StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv = 
StreamTableEnvironment.create(bsEnv, bsSettings);


        String sourceTableDDL = "CREATE TABLE fs_table (" +
                "  user_id 
STRING," +
                "  order_amount 
DOUBLE," +
                "  dt 
TIMESTAMP(3)," +
                "  pt AS 
PROCTIME() " +
                " ) WITH (" +
                "  
'connector'='filesystem'," +
                "  
'path'='D:\\Program 
Files\\JetBrains\\workspace\\table-walkthrough\\src\\main\\resources\\csv\\order.csv',"
 +
                "  'format'='csv'" 
+
                " )";


        bsTableEnv.executeSql(sourceTableDDL);
        bsTableEnv.executeSql("select * from 
fs_table").print();
2. csv
order.csv
zhangsan,12.34,2020-08-03 12:23:50
lisi,234.67,2020-08-03 12:25:50
wangwu,57.6,2020-08-03 12:25:50
zhaoliu,345,2020-08-03 12:28:50



3. 
 - Source: FileSystemTableSource(user_id, order_amount, dt, pt) -> 
Calc(select=[user_id, order_amount, dt, PROCTIME_MATERIALIZE(()) AS pt]) -> 
SinkConversionToRow (4/6) (9ee0383d676a190b0a62d206039db26c) switched from 
RUNNING to FAILED.
java.io.IOException: Failed to deserialize CSV row.
at 
org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:299)
at 
org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:210)
at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
Caused by: java.lang.RuntimeException: Row length mismatch. 4 fields expected 
but was 3.
at 
org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.validateArity(CsvRowDataDeserializationSchema.java:441)
at 
org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.lambda$createRowConverter$1ca9c073$1(CsvRowDataDeserializationSchema.java:244)
at 
org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:293)
... 5 more

Re: flink 1.11 cdc相关问题

2020-07-22 文章 Jark Wu
Hi,

这是个已知问题,目前 debezium 同步不同数据库并没有保证一模一样地数据格式,比如同步 PG 的UPDATE消息时候,before 和
after 字段就不是全的。
这个问题会在后面地版本中解决。

Best,
Jark

On Wed, 22 Jul 2020 at 21:07, Leonard Xu  wrote:

> Hello,
>
> 代码在为before这条数据设置rowKind时抛了一个NPE,before正常应该是不为null的。
> 看起来是你的数据问题,一条 update 的changelog, before 为null,
> 这是不合理的,没有before的数据,是无法处理after的数据的。
> 如果确认是脏数据,可以开启ignore-parse-errors跳过[1]
>
> 祝好
> Leonard
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
> <
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
> >
>
> {
> "payload": {
> "before": null,
> "after": {
> "id": 2,
> "name": "liushimin",
> "age": "24",
> "sex": "man",
> "phone": "1"
> },
> "source": {
> "version": "1.2.0.Final",
> "connector": "postgresql",
> "name": "postgres",
> "ts_ms": 1595409754151,
> "snapshot": "false",
> "db": "postgres",
> "schema": "public",
> "table": "person",
> "txId": 569,
> "lsn": 23632344,
> "xmin": null
> },
> "op": "u",
> "ts_ms": 1595409754270,
> "transaction": null
> }
> }
>
> > 在 2020年7月22日,17:34,amen...@163.com 写道:
> >
> >
> {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"postgres.public.person.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"liushimin","age":"24","sex":"man","phone":"1"},"source":{"version":"1.2.0.Final","connector":"postgresql","name":"postgres","ts_ms":1595409754151,"snapshot":"false","db":"postgres","schema":"public","table":"person","txId":569,"lsn":23632344,"xmin":null},"op":"u","ts_ms":1595409754270,"transaction":null}}
>
>


Re: 关于1.11Flink SQL 全新API设计的一些问题

2020-07-22 文章 Jark Wu
Hi 首维,

我建了一个 issue 来跟进这个问题:https://issues.apache.org/jira/browse/FLINK-18674
我们可以在这个里面继续讨论需求和评估解决方案。

On Wed, 22 Jul 2020 at 18:07, 刘首维  wrote:

> Hi, godfrey
>
>
> 好的,如果可以的话,有了相关讨论的jira或者mail可以cc一下我吗,谢谢啦
>
> 
> 发件人: godfrey he 
> 发送时间: 2020年7月22日 17:49:27
> 收件人: user-zh
> 抄送: Jark Wu; xbjt...@gmail.com; jingsongl...@gmail.com
> 主题: Re: 关于1.11Flink SQL 全新API设计的一些问题
>
> Hi,首维
>
> 感谢给出非常详细的反馈。这个问题我们之前内部也有一些讨论,但由于缺乏一些真实场景,最后维持了当前的接口。
> 我们会根据你提供的场景进行后续讨论。
>
> Best,
> Godfrey
>
> 刘首维  于2020年7月22日周三 下午5:23写道:
>
> > Hi, Jark
> >
> >
> >
> >感谢你的建议!
> >
> >我们这边充分相信社区在SQL/Table上的苦心孤诣,也愿意跟进Flink在新版本的变化。
> >
> >先看我遇到问题本身,你的建议确实可以帮助我解决问题。我想聊一下我对问题之外的一些想法
> >
> >```
> >
> >  >  2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter
> > 用来做缓冲池/微批/数据过滤等功能
> > 这个我觉得也可以封装在 SinkFunction 里面。
> >
> >   ```
> >
> >
> 比如上述这个问题2,我们确实可以把它做到SinkFunction中,但是我个人认为这可能在设计上不够理想的。我个人在设计编排Function/算子的时候习惯于遵循”算子单一职责”的原则,这也是我为什么会拆分出多个process/filter算子编排到SinkFunction前面而非将这些功能耦合到SinkFunction去做。另一方面,没了DataStream,向新的API的迁移成本相对来说变得更高了一些~
> > 又或者,我们现在还有一些特殊原因,算子编排的时候会去修改TaskChain Strategy,这个时候DataStream的灵活性是必不可少的
> >
> > 考虑到Flink Task都可以拆分成Source -> Transformation -> sink
> > 三个阶段,那么能让用户可以对自己的作业针对(流或批)的运行模式下,可以有效灵活做一些自己的定制策略/优化/逻辑可能是会方便的~
> >
> >
> 诚然,DataStream的灵活性确实会是一把双刃剑,但就像@leonard提到的,平台层和应用层的目的和开发重点可能也不太一样,对Flink
> > API使用侧重点也不同。我个人还是希望可以在享受全新API设计优势同时,
> >
> > 可以继续使用DataStream(Transformation)的灵活性,助力Flink组件在我们组的开落地
> >
> >
> > 再次感谢各位的回复!
> >
> > 
> > 发件人: Jark Wu 
> > 发送时间: 2020年7月22日 16:33:45
> > 收件人: user-zh
> > 抄送: godfrey he; greemqq...@163.com; 刘首维
> > 主题: Re: 关于1.11Flink SQL 全新API设计的一些问题
> >
> > Hi,首维,
> >
> > 非常感谢反馈。与 DataStream 解耦是 FLIP-95 的一个非常重要的设计目标,这让 sink/source 对于框架来说不再是黑盒,
> > 因此将来才可以做诸如 state 兼容升级、消息顺序保障、自动并发设置等等事情。
> >
> > 关于你的一些需求,下面是我的建议和回复:
> >
> > >  1.  我们现在有某种特定的Kafka数据格式,1条Kafka数据
> >
> 会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStream的时候增加了一个process/FlatMap阶段,用于处理这种情况,这样对用户是透明的。
> > 这个理论上还属于“数据格式”的职责,所以建议做在 DeserializationSchema 上,目前 DeserializationSchema
> > 支持一对多的输出。可以参考 DebeziumJsonDeserializationSchema 的实现。
> >
> > >  2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能
> > 这个我觉得也可以封装在 SinkFunction 里面。
> >
> > >  3.  调整或者指定Source/Sink并行度为用户指定值,我们也是在DataStream层面上去做的
> > 这个社区也有计划在 FLIP-95 之上支持,会提供并发(或者分区)推断的能力。
> >
> > >  4.  对于一些特殊Source Sink,他们会和KeyBy操作组合(对用户透明),我们也是在DataStream层面上去做的
> > 这个能在具体一点吗?目前像 SupportsPartitioning 接口,就可以指定数据在交给 sink 之前先做 group by
> > partition。我感觉这个可能也可以通过引入类似的接口解决。
> >
> > Best,
> > Jark
> >
> > On Wed, 22 Jul 2020 at 16:27, Leonard Xu  > xbjt...@gmail.com>> wrote:
> > Hi,首维, Ran
> >
> > 感谢分享, 我理解1.11新的API主要是想把 Table API 和 DataStream API 两套尽量拆分干净,
> > 但看起来平台级的开发工作会依赖DataStream的一些预处理和用户逻辑。
> > 我觉得这类需求对平台开发是合理,可以收集反馈下的, cc: godfrey
> >
> > 祝好
> > Leonard Xu
> >
> >
> > > 在 2020年7月22日,13:47,刘首维  > liushou...@autohome.com.cn>> 写道:
> > >
> > > Hi JingSong,
> > >
> > >
> >
> 简单介绍一下背景,我们组的一个工作就是用户在页面写Sql,我们负责将Sql处理转换成Flink作业并在我们的平台上运行,这个转换的过程依赖我们的SQL
> > SDK
> > >  下面我举几个我们比较常用且感觉用1.11新API不太好实现的例子
> > >
> > >
> > >  1.  我们现在有某种特定的Kafka数据格式,1条Kafka数据
> >
> 会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStream的时候增加了一个process/FlatMap阶段,用于处理这种情况,这样对用户是透明的。
> > >  2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能
> > >  3.  调整或者指定Source/Sink并行度为用户指定值,我们也是在DataStream层面上去做的
> > >  4.  对于一些特殊Source Sink,他们会和KeyBy操作组合(对用户透明),我们也是在DataStream层面上去做的
> > >
> > >
> > > 如果可以的话,能让我在API层面拿到Transformation也是能满足我需求的
> > >
> > >
> > > 
> > > 发件人: Jingsong Li mailto:jingsongl...@gmail.com
> >>
> > > 发送时间: 2020年7月22日 13:26:00
> > > 收件人: user-zh
> > > 抄送: imj...@gmail.com
> > > 主题: Re: 关于1.11Flink SQL 全新API设计的一些问题
> > >
> > > 可以分享下你们为啥要拿到DataStream吗?什么场景一定离不开DataStream吗?
> > >
> > > Best
> > > Jingsong
> > >
> > > On Wed, Jul 22, 2020 at 12:36 PM 刘首维   > liushou...@autohome.com.cn>> wrote:
> > >
> > >> Hi all,
> > >>
> > >>
> > >>
> > >>很高兴看到Flink 1.11的发布,FLIP95和FLIP105也成功落地~
> > >>
> > >>我们最近在调研基于1.11的SQL/Table API对我们旧有的SQL
> > >>
> >
> SDK进行升级。经过初步调研发现,基于`Factory`和`DynamicTable`的API,CatalogTable会被Planner直接变成Transformation,而不是跟之前一样可以拿到DataStream。比如之前的`StreamTableSource`#getDataStream,我可以直接获得DataStream对象的。这个动作/方法对于我们很重要,因为我们封装的SDK中,很多内部操作是在这个方法调用时触发/完成的。
> > >>
> > >>
> > >>
> > >>所以,如果基于新的API去开发的话,我该如何获取到DataStream,或者达成类似的效果呢(尽可能不去动执行计划的话)
> > >>
> > >
> > >
> > > --
> > > Best, Jingsong Lee
> >
> >
>


Re: flink 1.11 cdc相关问题

2020-07-22 文章 Leonard Xu
Hello,

代码在为before这条数据设置rowKind时抛了一个NPE,before正常应该是不为null的。
看起来是你的数据问题,一条 update 的changelog, before 为null, 
这是不合理的,没有before的数据,是无法处理after的数据的。
如果确认是脏数据,可以开启ignore-parse-errors跳过[1]

祝好
Leonard
[1]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
 


{
"payload": {
"before": null,
"after": {
"id": 2,
"name": "liushimin",
"age": "24",
"sex": "man",
"phone": "1"
},
"source": {
"version": "1.2.0.Final",
"connector": "postgresql",
"name": "postgres",
"ts_ms": 1595409754151,
"snapshot": "false",
"db": "postgres",
"schema": "public",
"table": "person",
"txId": 569,
"lsn": 23632344,
"xmin": null
},
"op": "u",
"ts_ms": 1595409754270,
"transaction": null
}
}

> 在 2020年7月22日,17:34,amen...@163.com 写道:
> 
> {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"postgres.public.person.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"liushimin","age":"24","sex":"man","phone":"1"},"source":{"version":"1.2.0.Final","connector":"postgresql","name":"postgres","ts_ms":1595409754151,"snapshot":"false","db":"postgres","schema":"public","table":"person","txId":569,"lsn":23632344,"xmin":null},"op":"u","ts_ms":1595409754270,"transaction":null}}



Re:Re: flink sqlUpdate,如何获取里面的字段,字段类型,with 等3个属性

2020-07-22 文章 Michael Ran
1.tableEvn.from(xx).getSchema() 我确实通过这个拿到了schema,2.with properties属性很重要 
,关系我自定义的一些参数设定。3.关于  catalog 这个东西,是不是只有1.11 版本才能从catalog  获取  with 
properties 哦? 1.10 you  有支持吗
在 2020-07-22 18:22:22,"godfrey he"  写道:
>tableEnv 中 可以通过
>tableEvn.from(xx).getSchema() 拿到该表的schema信息,但是没法拿到对应的properties。
>如果要拿到properties,可以通过catalog的接口得到 [1]。
>如果要自定义实现source/sink,可以参考 [2]
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/catalogs.html
>[2]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sourceSinks.html
>
>Best,
>Godfrey
>
>
>
>
>
>Michael Ran  于2020年7月22日周三 下午4:10写道:
>
>> dear all:
>>  我用flink 注册一张表:
>>   CREATE TABLE dim_mysql (
>> id int,  --
>> type varchar --
>> ) WITH (
>> 'connector' = 'jdbc',
>> 'url' = 'jdbc:mysql://localhost:3390/test',
>> 'table-name' = 'flink_test',
>> 'driver' = 'com.mysql.cj.jdbc.Driver',
>> 'username' = '',
>> 'password' = '',
>> 'lookup.cache.max-rows' = '5000',
>> 'lookup.cache.ttl' = '1s',
>> 'lookup.max-retries' = '3'
>> )
>> 有没有通过 tableEnv 去获取,字段[id,type]  类型[INTEGER,VARCHAR]
>> 以及属性,map 这种。
>> 我看阿里官方有blink 支持自定义sink:
>> publicabstractclassCustomSinkBaseimplementsSerializable{
>> protectedMap userParamsMap;// 您在sql with语句中定义的键值对,但所有的键均为小写
>> protectedSet primaryKeys;// 您定义的主键字段名
>> protectedList headerFields;// 标记为header的字段列表
>> protectedRowTypeInfo rowTypeInfo;// 字段类型和名称
>> 核心需求是:获取定义的表的所有属性,自己实现自己的功能,包括 join sink 等各种逻辑


Re: flink1.11启动问题

2020-07-22 文章 Leonard Xu
Hello,
可以描述下这个问题吗? 如果确认是bug的话可以去jira上开个issue的。

祝好
Leonard Xu

> 在 2020年7月22日,20:46,酷酷的浑蛋  写道:
> 
> 我找到问题了,我觉得我发现了一个bug,很严重,会导致flink持续占资源,一直增加
> 
> 
> 
> 
> 在2020年07月22日 14:08,酷酷的浑蛋 写道:
> 这是我的启动命令:./bin/flink run -m yarn-cluster -p 2 -ys 2 -yqu rt_constant -c 
> com.xx.Main -yjm 1024 -ynm RTC_TEST xx.jar
> 任务到yarn上后就一直在占用core,core数量和内存数量一直在增加
> 
> 
> 
> 
> 在2020年07月22日 12:48,JasonLee<17610775...@163.com> 写道:
> HI
> 你使用的什么模式?启动任务的命令发出来看一下吧
> 
> 
> | |
> JasonLee
> |
> |
> 邮箱:17610775...@163.com
> |
> 
> Signature is customized by Netease Mail Master
> 
> 在2020年07月22日 12:44,酷酷的浑蛋 写道:
> 现在是为什么啊?启动任务自动占用所有core?core数量一直在增加,直到达到队列最大值,然后就卡住了,这flink1.11啥情况啊?  
> 
> 
> 



回复:flink1.11启动问题

2020-07-22 文章 酷酷的浑蛋
我找到问题了,我觉得我发现了一个bug,很严重,会导致flink持续占资源,一直增加




在2020年07月22日 14:08,酷酷的浑蛋 写道:
这是我的启动命令:./bin/flink run -m yarn-cluster -p 2 -ys 2 -yqu rt_constant -c 
com.xx.Main -yjm 1024 -ynm RTC_TEST xx.jar
任务到yarn上后就一直在占用core,core数量和内存数量一直在增加




在2020年07月22日 12:48,JasonLee<17610775...@163.com> 写道:
HI
你使用的什么模式?启动任务的命令发出来看一下吧


| |
JasonLee
|
|
邮箱:17610775...@163.com
|

Signature is customized by Netease Mail Master

在2020年07月22日 12:44,酷酷的浑蛋 写道:
现在是为什么啊?启动任务自动占用所有core?core数量一直在增加,直到达到队列最大值,然后就卡住了,这flink1.11啥情况啊?  





回复:flink 1.11 on kubernetes 构建失败

2020-07-22 文章 SmileSmile
Hi Yang Wang!

你提到了Flink里面用的是InetAddress#getHostFromNameService来跟进IP地址获取FQDN的。

这个在1.10和1.11版本是否有发生变化?这段报错只在1.11才出现,1.10不存在。如果core dns有问题,应该两个版本都有有异常

Best!



| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月22日 18:18,Yang Wang 写道:
抱歉回复晚了

我这边也验证了一下,在你所说的地方确实是ip:port,但是提交任务都是正常的

如果你的日志里面一直在刷No hostname could be resolved for the IP address,应该是集群的coredns
有问题,由ip地址反查hostname查不到。你可以起一个busybox验证一下是不是这个ip就解析不了,有
可能是coredns有问题


Flink里面用的是InetAddress#getHostFromNameService来跟进IP地址获取FQDN的


Best,
Yang

SmileSmile  于2020年7月10日周五 下午1:10写道:

> hi Yang
>
> 在1.10版本,running的作业点击拓普图中随便一个operation,有detail subtasks taskmanagers xxx x
> 这行,taskmanagers这栏里的host,显示的是 podname:端口
>
> 在1.11变成ip:端口
>
> 目前我这边遇到的情况是,构建了一个有120slot的集群,作业并行度是120。 提交到jm后jm就失联了,jm timeout。观察jm日志,疯狂在刷
>
>
> No hostname could be resolved for the IP address 10.35.160.5, using IP
> address as host name. Local input split assignment (such as for HDFS files)
> may be impacted
>
>
> 目前观察到的改变主要是这块podname和ip的区别,其他不确定
>
>
> | |
> a511955993
> |
> |
> 邮箱:a511955...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年07月10日 12:13,Yang Wang 写道:
> 我记得1.11里面对host这个地方应该是没有改动,taskmanager.network.bind-policy的
> 默认值一会都是ip。所以你说的UI上是podname,这个是哪里的?正常TM列表akka地址
> 都是ip地址的
>
>
> Best,
> Yang
>
> SmileSmile  于2020年7月10日周五 上午10:42写道:
>
> > hi yang wang
> >
> > 1.11版本的on kubernetes在hostname上有做什么变化吗?
> >
> > 作业运行的时候 flink ui上 tm变成ip:端口
> > ,在1.10版本,ui上是 podname:端口。
> >
> > 作业启动的时候,jm日志一直在刷
> >
> > No hostname could be resolved for the IP address 10.35.160.5, using IP
> > address as host name. Local input split assignment (such as for HDFS
> files)
> > may be impacted
> >
> >
> >
> >
> > | |
> > a511955993
> > |
> > |
> > 邮箱:a511955...@163.com
> > |
> >
> > 签名由 网易邮箱大师 定制
> >
> > 在2020年07月09日 20:02,Yang Wang 写道:
> > sed替换报错应该不是Pod启动失败的根本原因,因为目前的docker-entrypoint.sh做了修改
> > 才会这样[1]
> >
> > 你这个报错看着是执行bash-java-utils.jar报的错,确认你用的是社区的yaml文件[2],我运行是没有问题的。
> > 如果不是,需要你把你的yaml发出来
> >
> >
> > [1].
> >
> https://github.com/apache/flink-docker/blob/dev-master/docker-entrypoint.sh
> > [2].
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html
> >
> >
> > Best,
> > Yang
> >
> > SmileSmile  于2020年7月9日周四 下午1:40写道:
> >
> > > hi
> > >
> > > 按照新版本的部署文件[1],会部署失败.如果将部署文件改用1.10版本,只是修改镜像文件和log4j文件,可以成功构建[2]。
> > >
> > >
> > > 目前看差别在于1.11启动jm和tm是通过args:
> > >
> >
> ["jobmanager"]的方法,通过docker-entrypoint.sh[3]看到调用set_common_options方法的时候会sed
> > > 本地挂载的flink-configuration-configmap.yaml导致失败。
> > >
> > >
> > > 1.10 版本是通过$FLINK_HOME/bin/jobmanager.sh启动。
> > >
> > > command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
> > >  while :;
> > >  do
> > >if [[ -f $(find log -name '*jobmanager*.log' -print -quit)
> ]];
> > >  then tail -f -n +1 log/*jobmanager*.log;
> > >fi;
> > >  done"]
> > >
> > >
> > > 如果遇到该问题的,沿用1.10版本的部署方式部署1.11镜像可以成功。  1.11 版本的部署方式如果有大佬可以走通的,求分享。
> > >
> > >
> > >
> > > [1]
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#session-cluster-resource-definitions
> > > [2]
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/kubernetes.html#session-cluster-resource-definitions
> > > [3]
> > >
> >
> https://github.com/apache/flink-docker/blob/master/1.11/scala_2.11-debian/docker-entrypoint.sh
> > >
> > >
> > >
> > > | |
> > > a511955993
> > > |
> > > |
> > > 邮箱:a511955...@163.com
> > > |
> > >
> > > 签名由 网易邮箱大师 定制
> > >
> > > 在2020年07月08日 16:38,SmileSmile 写道:
> > > hi yun tang!
> > >
> > > 没有对 /opt/flink/config 目录下的文件做写操作。 只是按照官网上的配置文件进行部署,镜像用的也是社区的镜像。
> > > best!
> > >
> > >
> > >
> > >
> > > | |
> > > a511955993
> > > |
> > > |
> > > 邮箱:a511955...@163.com
> > > |
> > >
> > > 签名由 网易邮箱大师 定制
> > >
> > > 在2020年07月08日 16:29,Yun Tang 写道:
> > > Hi
> > >
> > > 你是不是对 /opt/flink/conf
> > > 目录下的文件进行了sed相关写操作?社区文档中使用的方法是将configmap挂载成本地的flink-conf.yaml
> > > 等文件,而这个挂载的目录其实是不可写的。
> > > 直接修改configmap里面的内容,这样挂载时候就会自动更新了。
> > >
> > > 祝好
> > > 唐云
> > > 
> > > From: SmileSmile 
> > > Sent: Wednesday, July 8, 2020 13:03
> > > To: Flink user-zh mailing list 
> > > Subject: flink 1.11 on kubernetes 构建失败
> > >
> > > hi
> > >
> > > 按照文档[1]的方法部署session cluster on kubernetes,集群构建的时候出现了如下报错
> > >
> > >
> > > Starting Task Manager
> > > sed: couldn't open temporary file /opt/flink/conf/sedVdyy6Q: Read-only
> > > file system
> > > sed: couldn't open temporary file /opt/flink/conf/sedcj5VKQ: Read-only
> > > file system
> > > /docker-entrypoint.sh: 72: /docker-entrypoint.sh: cannot create
> > > /opt/flink/conf/flink-conf.yaml: Permission denied
> > > sed: couldn't open temporary file /opt/flink/conf/sedB5eynR: Read-only
> > > file system
> > > /docker-entrypoint.sh: 120: /docker-entrypoint.sh: cannot create
> > > /opt/flink/conf/flink-conf.yaml.tmp: Read-only file system
> > > [ERROR] The execution result is empty.
> > > [ERROR] Could not get JVM parameters and dynamic configurations
> 

Re: Flink 1.11 submit job timed out

2020-07-22 文章 SmileSmile

Hi,Yang Wang!

很开心可以收到你的回复,你的回复帮助很大,让我知道了问题的方向。我再补充些信息,希望可以帮我进一步判断一下问题根源。

在JM报错的地方,No hostname could be resolved for ip address x ,报出来的ip是k8s分配给flink 
pod的内网ip,不是宿主机的ip。请问这个问题可能出在哪里呢

Best!



| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

On 07/22/2020 18:18, Yang Wang wrote:
如果你的日志里面一直在刷No hostname could be resolved for the IP address,应该是集群的coredns
有问题,由ip地址反查hostname查不到。你可以起一个busybox验证一下是不是这个ip就解析不了,有
可能是coredns有问题


Best,
Yang

Congxian Qiu  于2020年7月21日周二 下午7:29写道:

> Hi
>不确定 k8s 环境中能否看到 pod 的完整日志?类似 Yarn 的 NM 日志一样,如果有的话,可以尝试看一下这个 pod
> 的完整日志有没有什么发现
> Best,
> Congxian
>
>
> SmileSmile  于2020年7月21日周二 下午3:19写道:
>
> > Hi,Congxian
> >
> > 因为是测试环境,没有配置HA,目前看到的信息,就是JM刷出来大量的no hostname could be
> > resolved,jm失联,作业提交失败。
> > 将jm内存配置为10g也是一样的情况(jobmanager.memory.pprocesa.size:10240m)。
> >
> > 在同一个环境将版本回退到1.10没有出现该问题,也不会刷如上报错。
> >
> >
> > 是否有其他排查思路?
> >
> > Best!
> >
> >
> >
> >
> > | |
> > a511955993
> > |
> > |
> > 邮箱:a511955...@163.com
> > |
> >
> > 签名由 网易邮箱大师 定制
> >
> > On 07/16/2020 13:17, Congxian Qiu wrote:
> > Hi
> >   如果没有异常,GC 情况也正常的话,或许可以看一下 pod 的相关日志,如果开启了 HA 也可以看一下 zk 的日志。之前遇到过一次在
> Yarn
> > 环境中类似的现象是由于其他原因导致的,通过看 NM 日志以及 zk 日志发现的原因。
> >
> > Best,
> > Congxian
> >
> >
> > SmileSmile  于2020年7月15日周三 下午5:20写道:
> >
> > > Hi Roc
> > >
> > > 该现象在1.10.1版本没有,在1.11版本才出现。请问这个该如何查比较合适
> > >
> > >
> > >
> > > | |
> > > a511955993
> > > |
> > > |
> > > 邮箱:a511955...@163.com
> > > |
> > >
> > > 签名由 网易邮箱大师 定制
> > >
> > > On 07/15/2020 17:16, Roc Marshal wrote:
> > > Hi,SmileSmile.
> > > 个人之前有遇到过 类似 的host解析问题,可以从k8s的pod节点网络映射角度排查一下。
> > > 希望这对你有帮助。
> > >
> > >
> > > 祝好。
> > > Roc Marshal
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > 在 2020-07-15 17:04:18,"SmileSmile"  写道:
> > > >
> > > >Hi
> > > >
> > > >使用版本Flink 1.11,部署方式 kubernetes session。 TM个数30个,每个TM 4个slot。 job
> > > 并行度120.提交作业的时候出现大量的No hostname could be resolved for the IP address,JM
> > time
> > > out,作业提交失败。web ui也会卡主无响应。
> > > >
> > > >用wordCount,并行度只有1提交也会刷,no hostname的日志会刷个几条,然后正常提交,如果并行度一上去,就会超时。
> > > >
> > > >
> > > >部分日志如下:
> > > >
> > > >2020-07-15 16:58:46,460 WARN
> > > org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No
> > > hostname could be resolved for the IP address 10.32.160.7, using IP
> > address
> > > as host name. Local input split assignment (such as for HDFS files) may
> > be
> > > impacted.
> > > >2020-07-15 16:58:46,460 WARN
> > > org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No
> > > hostname could be resolved for the IP address 10.44.224.7, using IP
> > address
> > > as host name. Local input split assignment (such as for HDFS files) may
> > be
> > > impacted.
> > > >2020-07-15 16:58:46,461 WARN
> > > org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No
> > > hostname could be resolved for the IP address 10.40.32.9, using IP
> > address
> > > as host name. Local input split assignment (such as for HDFS files) may
> > be
> > > impacted.
> > > >
> > > >2020-07-15 16:59:10,236 INFO
> > > org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
> > The
> > > heartbeat of JobManager with id 69a0d460de46a9f41c770d963c0a timed
> > out.
> > > >2020-07-15 16:59:10,236 INFO
> > > org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
> > > Disconnect job manager 
> > > @akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_2 for job
> > > e1554c737e37ed79688a15c746b6e9ef from the resource manager.
> > > >
> > > >
> > > >how to deal with ?
> > > >
> > > >
> > > >beset !
> > > >
> > > >| |
> > > >a511955993
> > > >|
> > > >|
> > > >邮箱:a511955...@163.com
> > > >|
> > > >
> > > >签名由 网易邮箱大师 定制
> > >
> >
>


Re: [ANNOUNCE] Apache Flink 1.11.1 released

2020-07-22 文章 Konstantin Knauf
Thank you for managing the quick follow up release. I think this was very
important for Table & SQL users.

On Wed, Jul 22, 2020 at 1:45 PM Till Rohrmann  wrote:

> Thanks for being the release manager for the 1.11.1 release, Dian. Thanks
> a lot to everyone who contributed to this release.
>
> Cheers,
> Till
>
> On Wed, Jul 22, 2020 at 11:38 AM Hequn Cheng  wrote:
>
>> Thanks Dian for the great work and thanks to everyone who makes this
>> release possible!
>>
>> Best, Hequn
>>
>> On Wed, Jul 22, 2020 at 4:40 PM Jark Wu  wrote:
>>
>> > Congratulations! Thanks Dian for the great work and to be the release
>> > manager!
>> >
>> > Best,
>> > Jark
>> >
>> > On Wed, 22 Jul 2020 at 15:45, Yangze Guo  wrote:
>> >
>> > > Congrats!
>> > >
>> > > Thanks Dian Fu for being release manager, and everyone involved!
>> > >
>> > > Best,
>> > > Yangze Guo
>> > >
>> > > On Wed, Jul 22, 2020 at 3:14 PM Wei Zhong 
>> > wrote:
>> > > >
>> > > > Congratulations! Thanks Dian for the great work!
>> > > >
>> > > > Best,
>> > > > Wei
>> > > >
>> > > > > 在 2020年7月22日,15:09,Leonard Xu  写道:
>> > > > >
>> > > > > Congratulations!
>> > > > >
>> > > > > Thanks Dian Fu for the great work as release manager, and thanks
>> > > everyone involved!
>> > > > >
>> > > > > Best
>> > > > > Leonard Xu
>> > > > >
>> > > > >> 在 2020年7月22日,14:52,Dian Fu  写道:
>> > > > >>
>> > > > >> The Apache Flink community is very happy to announce the release
>> of
>> > > Apache Flink 1.11.1, which is the first bugfix release for the Apache
>> > Flink
>> > > 1.11 series.
>> > > > >>
>> > > > >> Apache Flink® is an open-source stream processing framework for
>> > > distributed, high-performing, always-available, and accurate data
>> > streaming
>> > > applications.
>> > > > >>
>> > > > >> The release is available for download at:
>> > > > >> https://flink.apache.org/downloads.html
>> > > > >>
>> > > > >> Please check out the release blog post for an overview of the
>> > > improvements for this bugfix release:
>> > > > >> https://flink.apache.org/news/2020/07/21/release-1.11.1.html
>> > > > >>
>> > > > >> The full release notes are available in Jira:
>> > > > >>
>> > >
>> >
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348323
>> > > > >>
>> > > > >> We would like to thank all contributors of the Apache Flink
>> > community
>> > > who made this release possible!
>> > > > >>
>> > > > >> Regards,
>> > > > >> Dian
>> > > > >
>> > > >
>> > >
>> >
>>
>

-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


回复:flink 1.11 on kubernetes 构建失败

2020-07-22 文章 SmileSmile

Hi,Yang Wang!

很开心可以收到你的回复,你的回复帮助很大,让我知道了问题的方向。我再补充些信息,希望可以帮我进一步判断一下问题根源。

在JM报错的地方,No hostname could be resolved for ip address x ,报出来的ip是k8s分配给flink 
pod的内网ip,不是宿主机的ip。请问这个问题是出在哪里呢

Best!



| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月22日 18:18,Yang Wang 写道:
抱歉回复晚了

我这边也验证了一下,在你所说的地方确实是ip:port,但是提交任务都是正常的

如果你的日志里面一直在刷No hostname could be resolved for the IP address,应该是集群的coredns
有问题,由ip地址反查hostname查不到。你可以起一个busybox验证一下是不是这个ip就解析不了,有
可能是coredns有问题


Flink里面用的是InetAddress#getHostFromNameService来跟进IP地址获取FQDN的


Best,
Yang

SmileSmile  于2020年7月10日周五 下午1:10写道:

> hi Yang
>
> 在1.10版本,running的作业点击拓普图中随便一个operation,有detail subtasks taskmanagers xxx x
> 这行,taskmanagers这栏里的host,显示的是 podname:端口
>
> 在1.11变成ip:端口
>
> 目前我这边遇到的情况是,构建了一个有120slot的集群,作业并行度是120。 提交到jm后jm就失联了,jm timeout。观察jm日志,疯狂在刷
>
>
> No hostname could be resolved for the IP address 10.35.160.5, using IP
> address as host name. Local input split assignment (such as for HDFS files)
> may be impacted
>
>
> 目前观察到的改变主要是这块podname和ip的区别,其他不确定
>
>
> | |
> a511955993
> |
> |
> 邮箱:a511955...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年07月10日 12:13,Yang Wang 写道:
> 我记得1.11里面对host这个地方应该是没有改动,taskmanager.network.bind-policy的
> 默认值一会都是ip。所以你说的UI上是podname,这个是哪里的?正常TM列表akka地址
> 都是ip地址的
>
>
> Best,
> Yang
>
> SmileSmile  于2020年7月10日周五 上午10:42写道:
>
> > hi yang wang
> >
> > 1.11版本的on kubernetes在hostname上有做什么变化吗?
> >
> > 作业运行的时候 flink ui上 tm变成ip:端口
> > ,在1.10版本,ui上是 podname:端口。
> >
> > 作业启动的时候,jm日志一直在刷
> >
> > No hostname could be resolved for the IP address 10.35.160.5, using IP
> > address as host name. Local input split assignment (such as for HDFS
> files)
> > may be impacted
> >
> >
> >
> >
> > | |
> > a511955993
> > |
> > |
> > 邮箱:a511955...@163.com
> > |
> >
> > 签名由 网易邮箱大师 定制
> >
> > 在2020年07月09日 20:02,Yang Wang 写道:
> > sed替换报错应该不是Pod启动失败的根本原因,因为目前的docker-entrypoint.sh做了修改
> > 才会这样[1]
> >
> > 你这个报错看着是执行bash-java-utils.jar报的错,确认你用的是社区的yaml文件[2],我运行是没有问题的。
> > 如果不是,需要你把你的yaml发出来
> >
> >
> > [1].
> >
> https://github.com/apache/flink-docker/blob/dev-master/docker-entrypoint.sh
> > [2].
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html
> >
> >
> > Best,
> > Yang
> >
> > SmileSmile  于2020年7月9日周四 下午1:40写道:
> >
> > > hi
> > >
> > > 按照新版本的部署文件[1],会部署失败.如果将部署文件改用1.10版本,只是修改镜像文件和log4j文件,可以成功构建[2]。
> > >
> > >
> > > 目前看差别在于1.11启动jm和tm是通过args:
> > >
> >
> ["jobmanager"]的方法,通过docker-entrypoint.sh[3]看到调用set_common_options方法的时候会sed
> > > 本地挂载的flink-configuration-configmap.yaml导致失败。
> > >
> > >
> > > 1.10 版本是通过$FLINK_HOME/bin/jobmanager.sh启动。
> > >
> > > command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
> > >  while :;
> > >  do
> > >if [[ -f $(find log -name '*jobmanager*.log' -print -quit)
> ]];
> > >  then tail -f -n +1 log/*jobmanager*.log;
> > >fi;
> > >  done"]
> > >
> > >
> > > 如果遇到该问题的,沿用1.10版本的部署方式部署1.11镜像可以成功。  1.11 版本的部署方式如果有大佬可以走通的,求分享。
> > >
> > >
> > >
> > > [1]
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#session-cluster-resource-definitions
> > > [2]
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/kubernetes.html#session-cluster-resource-definitions
> > > [3]
> > >
> >
> https://github.com/apache/flink-docker/blob/master/1.11/scala_2.11-debian/docker-entrypoint.sh
> > >
> > >
> > >
> > > | |
> > > a511955993
> > > |
> > > |
> > > 邮箱:a511955...@163.com
> > > |
> > >
> > > 签名由 网易邮箱大师 定制
> > >
> > > 在2020年07月08日 16:38,SmileSmile 写道:
> > > hi yun tang!
> > >
> > > 没有对 /opt/flink/config 目录下的文件做写操作。 只是按照官网上的配置文件进行部署,镜像用的也是社区的镜像。
> > > best!
> > >
> > >
> > >
> > >
> > > | |
> > > a511955993
> > > |
> > > |
> > > 邮箱:a511955...@163.com
> > > |
> > >
> > > 签名由 网易邮箱大师 定制
> > >
> > > 在2020年07月08日 16:29,Yun Tang 写道:
> > > Hi
> > >
> > > 你是不是对 /opt/flink/conf
> > > 目录下的文件进行了sed相关写操作?社区文档中使用的方法是将configmap挂载成本地的flink-conf.yaml
> > > 等文件,而这个挂载的目录其实是不可写的。
> > > 直接修改configmap里面的内容,这样挂载时候就会自动更新了。
> > >
> > > 祝好
> > > 唐云
> > > 
> > > From: SmileSmile 
> > > Sent: Wednesday, July 8, 2020 13:03
> > > To: Flink user-zh mailing list 
> > > Subject: flink 1.11 on kubernetes 构建失败
> > >
> > > hi
> > >
> > > 按照文档[1]的方法部署session cluster on kubernetes,集群构建的时候出现了如下报错
> > >
> > >
> > > Starting Task Manager
> > > sed: couldn't open temporary file /opt/flink/conf/sedVdyy6Q: Read-only
> > > file system
> > > sed: couldn't open temporary file /opt/flink/conf/sedcj5VKQ: Read-only
> > > file system
> > > /docker-entrypoint.sh: 72: /docker-entrypoint.sh: cannot create
> > > /opt/flink/conf/flink-conf.yaml: Permission denied
> > > sed: couldn't open temporary file /opt/flink/conf/sedB5eynR: Read-only
> > > file system
> > > /docker-entrypoint.sh: 120: /docker-entrypoint.sh: cannot create
> > > /opt/flink/conf/flink-conf.yaml.tmp: Read-only file system
> > > [ERROR] The execution result is empty.
> > > [ERROR] Could not get JVM pa

Re: [ANNOUNCE] Apache Flink 1.11.1 released

2020-07-22 文章 Till Rohrmann
Thanks for being the release manager for the 1.11.1 release, Dian. Thanks a
lot to everyone who contributed to this release.

Cheers,
Till

On Wed, Jul 22, 2020 at 11:38 AM Hequn Cheng  wrote:

> Thanks Dian for the great work and thanks to everyone who makes this
> release possible!
>
> Best, Hequn
>
> On Wed, Jul 22, 2020 at 4:40 PM Jark Wu  wrote:
>
> > Congratulations! Thanks Dian for the great work and to be the release
> > manager!
> >
> > Best,
> > Jark
> >
> > On Wed, 22 Jul 2020 at 15:45, Yangze Guo  wrote:
> >
> > > Congrats!
> > >
> > > Thanks Dian Fu for being release manager, and everyone involved!
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Wed, Jul 22, 2020 at 3:14 PM Wei Zhong 
> > wrote:
> > > >
> > > > Congratulations! Thanks Dian for the great work!
> > > >
> > > > Best,
> > > > Wei
> > > >
> > > > > 在 2020年7月22日,15:09,Leonard Xu  写道:
> > > > >
> > > > > Congratulations!
> > > > >
> > > > > Thanks Dian Fu for the great work as release manager, and thanks
> > > everyone involved!
> > > > >
> > > > > Best
> > > > > Leonard Xu
> > > > >
> > > > >> 在 2020年7月22日,14:52,Dian Fu  写道:
> > > > >>
> > > > >> The Apache Flink community is very happy to announce the release
> of
> > > Apache Flink 1.11.1, which is the first bugfix release for the Apache
> > Flink
> > > 1.11 series.
> > > > >>
> > > > >> Apache Flink® is an open-source stream processing framework for
> > > distributed, high-performing, always-available, and accurate data
> > streaming
> > > applications.
> > > > >>
> > > > >> The release is available for download at:
> > > > >> https://flink.apache.org/downloads.html
> > > > >>
> > > > >> Please check out the release blog post for an overview of the
> > > improvements for this bugfix release:
> > > > >> https://flink.apache.org/news/2020/07/21/release-1.11.1.html
> > > > >>
> > > > >> The full release notes are available in Jira:
> > > > >>
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348323
> > > > >>
> > > > >> We would like to thank all contributors of the Apache Flink
> > community
> > > who made this release possible!
> > > > >>
> > > > >> Regards,
> > > > >> Dian
> > > > >
> > > >
> > >
> >
>


回复:flink-1.11 ddl kafka-to-hive问题

2020-07-22 文章 kcz
谢谢大佬们,公众号有demo了,我去对比一下看看





-- 原始邮件 --
发件人: Jingsong Li https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_dialect.html#use-hive-dialect
> <
> 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_dialect.html#use-hive-dialect
> >
>
> > 在 2020年7月21日,22:57,kcz <573693...@qq.com> 写道:
> >
> > 一直都木有数据 我也不知道哪里不太对 hive有这个表了已经。我测试写ddl hdfs 是OK的
> >
> >
> >
> >
> >
> > -- 原始邮件 --
> > 发件人: JasonLee <17610775...@163.com 
>
> > 发送时间: 2020年7月21日 20:39
> > 收件人: user-zh mailto:user-zh@flink.apache.org
> >>
> > 主题: 回复:flink-1.11 ddl kafka-to-hive问题
> >
> >
> >
> > hi
> > hive表是一直没有数据还是过一段时间就有数据了?
> >
> >
> > | |
> > JasonLee
> > |
> > |
> > 邮箱:17610775...@163.com
> > |
> >
> > Signature is customized by Netease Mail Master
> >
> > 在2020年07月21日 19:09,kcz 写道:
> > hive-1.2.1
> > chk 已经成功了(去chk目录查看了的确有chk数据,kafka也有数据),但是hive表没有数据,我是哪里缺少了什么吗?
> > String hiveSql = "CREATE  TABLE  
stream_tmp.fs_table (\n" +
> >        
"  host STRING,\n" +
> >        
"  url STRING," +
> >        
"  public_date STRING" +
> >        ") 
partitioned by (public_date
> string) " +
> >        "stored 
as PARQUET " +
> >        
"TBLPROPERTIES (\n" +
> >        
" 
> 'sink.partition-commit.delay'='0 s',\n" +
> >        
" 
> 'sink.partition-commit.trigger'='partition-time',\n" +
> >        
" 
> 'sink.partition-commit.policy.kind'='metastore,success-file'" +
> >        ")";
> > tableEnv.executeSql(hiveSql);
> >
> >
> > tableEnv.executeSql("INSERT INTO  stream_tmp.fs_table SELECT 
host,
> url, DATE_FORMAT(public_date, '-MM-dd') FROM stream_tmp.source_table");
>
>

-- 
Best, Jingsong Lee

flink row 类型

2020-07-22 文章 Dream-底限
hi、
我这面定义row数据,类型为ROW,可以通过
row.getField(i)获取到对应的值,但是我想获取对应的row_name名称要怎么操作,貌似没有报漏获取名称的接口

rule_key  转换为rule_key1,rulekey2
1
2


Re: flink 1.11 on kubernetes 构建失败

2020-07-22 文章 Yang Wang
抱歉回复晚了

我这边也验证了一下,在你所说的地方确实是ip:port,但是提交任务都是正常的

如果你的日志里面一直在刷No hostname could be resolved for the IP address,应该是集群的coredns
有问题,由ip地址反查hostname查不到。你可以起一个busybox验证一下是不是这个ip就解析不了,有
可能是coredns有问题


Flink里面用的是InetAddress#getHostFromNameService来跟进IP地址获取FQDN的


Best,
Yang

SmileSmile  于2020年7月10日周五 下午1:10写道:

> hi Yang
>
> 在1.10版本,running的作业点击拓普图中随便一个operation,有detail subtasks taskmanagers xxx x
> 这行,taskmanagers这栏里的host,显示的是 podname:端口
>
> 在1.11变成ip:端口
>
> 目前我这边遇到的情况是,构建了一个有120slot的集群,作业并行度是120。 提交到jm后jm就失联了,jm timeout。观察jm日志,疯狂在刷
>
>
> No hostname could be resolved for the IP address 10.35.160.5, using IP
> address as host name. Local input split assignment (such as for HDFS files)
> may be impacted
>
>
> 目前观察到的改变主要是这块podname和ip的区别,其他不确定
>
>
> | |
> a511955993
> |
> |
> 邮箱:a511955...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年07月10日 12:13,Yang Wang 写道:
> 我记得1.11里面对host这个地方应该是没有改动,taskmanager.network.bind-policy的
> 默认值一会都是ip。所以你说的UI上是podname,这个是哪里的?正常TM列表akka地址
> 都是ip地址的
>
>
> Best,
> Yang
>
> SmileSmile  于2020年7月10日周五 上午10:42写道:
>
> > hi yang wang
> >
> > 1.11版本的on kubernetes在hostname上有做什么变化吗?
> >
> > 作业运行的时候 flink ui上 tm变成ip:端口
> > ,在1.10版本,ui上是 podname:端口。
> >
> > 作业启动的时候,jm日志一直在刷
> >
> > No hostname could be resolved for the IP address 10.35.160.5, using IP
> > address as host name. Local input split assignment (such as for HDFS
> files)
> > may be impacted
> >
> >
> >
> >
> > | |
> > a511955993
> > |
> > |
> > 邮箱:a511955...@163.com
> > |
> >
> > 签名由 网易邮箱大师 定制
> >
> > 在2020年07月09日 20:02,Yang Wang 写道:
> > sed替换报错应该不是Pod启动失败的根本原因,因为目前的docker-entrypoint.sh做了修改
> > 才会这样[1]
> >
> > 你这个报错看着是执行bash-java-utils.jar报的错,确认你用的是社区的yaml文件[2],我运行是没有问题的。
> > 如果不是,需要你把你的yaml发出来
> >
> >
> > [1].
> >
> https://github.com/apache/flink-docker/blob/dev-master/docker-entrypoint.sh
> > [2].
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html
> >
> >
> > Best,
> > Yang
> >
> > SmileSmile  于2020年7月9日周四 下午1:40写道:
> >
> > > hi
> > >
> > > 按照新版本的部署文件[1],会部署失败.如果将部署文件改用1.10版本,只是修改镜像文件和log4j文件,可以成功构建[2]。
> > >
> > >
> > > 目前看差别在于1.11启动jm和tm是通过args:
> > >
> >
> ["jobmanager"]的方法,通过docker-entrypoint.sh[3]看到调用set_common_options方法的时候会sed
> > > 本地挂载的flink-configuration-configmap.yaml导致失败。
> > >
> > >
> > > 1.10 版本是通过$FLINK_HOME/bin/jobmanager.sh启动。
> > >
> > > command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
> > >  while :;
> > >  do
> > >if [[ -f $(find log -name '*jobmanager*.log' -print -quit)
> ]];
> > >  then tail -f -n +1 log/*jobmanager*.log;
> > >fi;
> > >  done"]
> > >
> > >
> > > 如果遇到该问题的,沿用1.10版本的部署方式部署1.11镜像可以成功。  1.11 版本的部署方式如果有大佬可以走通的,求分享。
> > >
> > >
> > >
> > > [1]
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#session-cluster-resource-definitions
> > > [2]
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/kubernetes.html#session-cluster-resource-definitions
> > > [3]
> > >
> >
> https://github.com/apache/flink-docker/blob/master/1.11/scala_2.11-debian/docker-entrypoint.sh
> > >
> > >
> > >
> > > | |
> > > a511955993
> > > |
> > > |
> > > 邮箱:a511955...@163.com
> > > |
> > >
> > > 签名由 网易邮箱大师 定制
> > >
> > > 在2020年07月08日 16:38,SmileSmile 写道:
> > > hi yun tang!
> > >
> > > 没有对 /opt/flink/config 目录下的文件做写操作。 只是按照官网上的配置文件进行部署,镜像用的也是社区的镜像。
> > > best!
> > >
> > >
> > >
> > >
> > > | |
> > > a511955993
> > > |
> > > |
> > > 邮箱:a511955...@163.com
> > > |
> > >
> > > 签名由 网易邮箱大师 定制
> > >
> > > 在2020年07月08日 16:29,Yun Tang 写道:
> > > Hi
> > >
> > > 你是不是对 /opt/flink/conf
> > > 目录下的文件进行了sed相关写操作?社区文档中使用的方法是将configmap挂载成本地的flink-conf.yaml
> > > 等文件,而这个挂载的目录其实是不可写的。
> > > 直接修改configmap里面的内容,这样挂载时候就会自动更新了。
> > >
> > > 祝好
> > > 唐云
> > > 
> > > From: SmileSmile 
> > > Sent: Wednesday, July 8, 2020 13:03
> > > To: Flink user-zh mailing list 
> > > Subject: flink 1.11 on kubernetes 构建失败
> > >
> > > hi
> > >
> > > 按照文档[1]的方法部署session cluster on kubernetes,集群构建的时候出现了如下报错
> > >
> > >
> > > Starting Task Manager
> > > sed: couldn't open temporary file /opt/flink/conf/sedVdyy6Q: Read-only
> > > file system
> > > sed: couldn't open temporary file /opt/flink/conf/sedcj5VKQ: Read-only
> > > file system
> > > /docker-entrypoint.sh: 72: /docker-entrypoint.sh: cannot create
> > > /opt/flink/conf/flink-conf.yaml: Permission denied
> > > sed: couldn't open temporary file /opt/flink/conf/sedB5eynR: Read-only
> > > file system
> > > /docker-entrypoint.sh: 120: /docker-entrypoint.sh: cannot create
> > > /opt/flink/conf/flink-conf.yaml.tmp: Read-only file system
> > > [ERROR] The execution result is empty.
> > > [ERROR] Could not get JVM parameters and dynamic configurations
> properly.
> > >
> > >
> > > 是否有遇到同样的问题,支个招
> > >
> > >
> > >
> > > [1]
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#session-cluster-resource-definitions
> > >
> > >
> > > | |
> > > a511955993
>

Re: flink sqlUpdate,如何获取里面的字段,字段类型,with 等3个属性

2020-07-22 文章 godfrey he
tableEnv 中 可以通过
tableEvn.from(xx).getSchema() 拿到该表的schema信息,但是没法拿到对应的properties。
如果要拿到properties,可以通过catalog的接口得到 [1]。
如果要自定义实现source/sink,可以参考 [2]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/catalogs.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sourceSinks.html

Best,
Godfrey





Michael Ran  于2020年7月22日周三 下午4:10写道:

> dear all:
>  我用flink 注册一张表:
>   CREATE TABLE dim_mysql (
> id int,  --
> type varchar --
> ) WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://localhost:3390/test',
> 'table-name' = 'flink_test',
> 'driver' = 'com.mysql.cj.jdbc.Driver',
> 'username' = '',
> 'password' = '',
> 'lookup.cache.max-rows' = '5000',
> 'lookup.cache.ttl' = '1s',
> 'lookup.max-retries' = '3'
> )
> 有没有通过 tableEnv 去获取,字段[id,type]  类型[INTEGER,VARCHAR]
> 以及属性,map 这种。
> 我看阿里官方有blink 支持自定义sink:
> publicabstractclassCustomSinkBaseimplementsSerializable{
> protectedMap userParamsMap;// 您在sql with语句中定义的键值对,但所有的键均为小写
> protectedSet primaryKeys;// 您定义的主键字段名
> protectedList headerFields;// 标记为header的字段列表
> protectedRowTypeInfo rowTypeInfo;// 字段类型和名称
> 核心需求是:获取定义的表的所有属性,自己实现自己的功能,包括 join sink 等各种逻辑


Re: Flink 1.11 submit job timed out

2020-07-22 文章 Yang Wang
如果你的日志里面一直在刷No hostname could be resolved for the IP address,应该是集群的coredns
有问题,由ip地址反查hostname查不到。你可以起一个busybox验证一下是不是这个ip就解析不了,有
可能是coredns有问题


Best,
Yang

Congxian Qiu  于2020年7月21日周二 下午7:29写道:

> Hi
>不确定 k8s 环境中能否看到 pod 的完整日志?类似 Yarn 的 NM 日志一样,如果有的话,可以尝试看一下这个 pod
> 的完整日志有没有什么发现
> Best,
> Congxian
>
>
> SmileSmile  于2020年7月21日周二 下午3:19写道:
>
> > Hi,Congxian
> >
> > 因为是测试环境,没有配置HA,目前看到的信息,就是JM刷出来大量的no hostname could be
> > resolved,jm失联,作业提交失败。
> > 将jm内存配置为10g也是一样的情况(jobmanager.memory.pprocesa.size:10240m)。
> >
> > 在同一个环境将版本回退到1.10没有出现该问题,也不会刷如上报错。
> >
> >
> > 是否有其他排查思路?
> >
> > Best!
> >
> >
> >
> >
> > | |
> > a511955993
> > |
> > |
> > 邮箱:a511955...@163.com
> > |
> >
> > 签名由 网易邮箱大师 定制
> >
> > On 07/16/2020 13:17, Congxian Qiu wrote:
> > Hi
> >   如果没有异常,GC 情况也正常的话,或许可以看一下 pod 的相关日志,如果开启了 HA 也可以看一下 zk 的日志。之前遇到过一次在
> Yarn
> > 环境中类似的现象是由于其他原因导致的,通过看 NM 日志以及 zk 日志发现的原因。
> >
> > Best,
> > Congxian
> >
> >
> > SmileSmile  于2020年7月15日周三 下午5:20写道:
> >
> > > Hi Roc
> > >
> > > 该现象在1.10.1版本没有,在1.11版本才出现。请问这个该如何查比较合适
> > >
> > >
> > >
> > > | |
> > > a511955993
> > > |
> > > |
> > > 邮箱:a511955...@163.com
> > > |
> > >
> > > 签名由 网易邮箱大师 定制
> > >
> > > On 07/15/2020 17:16, Roc Marshal wrote:
> > > Hi,SmileSmile.
> > > 个人之前有遇到过 类似 的host解析问题,可以从k8s的pod节点网络映射角度排查一下。
> > > 希望这对你有帮助。
> > >
> > >
> > > 祝好。
> > > Roc Marshal
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > 在 2020-07-15 17:04:18,"SmileSmile"  写道:
> > > >
> > > >Hi
> > > >
> > > >使用版本Flink 1.11,部署方式 kubernetes session。 TM个数30个,每个TM 4个slot。 job
> > > 并行度120.提交作业的时候出现大量的No hostname could be resolved for the IP address,JM
> > time
> > > out,作业提交失败。web ui也会卡主无响应。
> > > >
> > > >用wordCount,并行度只有1提交也会刷,no hostname的日志会刷个几条,然后正常提交,如果并行度一上去,就会超时。
> > > >
> > > >
> > > >部分日志如下:
> > > >
> > > >2020-07-15 16:58:46,460 WARN
> > > org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No
> > > hostname could be resolved for the IP address 10.32.160.7, using IP
> > address
> > > as host name. Local input split assignment (such as for HDFS files) may
> > be
> > > impacted.
> > > >2020-07-15 16:58:46,460 WARN
> > > org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No
> > > hostname could be resolved for the IP address 10.44.224.7, using IP
> > address
> > > as host name. Local input split assignment (such as for HDFS files) may
> > be
> > > impacted.
> > > >2020-07-15 16:58:46,461 WARN
> > > org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No
> > > hostname could be resolved for the IP address 10.40.32.9, using IP
> > address
> > > as host name. Local input split assignment (such as for HDFS files) may
> > be
> > > impacted.
> > > >
> > > >2020-07-15 16:59:10,236 INFO
> > > org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
> > The
> > > heartbeat of JobManager with id 69a0d460de46a9f41c770d963c0a timed
> > out.
> > > >2020-07-15 16:59:10,236 INFO
> > > org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
> > > Disconnect job manager 
> > > @akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_2 for job
> > > e1554c737e37ed79688a15c746b6e9ef from the resource manager.
> > > >
> > > >
> > > >how to deal with ?
> > > >
> > > >
> > > >beset !
> > > >
> > > >| |
> > > >a511955993
> > > >|
> > > >|
> > > >邮箱:a511955...@163.com
> > > >|
> > > >
> > > >签名由 网易邮箱大师 定制
> > >
> >
>


答复: 关于1.11Flink SQL 全新API设计的一些问题

2020-07-22 文章 刘首维
Hi, godfrey


好的,如果可以的话,有了相关讨论的jira或者mail可以cc一下我吗,谢谢啦


发件人: godfrey he 
发送时间: 2020年7月22日 17:49:27
收件人: user-zh
抄送: Jark Wu; xbjt...@gmail.com; jingsongl...@gmail.com
主题: Re: 关于1.11Flink SQL 全新API设计的一些问题

Hi,首维

感谢给出非常详细的反馈。这个问题我们之前内部也有一些讨论,但由于缺乏一些真实场景,最后维持了当前的接口。
我们会根据你提供的场景进行后续讨论。

Best,
Godfrey

刘首维  于2020年7月22日周三 下午5:23写道:

> Hi, Jark
>
>
>
>感谢你的建议!
>
>我们这边充分相信社区在SQL/Table上的苦心孤诣,也愿意跟进Flink在新版本的变化。
>
>先看我遇到问题本身,你的建议确实可以帮助我解决问题。我想聊一下我对问题之外的一些想法
>
>```
>
>  >  2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter
> 用来做缓冲池/微批/数据过滤等功能
> 这个我觉得也可以封装在 SinkFunction 里面。
>
>   ```
>
>  
> 比如上述这个问题2,我们确实可以把它做到SinkFunction中,但是我个人认为这可能在设计上不够理想的。我个人在设计编排Function/算子的时候习惯于遵循”算子单一职责”的原则,这也是我为什么会拆分出多个process/filter算子编排到SinkFunction前面而非将这些功能耦合到SinkFunction去做。另一方面,没了DataStream,向新的API的迁移成本相对来说变得更高了一些~
> 又或者,我们现在还有一些特殊原因,算子编排的时候会去修改TaskChain Strategy,这个时候DataStream的灵活性是必不可少的
>
> 考虑到Flink Task都可以拆分成Source -> Transformation -> sink
> 三个阶段,那么能让用户可以对自己的作业针对(流或批)的运行模式下,可以有效灵活做一些自己的定制策略/优化/逻辑可能是会方便的~
>
>诚然,DataStream的灵活性确实会是一把双刃剑,但就像@leonard提到的,平台层和应用层的目的和开发重点可能也不太一样,对Flink
> API使用侧重点也不同。我个人还是希望可以在享受全新API设计优势同时,
>
> 可以继续使用DataStream(Transformation)的灵活性,助力Flink组件在我们组的开落地
>
>
> 再次感谢各位的回复!
>
> 
> 发件人: Jark Wu 
> 发送时间: 2020年7月22日 16:33:45
> 收件人: user-zh
> 抄送: godfrey he; greemqq...@163.com; 刘首维
> 主题: Re: 关于1.11Flink SQL 全新API设计的一些问题
>
> Hi,首维,
>
> 非常感谢反馈。与 DataStream 解耦是 FLIP-95 的一个非常重要的设计目标,这让 sink/source 对于框架来说不再是黑盒,
> 因此将来才可以做诸如 state 兼容升级、消息顺序保障、自动并发设置等等事情。
>
> 关于你的一些需求,下面是我的建议和回复:
>
> >  1.  我们现在有某种特定的Kafka数据格式,1条Kafka数据
> 会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStream的时候增加了一个process/FlatMap阶段,用于处理这种情况,这样对用户是透明的。
> 这个理论上还属于“数据格式”的职责,所以建议做在 DeserializationSchema 上,目前 DeserializationSchema
> 支持一对多的输出。可以参考 DebeziumJsonDeserializationSchema 的实现。
>
> >  2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能
> 这个我觉得也可以封装在 SinkFunction 里面。
>
> >  3.  调整或者指定Source/Sink并行度为用户指定值,我们也是在DataStream层面上去做的
> 这个社区也有计划在 FLIP-95 之上支持,会提供并发(或者分区)推断的能力。
>
> >  4.  对于一些特殊Source Sink,他们会和KeyBy操作组合(对用户透明),我们也是在DataStream层面上去做的
> 这个能在具体一点吗?目前像 SupportsPartitioning 接口,就可以指定数据在交给 sink 之前先做 group by
> partition。我感觉这个可能也可以通过引入类似的接口解决。
>
> Best,
> Jark
>
> On Wed, 22 Jul 2020 at 16:27, Leonard Xu  xbjt...@gmail.com>> wrote:
> Hi,首维, Ran
>
> 感谢分享, 我理解1.11新的API主要是想把 Table API 和 DataStream API 两套尽量拆分干净,
> 但看起来平台级的开发工作会依赖DataStream的一些预处理和用户逻辑。
> 我觉得这类需求对平台开发是合理,可以收集反馈下的, cc: godfrey
>
> 祝好
> Leonard Xu
>
>
> > 在 2020年7月22日,13:47,刘首维  liushou...@autohome.com.cn>> 写道:
> >
> > Hi JingSong,
> >
> >
> 简单介绍一下背景,我们组的一个工作就是用户在页面写Sql,我们负责将Sql处理转换成Flink作业并在我们的平台上运行,这个转换的过程依赖我们的SQL
> SDK
> >  下面我举几个我们比较常用且感觉用1.11新API不太好实现的例子
> >
> >
> >  1.  我们现在有某种特定的Kafka数据格式,1条Kafka数据
> 会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStream的时候增加了一个process/FlatMap阶段,用于处理这种情况,这样对用户是透明的。
> >  2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能
> >  3.  调整或者指定Source/Sink并行度为用户指定值,我们也是在DataStream层面上去做的
> >  4.  对于一些特殊Source Sink,他们会和KeyBy操作组合(对用户透明),我们也是在DataStream层面上去做的
> >
> >
> > 如果可以的话,能让我在API层面拿到Transformation也是能满足我需求的
> >
> >
> > 
> > 发件人: Jingsong Li mailto:jingsongl...@gmail.com>>
> > 发送时间: 2020年7月22日 13:26:00
> > 收件人: user-zh
> > 抄送: imj...@gmail.com
> > 主题: Re: 关于1.11Flink SQL 全新API设计的一些问题
> >
> > 可以分享下你们为啥要拿到DataStream吗?什么场景一定离不开DataStream吗?
> >
> > Best
> > Jingsong
> >
> > On Wed, Jul 22, 2020 at 12:36 PM 刘首维  liushou...@autohome.com.cn>> wrote:
> >
> >> Hi all,
> >>
> >>
> >>
> >>很高兴看到Flink 1.11的发布,FLIP95和FLIP105也成功落地~
> >>
> >>我们最近在调研基于1.11的SQL/Table API对我们旧有的SQL
> >>
> SDK进行升级。经过初步调研发现,基于`Factory`和`DynamicTable`的API,CatalogTable会被Planner直接变成Transformation,而不是跟之前一样可以拿到DataStream。比如之前的`StreamTableSource`#getDataStream,我可以直接获得DataStream对象的。这个动作/方法对于我们很重要,因为我们封装的SDK中,很多内部操作是在这个方法调用时触发/完成的。
> >>
> >>
> >>
> >>所以,如果基于新的API去开发的话,我该如何获取到DataStream,或者达成类似的效果呢(尽可能不去动执行计划的话)
> >>
> >
> >
> > --
> > Best, Jingsong Lee
>
>


Re: 关于1.11Flink SQL 全新API设计的一些问题

2020-07-22 文章 godfrey he
Hi,首维

感谢给出非常详细的反馈。这个问题我们之前内部也有一些讨论,但由于缺乏一些真实场景,最后维持了当前的接口。
我们会根据你提供的场景进行后续讨论。

Best,
Godfrey

刘首维  于2020年7月22日周三 下午5:23写道:

> Hi, Jark
>
>
>
>感谢你的建议!
>
>我们这边充分相信社区在SQL/Table上的苦心孤诣,也愿意跟进Flink在新版本的变化。
>
>先看我遇到问题本身,你的建议确实可以帮助我解决问题。我想聊一下我对问题之外的一些想法
>
>```
>
>  >  2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter
> 用来做缓冲池/微批/数据过滤等功能
> 这个我觉得也可以封装在 SinkFunction 里面。
>
>   ```
>
>  
> 比如上述这个问题2,我们确实可以把它做到SinkFunction中,但是我个人认为这可能在设计上不够理想的。我个人在设计编排Function/算子的时候习惯于遵循”算子单一职责”的原则,这也是我为什么会拆分出多个process/filter算子编排到SinkFunction前面而非将这些功能耦合到SinkFunction去做。另一方面,没了DataStream,向新的API的迁移成本相对来说变得更高了一些~
> 又或者,我们现在还有一些特殊原因,算子编排的时候会去修改TaskChain Strategy,这个时候DataStream的灵活性是必不可少的
>
> 考虑到Flink Task都可以拆分成Source -> Transformation -> sink
> 三个阶段,那么能让用户可以对自己的作业针对(流或批)的运行模式下,可以有效灵活做一些自己的定制策略/优化/逻辑可能是会方便的~
>
>诚然,DataStream的灵活性确实会是一把双刃剑,但就像@leonard提到的,平台层和应用层的目的和开发重点可能也不太一样,对Flink
> API使用侧重点也不同。我个人还是希望可以在享受全新API设计优势同时,
>
> 可以继续使用DataStream(Transformation)的灵活性,助力Flink组件在我们组的开落地
>
>
> 再次感谢各位的回复!
>
> 
> 发件人: Jark Wu 
> 发送时间: 2020年7月22日 16:33:45
> 收件人: user-zh
> 抄送: godfrey he; greemqq...@163.com; 刘首维
> 主题: Re: 关于1.11Flink SQL 全新API设计的一些问题
>
> Hi,首维,
>
> 非常感谢反馈。与 DataStream 解耦是 FLIP-95 的一个非常重要的设计目标,这让 sink/source 对于框架来说不再是黑盒,
> 因此将来才可以做诸如 state 兼容升级、消息顺序保障、自动并发设置等等事情。
>
> 关于你的一些需求,下面是我的建议和回复:
>
> >  1.  我们现在有某种特定的Kafka数据格式,1条Kafka数据
> 会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStream的时候增加了一个process/FlatMap阶段,用于处理这种情况,这样对用户是透明的。
> 这个理论上还属于“数据格式”的职责,所以建议做在 DeserializationSchema 上,目前 DeserializationSchema
> 支持一对多的输出。可以参考 DebeziumJsonDeserializationSchema 的实现。
>
> >  2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能
> 这个我觉得也可以封装在 SinkFunction 里面。
>
> >  3.  调整或者指定Source/Sink并行度为用户指定值,我们也是在DataStream层面上去做的
> 这个社区也有计划在 FLIP-95 之上支持,会提供并发(或者分区)推断的能力。
>
> >  4.  对于一些特殊Source Sink,他们会和KeyBy操作组合(对用户透明),我们也是在DataStream层面上去做的
> 这个能在具体一点吗?目前像 SupportsPartitioning 接口,就可以指定数据在交给 sink 之前先做 group by
> partition。我感觉这个可能也可以通过引入类似的接口解决。
>
> Best,
> Jark
>
> On Wed, 22 Jul 2020 at 16:27, Leonard Xu  xbjt...@gmail.com>> wrote:
> Hi,首维, Ran
>
> 感谢分享, 我理解1.11新的API主要是想把 Table API 和 DataStream API 两套尽量拆分干净,
> 但看起来平台级的开发工作会依赖DataStream的一些预处理和用户逻辑。
> 我觉得这类需求对平台开发是合理,可以收集反馈下的, cc: godfrey
>
> 祝好
> Leonard Xu
>
>
> > 在 2020年7月22日,13:47,刘首维  liushou...@autohome.com.cn>> 写道:
> >
> > Hi JingSong,
> >
> >
> 简单介绍一下背景,我们组的一个工作就是用户在页面写Sql,我们负责将Sql处理转换成Flink作业并在我们的平台上运行,这个转换的过程依赖我们的SQL
> SDK
> >  下面我举几个我们比较常用且感觉用1.11新API不太好实现的例子
> >
> >
> >  1.  我们现在有某种特定的Kafka数据格式,1条Kafka数据
> 会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStream的时候增加了一个process/FlatMap阶段,用于处理这种情况,这样对用户是透明的。
> >  2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能
> >  3.  调整或者指定Source/Sink并行度为用户指定值,我们也是在DataStream层面上去做的
> >  4.  对于一些特殊Source Sink,他们会和KeyBy操作组合(对用户透明),我们也是在DataStream层面上去做的
> >
> >
> > 如果可以的话,能让我在API层面拿到Transformation也是能满足我需求的
> >
> >
> > 
> > 发件人: Jingsong Li mailto:jingsongl...@gmail.com>>
> > 发送时间: 2020年7月22日 13:26:00
> > 收件人: user-zh
> > 抄送: imj...@gmail.com
> > 主题: Re: 关于1.11Flink SQL 全新API设计的一些问题
> >
> > 可以分享下你们为啥要拿到DataStream吗?什么场景一定离不开DataStream吗?
> >
> > Best
> > Jingsong
> >
> > On Wed, Jul 22, 2020 at 12:36 PM 刘首维  liushou...@autohome.com.cn>> wrote:
> >
> >> Hi all,
> >>
> >>
> >>
> >>很高兴看到Flink 1.11的发布,FLIP95和FLIP105也成功落地~
> >>
> >>我们最近在调研基于1.11的SQL/Table API对我们旧有的SQL
> >>
> SDK进行升级。经过初步调研发现,基于`Factory`和`DynamicTable`的API,CatalogTable会被Planner直接变成Transformation,而不是跟之前一样可以拿到DataStream。比如之前的`StreamTableSource`#getDataStream,我可以直接获得DataStream对象的。这个动作/方法对于我们很重要,因为我们封装的SDK中,很多内部操作是在这个方法调用时触发/完成的。
> >>
> >>
> >>
> >>所以,如果基于新的API去开发的话,我该如何获取到DataStream,或者达成类似的效果呢(尽可能不去动执行计划的话)
> >>
> >
> >
> > --
> > Best, Jingsong Lee
>
>


Re: flink1.11 tablefunction

2020-07-22 文章 Dream-底限
hi 、Benchao Li
我尝试了将数组打散的方式,但是报了一个莫名其妙的错误,可以帮忙看看嘛

tableEnv.executeSql("CREATE TABLE parser_data_test (\n" +
"  data ROW,rule_results ARRAY>>,\n" +
"  createTime BIGINT,\n" +
"  tindex INT\n" +
") WITH (\n" +
" 'connector' = 'kafka-0.11',\n" +
" 'topic' = 'parser_data_test',\n" +
" 'properties.bootstrap.servers' = 'localhost:9092',\n" +
" 'properties.group.id' = 'testGroup',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'format' = 'json',\n" +
" 'json.fail-on-missing-field' = 'false',\n" +
" 'json.ignore-parse-errors' = 'true'\n" +
")");

Table table = tableEnv.sqlQuery("select
data.flow_task_id,data.features.`user_ic_no_aku_uid.pdl_current_unpay`,rule_id,tindex
from parser_data_test CROSS JOIN UNNEST(data.rule_results) AS t
(rule_id,rule_name,rule_type_name,`result`,in_path)");

table.printSchema();
tableEnv.toAppendStream(table,
Types.ROW(TypeConversions.fromDataTypeToLegacyInfo(table.getSchema().getFieldDataTypes(.print();


异常信息:

rg.apache.flink.table.api.ValidationException: SQL validation failed.
>From line 0, column 0 to line 1, column 139: Column 'data.data' not
found in table 'parser_data_test'

at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
at 
com.akulaku.data.flink.ParserDataTest.parserDataTest(ParserDataTest.java:63)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at 
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at 
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
Caused by: org.apache.calcite.runtime.CalciteContextException: From
line 0, column 0 to line 1, column 139: Column 'data.data' not found
in table 'parser_data_test'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
at 
org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:439)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5991)
at 
org.apache.calcite.sql.validate.SqlValidatorI

Re: [ANNOUNCE] Apache Flink 1.11.1 released

2020-07-22 文章 Hequn Cheng
Thanks Dian for the great work and thanks to everyone who makes this
release possible!

Best, Hequn

On Wed, Jul 22, 2020 at 4:40 PM Jark Wu  wrote:

> Congratulations! Thanks Dian for the great work and to be the release
> manager!
>
> Best,
> Jark
>
> On Wed, 22 Jul 2020 at 15:45, Yangze Guo  wrote:
>
> > Congrats!
> >
> > Thanks Dian Fu for being release manager, and everyone involved!
> >
> > Best,
> > Yangze Guo
> >
> > On Wed, Jul 22, 2020 at 3:14 PM Wei Zhong 
> wrote:
> > >
> > > Congratulations! Thanks Dian for the great work!
> > >
> > > Best,
> > > Wei
> > >
> > > > 在 2020年7月22日,15:09,Leonard Xu  写道:
> > > >
> > > > Congratulations!
> > > >
> > > > Thanks Dian Fu for the great work as release manager, and thanks
> > everyone involved!
> > > >
> > > > Best
> > > > Leonard Xu
> > > >
> > > >> 在 2020年7月22日,14:52,Dian Fu  写道:
> > > >>
> > > >> The Apache Flink community is very happy to announce the release of
> > Apache Flink 1.11.1, which is the first bugfix release for the Apache
> Flink
> > 1.11 series.
> > > >>
> > > >> Apache Flink® is an open-source stream processing framework for
> > distributed, high-performing, always-available, and accurate data
> streaming
> > applications.
> > > >>
> > > >> The release is available for download at:
> > > >> https://flink.apache.org/downloads.html
> > > >>
> > > >> Please check out the release blog post for an overview of the
> > improvements for this bugfix release:
> > > >> https://flink.apache.org/news/2020/07/21/release-1.11.1.html
> > > >>
> > > >> The full release notes are available in Jira:
> > > >>
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348323
> > > >>
> > > >> We would like to thank all contributors of the Apache Flink
> community
> > who made this release possible!
> > > >>
> > > >> Regards,
> > > >> Dian
> > > >
> > >
> >
>


flink 1.11 cdc相关问题

2020-07-22 文章 amen...@163.com
hi everyone,

小白通过debezium将pgsql cdc数据同步至kafka之后,使用我们flink的sql client提交测试任务,但当kafka端cdc 
json数据一开始发送,任务即报错,通过web ui log查看界面发现错误日志如下,还请大佬帮忙分析,谢谢!

分割线==
DDL:

CREATE TABLE pgsql_person_cdc(
id BIGINT,
name STRING,
age STRING,
sex STRING,
phone STRING
) WITH (
'connector' = 'kafka',
'topic' = 'postgres.public.person',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'pgsql_cdc',
'format' = 'debezium-json',
'debezium-json.schema-include' = 'true'
)
CREATE TABLE sql_out (
  id BIGINT,
name STRING,
age STRING,
sex STRING,
phone STRING
) WITH (
'connector' = 'print'
)
INSERT INTO sql_out SELECT * FROM pgsql_person_cdc;

分割线==
错误日志:

java.io.IOException: Corrupt Debezium JSON message 
'{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"postgres.public.person.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"liushimin","age":"24","sex":"man","phone":"1"},"source":{"version":"1.2.0.Final","connector":"postgresql","name":"postgres","ts_ms":1595409754151,"snapshot":"false","db":"postgres","schema":"public","table":"person","txId":569,"lsn":23632344,"xmin":null},"op":"u","ts_ms":1595409754270,"transaction":null}}'.
at 
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136)
 ~[flink-json-1.11.0.jar:1.11.0]
at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
 ~[flink-connector-kafka-base_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
 ~[flink-connector-kafka_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
 ~[flink-connector-kafka_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
 ~[flink-connector-kafka-base_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
Caused by: java.lang.NullPointerException
at 
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:120)
 ~[flink-json-1.11.0.jar:1.11.0]
... 7 more
2020-07-22 17:22:34,415 INFO org.apache.flink.runtime.taskmanager.Task [] - 
Freeing task resources for Source: TableSourceScan(table=[[default_catalog, 
default_database, pgsql_person_cdc]], fields=[id, name, age, sex, phone]) -> 
Sink: Sink(table=[default_catalog.default_database.sql_out], fields=[id, name, 
age, sex, phone]) (1/1

答复: 关于1.11Flink SQL 全新API设计的一些问题

2020-07-22 文章 刘首维
Hi, Jark



   感谢你的建议!

   我们这边充分相信社区在SQL/Table上的苦心孤诣,也愿意跟进Flink在新版本的变化。

   先看我遇到问题本身,你的建议确实可以帮助我解决问题。我想聊一下我对问题之外的一些想法

   ```

 >  2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能
这个我觉得也可以封装在 SinkFunction 里面。

  ```

 
比如上述这个问题2,我们确实可以把它做到SinkFunction中,但是我个人认为这可能在设计上不够理想的。我个人在设计编排Function/算子的时候习惯于遵循”算子单一职责”的原则,这也是我为什么会拆分出多个process/filter算子编排到SinkFunction前面而非将这些功能耦合到SinkFunction去做。另一方面,没了DataStream,向新的API的迁移成本相对来说变得更高了一些~
 又或者,我们现在还有一些特殊原因,算子编排的时候会去修改TaskChain Strategy,这个时候DataStream的灵活性是必不可少的

考虑到Flink Task都可以拆分成Source -> Transformation -> sink 
三个阶段,那么能让用户可以对自己的作业针对(流或批)的运行模式下,可以有效灵活做一些自己的定制策略/优化/逻辑可能是会方便的~

   诚然,DataStream的灵活性确实会是一把双刃剑,但就像@leonard提到的,平台层和应用层的目的和开发重点可能也不太一样,对Flink 
API使用侧重点也不同。我个人还是希望可以在享受全新API设计优势同时,

可以继续使用DataStream(Transformation)的灵活性,助力Flink组件在我们组的开落地


再次感谢各位的回复!


发件人: Jark Wu 
发送时间: 2020年7月22日 16:33:45
收件人: user-zh
抄送: godfrey he; greemqq...@163.com; 刘首维
主题: Re: 关于1.11Flink SQL 全新API设计的一些问题

Hi,首维,

非常感谢反馈。与 DataStream 解耦是 FLIP-95 的一个非常重要的设计目标,这让 sink/source 对于框架来说不再是黑盒,
因此将来才可以做诸如 state 兼容升级、消息顺序保障、自动并发设置等等事情。

关于你的一些需求,下面是我的建议和回复:

>  1.  我们现在有某种特定的Kafka数据格式,1条Kafka数据 
> 会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStream的时候增加了一个process/FlatMap阶段,用于处理这种情况,这样对用户是透明的。
这个理论上还属于“数据格式”的职责,所以建议做在 DeserializationSchema 上,目前 DeserializationSchema 
支持一对多的输出。可以参考 DebeziumJsonDeserializationSchema 的实现。

>  2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能
这个我觉得也可以封装在 SinkFunction 里面。

>  3.  调整或者指定Source/Sink并行度为用户指定值,我们也是在DataStream层面上去做的
这个社区也有计划在 FLIP-95 之上支持,会提供并发(或者分区)推断的能力。

>  4.  对于一些特殊Source Sink,他们会和KeyBy操作组合(对用户透明),我们也是在DataStream层面上去做的
这个能在具体一点吗?目前像 SupportsPartitioning 接口,就可以指定数据在交给 sink 之前先做 group by 
partition。我感觉这个可能也可以通过引入类似的接口解决。

Best,
Jark

On Wed, 22 Jul 2020 at 16:27, Leonard Xu 
mailto:xbjt...@gmail.com>> wrote:
Hi,首维, Ran

感谢分享, 我理解1.11新的API主要是想把 Table API 和 DataStream API 两套尽量拆分干净, 
但看起来平台级的开发工作会依赖DataStream的一些预处理和用户逻辑。
我觉得这类需求对平台开发是合理,可以收集反馈下的, cc: godfrey

祝好
Leonard Xu


> 在 2020年7月22日,13:47,刘首维 
> mailto:liushou...@autohome.com.cn>> 写道:
>
> Hi JingSong,
>
>  简单介绍一下背景,我们组的一个工作就是用户在页面写Sql,我们负责将Sql处理转换成Flink作业并在我们的平台上运行,这个转换的过程依赖我们的SQL 
> SDK
>  下面我举几个我们比较常用且感觉用1.11新API不太好实现的例子
>
>
>  1.  我们现在有某种特定的Kafka数据格式,1条Kafka数据 
> 会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStream的时候增加了一个process/FlatMap阶段,用于处理这种情况,这样对用户是透明的。
>  2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能
>  3.  调整或者指定Source/Sink并行度为用户指定值,我们也是在DataStream层面上去做的
>  4.  对于一些特殊Source Sink,他们会和KeyBy操作组合(对用户透明),我们也是在DataStream层面上去做的
>
>
> 如果可以的话,能让我在API层面拿到Transformation也是能满足我需求的
>
>
> 
> 发件人: Jingsong Li mailto:jingsongl...@gmail.com>>
> 发送时间: 2020年7月22日 13:26:00
> 收件人: user-zh
> 抄送: imj...@gmail.com
> 主题: Re: 关于1.11Flink SQL 全新API设计的一些问题
>
> 可以分享下你们为啥要拿到DataStream吗?什么场景一定离不开DataStream吗?
>
> Best
> Jingsong
>
> On Wed, Jul 22, 2020 at 12:36 PM 刘首维 
> mailto:liushou...@autohome.com.cn>> wrote:
>
>> Hi all,
>>
>>
>>
>>很高兴看到Flink 1.11的发布,FLIP95和FLIP105也成功落地~
>>
>>我们最近在调研基于1.11的SQL/Table API对我们旧有的SQL
>> SDK进行升级。经过初步调研发现,基于`Factory`和`DynamicTable`的API,CatalogTable会被Planner直接变成Transformation,而不是跟之前一样可以拿到DataStream。比如之前的`StreamTableSource`#getDataStream,我可以直接获得DataStream对象的。这个动作/方法对于我们很重要,因为我们封装的SDK中,很多内部操作是在这个方法调用时触发/完成的。
>>
>>
>>
>>所以,如果基于新的API去开发的话,我该如何获取到DataStream,或者达成类似的效果呢(尽可能不去动执行计划的话)
>>
>
>
> --
> Best, Jingsong Lee



flink 问题排查补充

2020-07-22 文章 steven chen
hi:
这个flink 版本1.10 全是提交sql 运行,生产环境经常出现这种问题,然后节点就死了,任务又只能从checkpoits 恢复,该如何解决?sql  
里mysql 如何释放mysql 这个,求大佬回答?这是生产环境

2020-07-22 11:46:40,085 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding 
checkpoint 43842 of job a3eae3f691bdea687b9979b9e0ac28e2.

org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete 
snapshot 43842 for operator GroupAggregate(groupBy=[item_code], 
select=[item_code, COUNT(ts) AS pv, COUNT(DISTINCT channelOrOfflineId) FILTER 
$f3 AS share_time, COUNT(ts) FILTER $f4 AS ios_pv, COUNT(ts) FILTER $f5 AS 
android_pv, COUNT(ts) FILTER $f6 AS other_pv, COUNT(DISTINCT userId) AS uv, 
COUNT(DISTINCT userId) FILTER $f4 AS ios_uv, COUNT(DISTINCT userId) FILTER $f5 
AS android_uv, COUNT(DISTINCT userId) FILTER $f6 AS other_uv]) -> 
SinkConversionToTuple2 -> Sink: JDBCUpsertTableSink(item_code, pv, share_time, 
ios_pv, android_pv, other_pv, uv, ios_uv, android_uv, other_uv) (1/1). Failure 
reason: Checkpoint was declined.

at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:434)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887)

at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:820)

at 
org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:86)

at 
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:113)

at 
org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)

at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133)

at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)

at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.RuntimeException: Writing records to JDBC failed.

at 
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.checkFlushException(JDBCUpsertOutputFormat.java:135)

at 
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:155)

at 
org.apache.flink.api.java.io.jdbc.JDBCUpsertSinkFunction.snapshotState(JDBCUpsertSinkFunction.java:56)

at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)

at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)

at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)

at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)

... 19 more

Caused by: java.lang.RuntimeException: Writing records to JDBC failed.

at 
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.checkFlushException(JDBCUpsertOutputFormat.java:135)

at 
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:155)

at 
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)

at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)

at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)





 

Re: [ANNOUNCE] Apache Flink 1.11.1 released

2020-07-22 文章 Jark Wu
Congratulations! Thanks Dian for the great work and to be the release
manager!

Best,
Jark

On Wed, 22 Jul 2020 at 15:45, Yangze Guo  wrote:

> Congrats!
>
> Thanks Dian Fu for being release manager, and everyone involved!
>
> Best,
> Yangze Guo
>
> On Wed, Jul 22, 2020 at 3:14 PM Wei Zhong  wrote:
> >
> > Congratulations! Thanks Dian for the great work!
> >
> > Best,
> > Wei
> >
> > > 在 2020年7月22日,15:09,Leonard Xu  写道:
> > >
> > > Congratulations!
> > >
> > > Thanks Dian Fu for the great work as release manager, and thanks
> everyone involved!
> > >
> > > Best
> > > Leonard Xu
> > >
> > >> 在 2020年7月22日,14:52,Dian Fu  写道:
> > >>
> > >> The Apache Flink community is very happy to announce the release of
> Apache Flink 1.11.1, which is the first bugfix release for the Apache Flink
> 1.11 series.
> > >>
> > >> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
> > >>
> > >> The release is available for download at:
> > >> https://flink.apache.org/downloads.html
> > >>
> > >> Please check out the release blog post for an overview of the
> improvements for this bugfix release:
> > >> https://flink.apache.org/news/2020/07/21/release-1.11.1.html
> > >>
> > >> The full release notes are available in Jira:
> > >>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348323
> > >>
> > >> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
> > >>
> > >> Regards,
> > >> Dian
> > >
> >
>


Re: 回复:flink1.11启动问题

2020-07-22 文章 chengyanan1...@foxmail.com
看一下yarn-containers-vcores这个参数:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/config.html#yarn-containers-vcores

结合自己的集群,适当调低这个参数





chengyanan1...@foxmail.com
 
发件人: JasonLee
发送时间: 2020-07-22 12:58
收件人: user-zh
主题: 回复:flink1.11启动问题
Hi
报错显示的是资源不足了 你确定yarn上的资源是够的吗 看下是不是节点挂了 1.11我这边提交任务都是正常的
 
 
| |
JasonLee
|
|
邮箱:17610775...@163.com
|
 
Signature is customized by Netease Mail Master
 
在2020年07月21日 16:36,酷酷的浑蛋 写道:
 
 
服了啊,这个flink1.11启动怎么净是问题啊
 
 
我1.7,1.8,1.9 都没有问题,到11就不行
./bin/flink run -m yarn-cluster -yqu root.rt_constant -ys 2 -yjm 1024 -yjm 1024 
-ynm sql_test ./examples/batch/WordCount.jar --input 
hdfs://xxx/data/wangty/LICENSE-2.0.txt --output hdfs://xxx/data/wangty/a
 
 
报错:
Caused by: 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Could not allocate the required slot within slot request timeout. Please make 
sure that the cluster has enough resources. at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
 ... 45 more Caused by: java.util.concurrent.CompletionException: 
java.util.concurrent.TimeoutException at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
 at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
 at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) 
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
 ... 25 more
 
 
我资源是足的啊,就flink1.11起不来,一直卡在那里,卡好久然后报这个错,大神们帮看看吧,昨天的jar包冲突问题已经解决(只有flink1.11存在的问题),
 


Re: 关于1.11Flink SQL 全新API设计的一些问题

2020-07-22 文章 Jark Wu
Hi,首维,

非常感谢反馈。与 DataStream 解耦是 FLIP-95 的一个非常重要的设计目标,这让 sink/source 对于框架来说不再是黑盒,
因此将来才可以做诸如 state 兼容升级、消息顺序保障、自动并发设置等等事情。

关于你的一些需求,下面是我的建议和回复:

>  1.  我们现在有某种特定的Kafka数据格式,1条Kafka数据
会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStream的时候增加了一个process/FlatMap阶段,用于处理这种情况,这样对用户是透明的。
这个理论上还属于“数据格式”的职责,所以建议做在 DeserializationSchema 上,目前 DeserializationSchema
支持一对多的输出。可以参考 DebeziumJsonDeserializationSchema 的实现。

>  2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能
这个我觉得也可以封装在 SinkFunction 里面。

>  3.  调整或者指定Source/Sink并行度为用户指定值,我们也是在DataStream层面上去做的
这个社区也有计划在 FLIP-95 之上支持,会提供并发(或者分区)推断的能力。

>  4.  对于一些特殊Source Sink,他们会和KeyBy操作组合(对用户透明),我们也是在DataStream层面上去做的
这个能在具体一点吗?目前像 SupportsPartitioning 接口,就可以指定数据在交给 sink 之前先做 group by
partition。我感觉这个可能也可以通过引入类似的接口解决。

Best,
Jark

On Wed, 22 Jul 2020 at 16:27, Leonard Xu  wrote:

> Hi,首维, Ran
>
> 感谢分享, 我理解1.11新的API主要是想把 Table API 和 DataStream API 两套尽量拆分干净,
> 但看起来平台级的开发工作会依赖DataStream的一些预处理和用户逻辑。
> 我觉得这类需求对平台开发是合理,可以收集反馈下的, cc: godfrey
>
> 祝好
> Leonard Xu
>
>
> > 在 2020年7月22日,13:47,刘首维  写道:
> >
> > Hi JingSong,
> >
> >
> 简单介绍一下背景,我们组的一个工作就是用户在页面写Sql,我们负责将Sql处理转换成Flink作业并在我们的平台上运行,这个转换的过程依赖我们的SQL
> SDK
> >  下面我举几个我们比较常用且感觉用1.11新API不太好实现的例子
> >
> >
> >  1.  我们现在有某种特定的Kafka数据格式,1条Kafka数据
> 会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStream的时候增加了一个process/FlatMap阶段,用于处理这种情况,这样对用户是透明的。
> >  2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能
> >  3.  调整或者指定Source/Sink并行度为用户指定值,我们也是在DataStream层面上去做的
> >  4.  对于一些特殊Source Sink,他们会和KeyBy操作组合(对用户透明),我们也是在DataStream层面上去做的
> >
> >
> > 如果可以的话,能让我在API层面拿到Transformation也是能满足我需求的
> >
> >
> > 
> > 发件人: Jingsong Li 
> > 发送时间: 2020年7月22日 13:26:00
> > 收件人: user-zh
> > 抄送: imj...@gmail.com
> > 主题: Re: 关于1.11Flink SQL 全新API设计的一些问题
> >
> > 可以分享下你们为啥要拿到DataStream吗?什么场景一定离不开DataStream吗?
> >
> > Best
> > Jingsong
> >
> > On Wed, Jul 22, 2020 at 12:36 PM 刘首维  wrote:
> >
> >> Hi all,
> >>
> >>
> >>
> >>很高兴看到Flink 1.11的发布,FLIP95和FLIP105也成功落地~
> >>
> >>我们最近在调研基于1.11的SQL/Table API对我们旧有的SQL
> >>
> SDK进行升级。经过初步调研发现,基于`Factory`和`DynamicTable`的API,CatalogTable会被Planner直接变成Transformation,而不是跟之前一样可以拿到DataStream。比如之前的`StreamTableSource`#getDataStream,我可以直接获得DataStream对象的。这个动作/方法对于我们很重要,因为我们封装的SDK中,很多内部操作是在这个方法调用时触发/完成的。
> >>
> >>
> >>
> >>所以,如果基于新的API去开发的话,我该如何获取到DataStream,或者达成类似的效果呢(尽可能不去动执行计划的话)
> >>
> >
> >
> > --
> > Best, Jingsong Lee
>
>


Re: 关于1.11Flink SQL 全新API设计的一些问题

2020-07-22 文章 Leonard Xu
Hi,首维, Ran

感谢分享, 我理解1.11新的API主要是想把 Table API 和 DataStream API 两套尽量拆分干净, 
但看起来平台级的开发工作会依赖DataStream的一些预处理和用户逻辑。
我觉得这类需求对平台开发是合理,可以收集反馈下的, cc: godfrey

祝好
Leonard Xu


> 在 2020年7月22日,13:47,刘首维  写道:
> 
> Hi JingSong,
> 
>  简单介绍一下背景,我们组的一个工作就是用户在页面写Sql,我们负责将Sql处理转换成Flink作业并在我们的平台上运行,这个转换的过程依赖我们的SQL 
> SDK
>  下面我举几个我们比较常用且感觉用1.11新API不太好实现的例子
> 
> 
>  1.  我们现在有某种特定的Kafka数据格式,1条Kafka数据 
> 会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStream的时候增加了一个process/FlatMap阶段,用于处理这种情况,这样对用户是透明的。
>  2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能
>  3.  调整或者指定Source/Sink并行度为用户指定值,我们也是在DataStream层面上去做的
>  4.  对于一些特殊Source Sink,他们会和KeyBy操作组合(对用户透明),我们也是在DataStream层面上去做的
> 
> 
> 如果可以的话,能让我在API层面拿到Transformation也是能满足我需求的
> 
> 
> 
> 发件人: Jingsong Li 
> 发送时间: 2020年7月22日 13:26:00
> 收件人: user-zh
> 抄送: imj...@gmail.com
> 主题: Re: 关于1.11Flink SQL 全新API设计的一些问题
> 
> 可以分享下你们为啥要拿到DataStream吗?什么场景一定离不开DataStream吗?
> 
> Best
> Jingsong
> 
> On Wed, Jul 22, 2020 at 12:36 PM 刘首维  wrote:
> 
>> Hi all,
>> 
>> 
>> 
>>很高兴看到Flink 1.11的发布,FLIP95和FLIP105也成功落地~
>> 
>>我们最近在调研基于1.11的SQL/Table API对我们旧有的SQL
>> SDK进行升级。经过初步调研发现,基于`Factory`和`DynamicTable`的API,CatalogTable会被Planner直接变成Transformation,而不是跟之前一样可以拿到DataStream。比如之前的`StreamTableSource`#getDataStream,我可以直接获得DataStream对象的。这个动作/方法对于我们很重要,因为我们封装的SDK中,很多内部操作是在这个方法调用时触发/完成的。
>> 
>> 
>> 
>>所以,如果基于新的API去开发的话,我该如何获取到DataStream,或者达成类似的效果呢(尽可能不去动执行计划的话)
>> 
> 
> 
> --
> Best, Jingsong Lee



Re:flink stream如何为每条数据生成自增主键

2020-07-22 文章 Michael Ran
id 生成器吧
在 2020-07-22 15:51:44,"tiantingting5...@163.com"  写道:
>
>flink stream如何为每条数据生成自增主键??时间戳貌似不行,同一时间戳可能会产生多条数据,无法区分数据的现后顺序。
>
>
>tiantingting5...@163.com


Re:答复: 关于1.11Flink SQL 全新API设计的一些问题

2020-07-22 文章 Michael Ran
这个需求 我们也比较类似:要获取注册的表信息,自己用stream+table 实现部分逻辑
在 2020-07-22 13:47:25,"刘首维"  写道:
>Hi JingSong,
>
>  简单介绍一下背景,我们组的一个工作就是用户在页面写Sql,我们负责将Sql处理转换成Flink作业并在我们的平台上运行,这个转换的过程依赖我们的SQL 
> SDK
>  下面我举几个我们比较常用且感觉用1.11新API不太好实现的例子
>
>
>  1.  我们现在有某种特定的Kafka数据格式,1条Kafka数据 
> 会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStream的时候增加了一个process/FlatMap阶段,用于处理这种情况,这样对用户是透明的。
>  2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能
>  3.  调整或者指定Source/Sink并行度为用户指定值,我们也是在DataStream层面上去做的
>  4.  对于一些特殊Source Sink,他们会和KeyBy操作组合(对用户透明),我们也是在DataStream层面上去做的
>
>
>如果可以的话,能让我在API层面拿到Transformation也是能满足我需求的
>
>
>
>发件人: Jingsong Li 
>发送时间: 2020年7月22日 13:26:00
>收件人: user-zh
>抄送: imj...@gmail.com
>主题: Re: 关于1.11Flink SQL 全新API设计的一些问题
>
>可以分享下你们为啥要拿到DataStream吗?什么场景一定离不开DataStream吗?
>
>Best
>Jingsong
>
>On Wed, Jul 22, 2020 at 12:36 PM 刘首维  wrote:
>
>> Hi all,
>>
>>
>>
>> 很高兴看到Flink 1.11的发布,FLIP95和FLIP105也成功落地~
>>
>> 我们最近在调研基于1.11的SQL/Table API对我们旧有的SQL
>> SDK进行升级。经过初步调研发现,基于`Factory`和`DynamicTable`的API,CatalogTable会被Planner直接变成Transformation,而不是跟之前一样可以拿到DataStream。比如之前的`StreamTableSource`#getDataStream,我可以直接获得DataStream对象的。这个动作/方法对于我们很重要,因为我们封装的SDK中,很多内部操作是在这个方法调用时触发/完成的。
>>
>>
>>
>> 所以,如果基于新的API去开发的话,我该如何获取到DataStream,或者达成类似的效果呢(尽可能不去动执行计划的话)
>>
>
>
>--
>Best, Jingsong Lee


flink sqlUpdate,如何获取里面的字段,字段类型,with 等3个属性

2020-07-22 文章 Michael Ran
dear all:
 我用flink 注册一张表:
  CREATE TABLE dim_mysql (
id int,  -- 
type varchar -- 
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3390/test',
'table-name' = 'flink_test',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = '',
'password' = '',
'lookup.cache.max-rows' = '5000',
'lookup.cache.ttl' = '1s',
'lookup.max-retries' = '3'
)
有没有通过 tableEnv 去获取,字段[id,type]  类型[INTEGER,VARCHAR] 以及属性,map 这种。
我看阿里官方有blink 支持自定义sink:
publicabstractclassCustomSinkBaseimplementsSerializable{
protectedMap userParamsMap;// 您在sql with语句中定义的键值对,但所有的键均为小写
protectedSet primaryKeys;// 您定义的主键字段名
protectedList headerFields;// 标记为header的字段列表
protectedRowTypeInfo rowTypeInfo;// 字段类型和名称
核心需求是:获取定义的表的所有属性,自己实现自己的功能,包括 join sink 等各种逻辑

Flink APP restart policy not working

2020-07-22 文章 Rainie Li
各位大佬好,
本人Flink新手上路,想咨询一下有时候Flink App 设置了restartPolicy 但是还是restart不了,这种情况怎么破?

*Job’s restartPolicy:*

*env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000,
org.apache.flink.api.common.time.Time.seconds(30)));*

*Job Manager log:*

2020-07-15 20:26:27,831 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
switched from state RUNNING to FAILING.

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
Connection unexpectedly closed by remote task manager. This might
indicate that the remote task manager was lost.

at 
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:136)

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)

at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:390)

at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:355)

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)

at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1429)

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)

at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:947)

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:826)

at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)

at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)

at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:474)

at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)

at java.lang.Thread.run(Thread.java:748)


*yarn node manager log:*

2020-07-15 20:57:11.927858: I tensorflow/cc/saved_model/reader.cc:31]
Reading SavedModel from

2020-07-15 20:57:11.928419: I tensorflow/cc/saved_model/reader.cc:54]
Reading meta graph with tags

2020-07-15 20:57:11.928923: I
tensorflow/core/platform/cpu_feature_guard.cc:141] Your CPU supports
instructions that this TensorFlow binary was not compiled to use:
SSE4.1 SSE4.2 AVX AVX2 FMA

2020-07-15 20:57:11.935924: I tensorflow/cc/saved_model/loader.cc:162]
Restoring SavedModel bundle.

2020-07-15 20:57:11.939271: I tensorflow/cc/saved_model/loader.cc:138]
Running MainOp with key saved_model_main_op on SavedModel bundle.

2020-07-15 20:57:11.944583: I tensorflow/cc/saved_model/loader.cc:259]
SavedModel load for tags; Status: success. Took 16732 microseconds.

2020-07-15 20:58:13.356004: F
tensorflow/core/lib/monitoring/collection_registry.cc:77] Cannot
register 2 metrics with the same name:
/tensorflow/cc/saved_model/load_attempt_count


多谢🙏
Rainie


flink 问题排查

2020-07-22 文章 steven chen
hi:
这个flink 版本1.10 全是提交sql 运行,生产环境经常出现这种问题,然后节点就死了,任务又只能从checkpoits 恢复,该如何解决?sql  
里mysql 如何释放mysql 这个,求大佬回答?这是生产环境

flink stream如何为每条数据生成自增主键

2020-07-22 文章 tiantingting5...@163.com

flink stream如何为每条数据生成自增主键??时间戳貌似不行,同一时间戳可能会产生多条数据,无法区分数据的现后顺序。


tiantingting5...@163.com


Re: flink1.11 web ui????DAG

2020-07-22 文章 ??????


Re: [ANNOUNCE] Apache Flink 1.11.1 released

2020-07-22 文章 Yangze Guo
Congrats!

Thanks Dian Fu for being release manager, and everyone involved!

Best,
Yangze Guo

On Wed, Jul 22, 2020 at 3:14 PM Wei Zhong  wrote:
>
> Congratulations! Thanks Dian for the great work!
>
> Best,
> Wei
>
> > 在 2020年7月22日,15:09,Leonard Xu  写道:
> >
> > Congratulations!
> >
> > Thanks Dian Fu for the great work as release manager, and thanks everyone 
> > involved!
> >
> > Best
> > Leonard Xu
> >
> >> 在 2020年7月22日,14:52,Dian Fu  写道:
> >>
> >> The Apache Flink community is very happy to announce the release of Apache 
> >> Flink 1.11.1, which is the first bugfix release for the Apache Flink 1.11 
> >> series.
> >>
> >> Apache Flink® is an open-source stream processing framework for 
> >> distributed, high-performing, always-available, and accurate data 
> >> streaming applications.
> >>
> >> The release is available for download at:
> >> https://flink.apache.org/downloads.html
> >>
> >> Please check out the release blog post for an overview of the improvements 
> >> for this bugfix release:
> >> https://flink.apache.org/news/2020/07/21/release-1.11.1.html
> >>
> >> The full release notes are available in Jira:
> >> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348323
> >>
> >> We would like to thank all contributors of the Apache Flink community who 
> >> made this release possible!
> >>
> >> Regards,
> >> Dian
> >
>


回复: Re: flinksql1.11中主键声明的问题

2020-07-22 文章 琴师
你好:
下面是我的代码,我用的版本是1.11.0,数据库是TIDB,我跑的是demo数据,维表只有两行。

我的输入流如下,每秒新增一条写入到kafka
 topic = 'tp1'
for i  in  range(1,1) :
stime=datetime.datetime.now().strftime('%Y%m%d%H%M%S')
msg = {}
msg['id']= i
msg['time1']= stime
msg['type']=1
print(msg)
send_msg(topic, msg)
time.sleep(1)

{'id': 1, 'time1': '20200722140624', 'type': 1}
{'id': 2, 'time1': '20200722140625', 'type': 1}
{'id': 3, 'time1': '20200722140626', 'type': 1}
{'id': 4, 'time1': '20200722140627', 'type': 1}
{'id': 5, 'time1': '20200722140628', 'type': 1}
{'id': 6, 'time1': '20200722140629', 'type': 1}
{'id': 7, 'time1': '20200722140631', 'type': 1}
{'id': 8, 'time1': '20200722140632', 'type': 1}

维表数据如下
idtype
2 err
1 err

我在程序正常期间更新了维表,但是后续输出的结果显示维表还是之前的缓存数据,事实上已经远远大于超时时间了,甚至我停下输入流,直到达到超时时间后再次输入,新的结果还是输出旧的维表数据


from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, 
EnvironmentSettings,DataTypes, CsvTableSource, CsvTableSink
from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime
from pyflink.table.window import Tumble 


def from_kafka_to_kafka_demo():

# use blink table planner
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
st_env = 
StreamTableEnvironment.create(stream_execution_environment=env,environment_settings=env_settings)

# register source and sink
register_rides_source(st_env)
register_rides_sink(st_env)
register_mysql_source(st_env)
  

st_env.sql_update("insert into flink_result select  cast(t1.id as int) as 
id,cast(t2.type as varchar),cast( t1.time1 as bigint) as rowtime from source1 
t1 left join dim_mysql t2 on t1.type=cast(t2.id as varchar) ")
st_env.execute("2-from_kafka_to_kafka")



def register_rides_source(st_env):
source_ddl = \
"""
create table source1(
 id int,
 time1 varchar ,
 type string
 ) with (
'connector' = 'kafka',
'topic' = 'tp1',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
 )
"""
st_env.sql_update(source_ddl)

def register_mysql_source(st_env):
source_ddl = \
"""
CREATE TABLE dim_mysql (
id int,  -- 
type varchar -- 
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3390/test',
'table-name' = 'flink_test',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = '***',
'password' = '***',
'lookup.cache.max-rows' = '5000',
'lookup.cache.ttl' = '1s',
'lookup.max-retries' = '3'
)
"""
st_env.sql_update(source_ddl)

def register_rides_sink(st_env):
sink_ddl = \
"""
CREATE TABLE flink_result (
id int,   
type varchar,
rtime bigint,
primary key(id)  NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3390/test',
'table-name' = 'flink_result',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = '***',
'password' = '***',
'sink.buffer-flush.max-rows' = '5000', 
'sink.buffer-flush.interval' = '2s', 
'sink.max-retries' = '3'
)
"""
st_env.sql_update(sink_ddl)


if __name__ == '__main__':
from_kafka_to_kafka_demo()



初学者
PyFlink爱好者
琴师

 
发件人: Leonard Xu
发送时间: 2020-07-22 15:05
收件人: user-zh
主题: Re: flinksql1.11中主键声明的问题
Hi,
 
  我试了下应该是会更新缓存的,你有能复现的例子吗?
 
祝好
> 在 2020年7月22日,14:50,奇怪的不朽琴师 <1129656...@qq.com> 写道:
> 
> 你好:
> 
> 
> 可能是我描述的不清楚,我了解这个机制,我的意思维表更新后,即便已经达到了超时的时间,新的输出结果还是用维表历史缓存数据,
> 我感觉上是维表没有刷新缓存,但是我不知道这为什么。
> 
> 
> 谢谢
> 
> 
> -- 原始邮件 --
> 发件人:  
>   "user-zh"   
>  
>  发送时间: 2020年7月22日(星期三) 下午2:42
> 收件人: "user-zh" 
> 主题: Re: flinksql1.11中主键声明的问题
> 
> 
> 
> Hello
> 你说的输出结果更新,是指之前关联的维表时老数据,过了一段时间,这个数据变,之前输出的历史也希望更新吗?维表join的实现,只有事实表中才会有retract消息才会更新,才会传递到下游,维表的数据是事实表
> 去look up该表时的数据,维表的更新是不会retract之前的历史记录的。
> 
> 祝好
> Leonard Xu
> 
> 
> > 在 2020年7月22日,14:13,1129656...@qq.com 写道:
> > 
> > 输出结果仍然没有被更新
 


Re: [ANNOUNCE] Apache Flink 1.11.1 released

2020-07-22 文章 Wei Zhong
Congratulations! Thanks Dian for the great work!

Best,
Wei

> 在 2020年7月22日,15:09,Leonard Xu  写道:
> 
> Congratulations!
> 
> Thanks Dian Fu for the great work as release manager, and thanks everyone 
> involved!
> 
> Best
> Leonard Xu
> 
>> 在 2020年7月22日,14:52,Dian Fu  写道:
>> 
>> The Apache Flink community is very happy to announce the release of Apache 
>> Flink 1.11.1, which is the first bugfix release for the Apache Flink 1.11 
>> series.
>> 
>> Apache Flink® is an open-source stream processing framework for distributed, 
>> high-performing, always-available, and accurate data streaming applications.
>> 
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>> 
>> Please check out the release blog post for an overview of the improvements 
>> for this bugfix release:
>> https://flink.apache.org/news/2020/07/21/release-1.11.1.html
>> 
>> The full release notes are available in Jira:
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348323
>> 
>> We would like to thank all contributors of the Apache Flink community who 
>> made this release possible!
>> 
>> Regards,
>> Dian
> 



Re: [ANNOUNCE] Apache Flink 1.11.1 released

2020-07-22 文章 Leonard Xu
Congratulations!

Thanks Dian Fu for the great work as release manager, and thanks everyone 
involved!

Best
Leonard Xu

> 在 2020年7月22日,14:52,Dian Fu  写道:
> 
> The Apache Flink community is very happy to announce the release of Apache 
> Flink 1.11.1, which is the first bugfix release for the Apache Flink 1.11 
> series.
> 
> Apache Flink® is an open-source stream processing framework for distributed, 
> high-performing, always-available, and accurate data streaming applications.
> 
> The release is available for download at:
> https://flink.apache.org/downloads.html
> 
> Please check out the release blog post for an overview of the improvements 
> for this bugfix release:
> https://flink.apache.org/news/2020/07/21/release-1.11.1.html
> 
> The full release notes are available in Jira:
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348323
> 
> We would like to thank all contributors of the Apache Flink community who 
> made this release possible!
> 
> Regards,
> Dian



Re: flinksql1.11中主键声明的问题

2020-07-22 文章 Leonard Xu
Hi,

  我试了下应该是会更新缓存的,你有能复现的例子吗?

祝好
> 在 2020年7月22日,14:50,奇怪的不朽琴师 <1129656...@qq.com> 写道:
> 
> 你好:
> 
> 
> 可能是我描述的不清楚,我了解这个机制,我的意思维表更新后,即便已经达到了超时的时间,新的输出结果还是用维表历史缓存数据,
> 我感觉上是维表没有刷新缓存,但是我不知道这为什么。
> 
> 
> 谢谢
> 
> 
> -- 原始邮件 --
> 发件人:  
>   "user-zh"   
>  
>  发送时间: 2020年7月22日(星期三) 下午2:42
> 收件人: "user-zh" 
> 主题: Re: flinksql1.11中主键声明的问题
> 
> 
> 
> Hello
> 你说的输出结果更新,是指之前关联的维表时老数据,过了一段时间,这个数据变,之前输出的历史也希望更新吗?维表join的实现,只有事实表中才会有retract消息才会更新,才会传递到下游,维表的数据是事实表
> 去look up该表时的数据,维表的更新是不会retract之前的历史记录的。
> 
> 祝好
> Leonard Xu
> 
> 
> > 在 2020年7月22日,14:13,1129656...@qq.com 写道:
> > 
> > 输出结果仍然没有被更新