Re: Pushing code to Flink

2020-05-27 Thread
Thanks, everyone. I see that Leonard Xu is already watching FLINK-17991
<https://issues.apache.org/jira/browse/FLINK-17991>. That was a fast response.

On Thu, May 28, 2020 at 10:25 AM Leonard Xu  wrote:

> Hi,
> Yangze posted the official guide; you can also take a look at Jark's blog [1], which is in Chinese and may be quicker to read.
>
> Best,
> Leonard Xu
> [1] https://wuchong.me/blog/2019/02/12/how-to-become-apache-committer/ <
> https://wuchong.me/blog/2019/02/12/how-to-become-apache-committer/>
>
> > On May 28, 2020, at 10:18, Yangze Guo wrote:
> >
> > Hi, here is the community's guide for contributing code [1].
> >
> > Tip: unless it is a hotfix, please make sure consensus has been reached on the ticket and a committer has assigned it to you before submitting.
> >
> > [1] https://flink.apache.org/zh/contributing/contribute-code.html
> >
> > Best,
> > Yangze Guo
> >
> > On Thu, May 28, 2020 at 10:15 AM 宇张  wrote:
> >>
> >> Found the guide.
> >>
> >>
> >> On Thu, May 28, 2020 at 10:10 AM 宇张  wrote:
> >>
> >>> Hi,
> >>> What is the process for pushing code to the Flink community? Is there a beginner-friendly tutorial?
> >>>
>
>


Re: Pushing code to Flink

2020-05-27 Thread
Found the guide.


On Thu, May 28, 2020 at 10:10 AM 宇张  wrote:

> Hi,
> What is the process for pushing code to the Flink community? Is there a beginner-friendly tutorial?
>


Pushing code to Flink

2020-05-27 Thread
Hi,
What is the process for pushing code to the Flink community? Is there a beginner-friendly tutorial?


Re: Writing to Oracle with SQL

2020-05-25 Thread
Add an Oracle dialect yourself.

On Tue, May 26, 2020 at 11:42 AM Kyle Zhang  wrote:

> Hi all, I recently started using Flink and ran into some issues.
> My scenario: read from Kafka, GROUP BY, then write the result to Oracle. But
> JDBCDialects currently only covers MySQL, Postgres, and Derby. How do people write to Oracle?
> For now I convert the table to a stream and write it to Oracle myself.
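For anyone searching the archive: a minimal sketch of such a dialect, written against the JDBCDialect interface that flink-jdbc ships in Flink 1.10. The method names and the Oracle driver class below should be verified against your exact version, and in 1.10 the dialect list in JDBCDialects is hard-coded, so using this typically means patching flink-jdbc and rebuilding:

import java.util.Optional;
import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;

// Hypothetical Oracle dialect sketch; upsert/statement generation is omitted.
public class OracleDialect implements JDBCDialect {

    @Override
    public boolean canHandle(String url) {
        // Claim JDBC URLs of the form jdbc:oracle:...
        return url.startsWith("jdbc:oracle:");
    }

    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of("oracle.jdbc.OracleDriver");
    }

    @Override
    public String quoteIdentifier(String identifier) {
        // Oracle quotes identifiers with double quotes
        return "\"" + identifier + "\"";
    }
}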


Re: Flink 1.9 Blink SQL: lost primary key, and low throughput when combining deduplication with a temporal table join

2020-05-10 Thread
Hi,
My state backend is FsStateBackend, with state stored on HDFS.
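For reference, a minimal sketch of that setup (the HDFS path is a placeholder):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoints go to HDFS; working state stays on the heap, so disk I/O is
// less likely the bottleneck than with RocksDB.
env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/flink/checkpoints"));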

On Mon, May 11, 2020 at 11:19 AM Benchao Li  wrote:

> Hi,
>
> Which state backend are you using? Your situation very likely relates to it. For example, RocksDB on an ordinary disk easily hits an I/O bottleneck.
>
> 宇张 wrote on Mon, May 11, 2020 at 11:14 AM:
>
> > Hi,
> > I am using Flink 1.9's Blink SQL for data transformation and hit the following problems:
> > 1. The primary key is lost when using the row_number function.
> > 2. Combining row_number with a temporal table join severely degrades throughput. The SQL:
> > -- In theory the DISTINCT is unnecessary, but Blink cannot derive the primary
> > -- key from the SQL, so validation fails and it had to be added.
> > SELECT distinct t1.id as order_id,...,DATE_FORMAT(t1.proctime,'yyyy-MM-dd
> > HH:mm:ss') as etl_time FROM (select id,...,proctime from (select
> > data.index0.id,...,proctime,ROW_NUMBER() OVER (PARTITION BY data.index0.id
> > ORDER BY es desc) AS rowNum from installmentdb_t_line_item)tmp where
> > rowNum<=1) t1 left join SNAP_T_OPEN_PAY_ORDER FOR SYSTEM_TIME AS OF
> > t1.proctime t2 on t2.LI_ID= t1.id left join SNAP_T_SALES_ORDER FOR
> > SYSTEM_TIME AS OF t1.proctime t4 ON t1.so_id =t4.ID
> >
> > The SQL above has very low throughput, only a few records per second, while the two parts
> > run separately both meet the target: the temporal table join alone reaches several thousand
> > records per second and row_number alone reaches tens of thousands, but I do not know why
> > they drop to just a few when combined.
> >
> > SELECT distinct t1.id as order_id,...,DATE_FORMAT(t1.proctime,'yyyy-MM-dd
> > HH:mm:ss') as etl_time FROM (select id,...,proctime from (select
> > data.index0.id,...,proctime from installmentdb_t_line_item)tmp ) t1 left
> > join SNAP_T_OPEN_PAY_ORDER FOR SYSTEM_TIME AS OF t1.proctime t2 on
> > t2.LI_ID= t1.id left join SNAP_T_SALES_ORDER FOR SYSTEM_TIME AS OF
> > t1.proctime t4 ON t1.so_id =t4.ID
> >
> > SELECT distinct t1.id as order_id,...,DATE_FORMAT(t1.proctime,'yyyy-MM-dd
> > HH:mm:ss') as etl_time FROM (select id,...,proctime from (select
> > data.index0.id,...,proctime,ROW_NUMBER() OVER (PARTITION BY data.index0.id
> > ORDER BY es desc) AS rowNum from installmentdb_t_line_item)tmp where
> > rowNum<=1) t1
> >
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Flink 1.9 Blink SQL: lost primary key, and low throughput when combining deduplication with a temporal table join

2020-05-10 Thread
Hi,
I am using Flink 1.9's Blink SQL for data transformation and hit the following problems:
1. The primary key is lost when using the row_number function.
2. Combining row_number with a temporal table join severely degrades throughput. The SQL:
-- In theory the DISTINCT is unnecessary, but Blink cannot derive the primary
-- key from the SQL, so validation fails and it had to be added.
SELECT distinct t1.id as order_id,...,DATE_FORMAT(t1.proctime,'yyyy-MM-dd
HH:mm:ss') as etl_time FROM (select id,...,proctime from (select
data.index0.id,...,proctime,ROW_NUMBER() OVER (PARTITION BY data.index0.id
ORDER BY es desc) AS rowNum from installmentdb_t_line_item)tmp where
rowNum<=1) t1 left join SNAP_T_OPEN_PAY_ORDER FOR SYSTEM_TIME AS OF
t1.proctime t2 on t2.LI_ID= t1.id left join SNAP_T_SALES_ORDER FOR
SYSTEM_TIME AS OF t1.proctime t4 ON t1.so_id =t4.ID
The SQL above has very low throughput, only a few records per second, while the two parts run separately both meet the target: the temporal table join alone reaches several thousand records per second and row_number alone reaches tens of thousands, but I do not know why they drop to just a few when combined.

SELECT distinct t1.id as order_id,...,DATE_FORMAT(t1.proctime,'yyyy-MM-dd
HH:mm:ss') as etl_time FROM (select id,...,proctime from (select
data.index0.id,...,proctime from installmentdb_t_line_item)tmp ) t1 left
join SNAP_T_OPEN_PAY_ORDER FOR SYSTEM_TIME AS OF t1.proctime t2 on
t2.LI_ID= t1.id left join SNAP_T_SALES_ORDER FOR SYSTEM_TIME AS OF
t1.proctime t4 ON t1.so_id =t4.ID

SELECT distinct t1.id as order_id,...,DATE_FORMAT(t1.proctime,'yyyy-MM-dd
HH:mm:ss') as etl_time FROM (select id,...,proctime from (select
data.index0.id,...,proctime,ROW_NUMBER() OVER (PARTITION BY data.index0.id
ORDER BY es desc) AS rowNum from installmentdb_t_line_item)tmp where
rowNum<=1) t1
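For comparison, the deduplication shape documented for the Blink planner keeps one row per key with the same ROW_NUMBER construction. This sketch reuses the table and columns above and only illustrates the pattern; it is not a verified fix for the throughput issue:

SELECT id, ..., proctime
FROM (
  SELECT data.index0.id AS id, ..., proctime,
         ROW_NUMBER() OVER (PARTITION BY data.index0.id ORDER BY es DESC) AS rowNum
  FROM installmentdb_t_line_item
)
WHERE rowNum = 1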


Re: Setting up a Flink 1.10 standalone cluster on Windows 10

2020-04-23 Thread
Well, Windows is no longer maintained...
https://issues.apache.org/jira/browse/FLINK-15925

On Thu, Apr 23, 2020 at 5:30 PM 蒋佳成(Jiacheng Jiang) <920334...@qq.com>
wrote:

>
> Caused by: org.apache.flink.configuration.IllegalConfigurationException:
> The network memory min (64mb) and max (1gb) mismatch, the network memory
> has to be resolved and set to a fixed value before task executor starts
>
>
>
>
> That is the network memory error. The memory model changed a lot in 1.10; take a look at the docs first.
>
>
>
>
> ------ Original Message ------
> From: "宇张"
> Sent: Thursday, April 23, 2020, 5:23 PM
> To: "user-zh"
> Subject: Re: Setting up a Flink 1.10 standalone cluster on Windows 10
>
>
>
> Er, yes. The defaults of some settings became null, so the TM fails at startup, asking for these three values one after another. After setting them, the error changed to the one below. How should I handle this?
> taskmanager.cpu.cores: 3
> taskmanager.memory.task.heap.size: 256mb
> taskmanager.memory.managed.size: 256mb
>
> org.apache.flink.configuration.IllegalConfigurationException: Failed to
> create TaskExecutorResourceSpec
> at
>
> org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.resourceSpecFromConfig(TaskExecutorResourceUtils.java:72)
> at
>
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManager(TaskManagerRunner.java:356)
> at
>
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.<init>(TaskManagerRunner.java:152)
> at
>
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:308)
> at
>
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerSecurely$2(TaskManagerRunner.java:322)
> at
>
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at
>
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerSecurely(TaskManagerRunner.java:321)
> at
>
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:287)
> Caused by: org.apache.flink.configuration.IllegalConfigurationException:
> The network memory min (64 mb) and max (1 gb) mismatch, the network memory
> has to be resolved and set to a fixed value before task executor starts
> at
>
> org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.checkTaskExecutorNetworkConfigSet(TaskExecutorResourceUtils.java:100)
> at
>
> org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.checkTaskExecutorResourceConfigSet(TaskExecutorResourceUtils.java:85)
> at
>
> org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.resourceSpecFromConfig(TaskExecutorResourceUtils.java:70)
> ... 7 more
>
> On Thu, Apr 23, 2020 at 5:04 PM 蒋佳成(Jiacheng Jiang) <920334...@qq.com
> wrote:
>
>  Check the logs; my guess is that the memory was not configured.
> 
> 
> 
> 
>  ------ Original Message ------
>  From: "宇张"
>  Sent: Thursday, April 23, 2020, 5:03 PM
>  To: "user-zh"
>  Subject: Setting up a Flink 1.10 standalone cluster on Windows 10
> 
> 
> 
>  Hi, I set up Flink 1.10 standalone on Windows 10 (well, just unzipped and started it).
>  Running start-cluster.bat pops up two DOS windows, one for the JM and one for the TM (my guess),
>  but a few seconds later the TM window crashes, so jobs cannot get resources. I am not sure whether this is a Flink problem or a Windows 10 problem; Flink 1.9 works fine.


Re: Setting up a Flink 1.10 standalone cluster on Windows 10

2020-04-23 Thread
Now even setting up a test environment requires editing config files. The old beginner-friendly unzip-and-run startup felt friendlier, haha.

On Thu, Apr 23, 2020 at 5:23 PM 宇张  wrote:

> Er, yes. The defaults of some settings became null, so the TM fails at startup, asking for these three values one after another. After setting them, the error changed to the one below. How should I handle this?
> taskmanager.cpu.cores: 3
> taskmanager.memory.task.heap.size: 256mb
> taskmanager.memory.managed.size: 256mb
>
> org.apache.flink.configuration.IllegalConfigurationException: Failed to
> create TaskExecutorResourceSpec
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.resourceSpecFromConfig(TaskExecutorResourceUtils.java:72)
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManager(TaskManagerRunner.java:356)
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.<init>(TaskManagerRunner.java:152)
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:308)
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerSecurely$2(TaskManagerRunner.java:322)
> at
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerSecurely(TaskManagerRunner.java:321)
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:287)
> Caused by: org.apache.flink.configuration.IllegalConfigurationException:
> The network memory min (64 mb) and max (1 gb) mismatch, the network memory
> has to be resolved and set to a fixed value before task executor starts
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.checkTaskExecutorNetworkConfigSet(TaskExecutorResourceUtils.java:100)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.checkTaskExecutorResourceConfigSet(TaskExecutorResourceUtils.java:85)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.resourceSpecFromConfig(TaskExecutorResourceUtils.java:70)
> ... 7 more
>
> On Thu, Apr 23, 2020 at 5:04 PM 蒋佳成(Jiacheng Jiang) <920334...@qq.com>
> wrote:
>
>> Check the logs; my guess is that the memory was not configured.
>>
>>
>>
>>
>> ------ Original Message ------
>> From: "宇张"
>> Sent: Thursday, April 23, 2020, 5:03 PM
>> To: "user-zh"
>> Subject: Setting up a Flink 1.10 standalone cluster on Windows 10
>>
>>
>>
>> Hi, I set up Flink 1.10 standalone on Windows 10 (well, just unzipped and started it).
>> Running start-cluster.bat pops up two DOS windows, one for the JM and one for the TM (my guess),
>> but a few seconds later the TM window crashes, so jobs cannot get resources. I am not sure whether this is a Flink problem or a Windows 10 problem; Flink 1.9 works fine.
>
>


Re: Setting up a Flink 1.10 standalone cluster on Windows 10

2020-04-23 Thread
Er, yes. The defaults of some settings became null, so the TM fails at startup, asking for these three values one after another. After setting them, the error changed to the one below. How should I handle this?
taskmanager.cpu.cores: 3
taskmanager.memory.task.heap.size: 256mb
taskmanager.memory.managed.size: 256mb

org.apache.flink.configuration.IllegalConfigurationException: Failed to
create TaskExecutorResourceSpec
at
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.resourceSpecFromConfig(TaskExecutorResourceUtils.java:72)
at
org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManager(TaskManagerRunner.java:356)
at
org.apache.flink.runtime.taskexecutor.TaskManagerRunner.<init>(TaskManagerRunner.java:152)
at
org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:308)
at
org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerSecurely$2(TaskManagerRunner.java:322)
at
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at
org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerSecurely(TaskManagerRunner.java:321)
at
org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:287)
Caused by: org.apache.flink.configuration.IllegalConfigurationException:
The network memory min (64 mb) and max (1 gb) mismatch, the network memory
has to be resolved and set to a fixed value before task executor starts
at
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.checkTaskExecutorNetworkConfigSet(TaskExecutorResourceUtils.java:100)
at
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.checkTaskExecutorResourceConfigSet(TaskExecutorResourceUtils.java:85)
at
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.resourceSpecFromConfig(TaskExecutorResourceUtils.java:70)
... 7 more
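
One way to satisfy that check, assuming Flink 1.10's memory option keys, is to pin the network memory to a fixed value in flink-conf.yaml (the sizes below are arbitrary examples):

taskmanager.cpu.cores: 3
taskmanager.memory.task.heap.size: 256mb
taskmanager.memory.managed.size: 256mb
# Setting min == max resolves the network memory to a fixed value,
# which is what the error message asks for.
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 64mb

Alternatively, configuring a single total size such as taskmanager.memory.flink.size should let Flink derive the remaining components itself.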

On Thu, Apr 23, 2020 at 5:04 PM 蒋佳成(Jiacheng Jiang) <920334...@qq.com>
wrote:

> Check the logs; my guess is that the memory was not configured.
>
>
>
>
> ------ Original Message ------
> From: "宇张"
> Sent: Thursday, April 23, 2020, 5:03 PM
> To: "user-zh"
> Subject: Setting up a Flink 1.10 standalone cluster on Windows 10
>
>
>
> Hi, I set up Flink 1.10 standalone on Windows 10 (well, just unzipped and started it).
> Running start-cluster.bat pops up two DOS windows, one for the JM and one for the TM (my guess),
> but a few seconds later the TM window crashes, so jobs cannot get resources. I am not sure whether this is a Flink problem or a Windows 10 problem; Flink 1.9 works fine.


Setting up a Flink 1.10 standalone cluster on Windows 10

2020-04-23 Thread
Hi, I set up Flink 1.10 standalone on Windows 10 (well, just unzipped and started it).
Running start-cluster.bat pops up two DOS windows, one for the JM and one for the TM (my guess),
but a few seconds later the TM window crashes, so jobs cannot get resources. I am not sure whether this is a Flink problem or a Windows 10 problem; Flink 1.9 works fine.


Re: About job submission in Flink 1.10 standalone mode

2020-04-23 Thread
Yes, after changing the Blink dependencies to provided scope the program runs fine. Having to add entries like the one below by hand for every build feels cumbersome, but at least it runs (a sketch of the provided-scope change follows below).

Thanks for the answers.

org.apache.flink:flink-connector-kafka-base_2.11
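
For the record, a hedged sketch of what the provided-scope change looks like in the pom (the artifact is one named in this thread; the version is a placeholder):

<!-- The planner already sits in the cluster's lib/ directory, so it is
     compiled against but not bundled into the uber jar. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.10.0</version>
    <scope>provided</scope>
</dependency>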


On Thu, Apr 23, 2020 at 4:36 PM Jingsong Li  wrote:

> Do not bundle jars that already exist under lib/.
>
> For example flink-table-planner-blink: lib/ already has a copy of flink-table-planner-blink.
>
> Remove this group:
> org.apache.flink:flink-table-common
> org.apache.flink:flink-table-api-java
>
> org.apache.flink:flink-table-api-java-bridge_2.11
> org.apache.flink:flink-table-planner-blink_2.11
>
> Best,
> Jingsong Lee
>
> On Thu, Apr 23, 2020 at 4:24 PM 宇张  wrote:
>
> > >> After adding the <transformers combine.children="append"> configuration, the
> > corresponding TableFactory services file does contain the KafkaFactory entries. The program
> > still fails, but the error changed to a jar conflict, no longer the earlier cannot-load error.
> > But having to configure all of this every time does not seem very user friendly.
> >
> > org.codehaus.janino.CompilerFactory cannot be cast to
> > org.codehaus.commons.compiler.ICompilerFactory
> >
> >
> > <artifactSet>
> >     <includes combine.children="append">
> >         <include>org.apache.flink:flink-table-common</include>
> >         <include>org.apache.flink:flink-table-api-java</include>
> >         <include>org.apache.flink:flink-table-api-java-bridge_2.11</include>
> >         <include>org.apache.flink:flink-table-planner-blink_2.11</include>
> >         <include>org.apache.flink:flink-connector-kafka-0.11_2.11</include>
> >         <include>org.apache.flink:flink-connector-kafka-0.9_2.11</include>
> >         <include>org.apache.flink:flink-connector-kafka-0.10_2.11</include>
> >         <include>org.apache.flink:flink-connector-kafka-base_2.11</include>
> >         <include>org.apache.flink:flink-jdbc_2.11</include>
> >         <include>org.apache.flink:flink-json</include>
> >     </includes>
> > </artifactSet>
> >
> > <transformers combine.children="append">
> >     <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
> >     <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
> >         <projectName>Apache Flink</projectName>
> >         <encoding>UTF-8</encoding>
> >     </transformer>
> >     <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
> >         <mainClass>com.akulaku.data.main.StreamMain</mainClass>
> >     </transformer>
> > </transformers>
> >
> >
> > On Thu, Apr 23, 2020 at 4:07 PM Jingsong Li 
> > wrote:
> >
> > > Hi 张,
> > >
> > > Try adding this [1]:
> > >
> > > <transformers combine.children="append">
> > >     <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
> > >     <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
> > >         <projectName>Apache Flink</projectName>
> > >         <encoding>UTF-8</encoding>
> > >     </transformer>
> > > </transformers>
> > >
> > >
> > > [1]https://github.com/apache/flink/blob/master/pom.xml#L1654
> > >
> > > Best,
> > > Jingsong Lee
> > >
> > > On Thu, Apr 23, 2020 at 3:56 PM 宇张  wrote:
> > >
> > > > With the configuration below, the KafkaFactory entry is produced if the flink-json
> > > > module is not included; once flink-json is added, only the JsonRowFormatFactory class
> > > > remains and the Kafka factory no longer shows up, so perhaps something conflicts.
> > > > But my earlier Flink 1.9 project also could not print the KafkaFactory class, only
> > > > GenericInMemoryCatalogFactory, and comparing 1.9 with 1.10, the release notes mention the class loading strategy changed.
> > > >
> > > > org.apache.flink:flink-connector-kafka-0.11_2.11
> > > > org.apache.flink:flink-connector-kafka-base_2.11
> > > > org.apache.flink:flink-json
> > > >
> > > >
> > > > On Thu, Apr 23, 2020 at 3:43 PM Jingsong Li 
> > > > wrote:
> > > >
> > > > > > If so, the client's classloading policy sounds fine; the ClassLoader used by
> > > > > SPI loading seems to be the problem. FileSystem resolution had a similar ClassLoader bug before.
> > > > >
> > > > > @tison Either way, the SPI file inside the jar must contain the Kafka classes, otherwise SPI cannot find them.
> > > > >
> > > > > @宇张 I suggest reading [1] carefully; that pom does produce a correct SPI file.
> > > > >
> > > > > [1]
> > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/master/flink-table/flink-table-uber-blink/pom.xml#L104
> > > > >
> > > > > Best,
> > > > > Jingsong Lee
> > > > >
> > > > > On Thu, Apr 23, 2020 at 3:35 PM tison 
> wrote:
> > > > >
> > > > > > Also, shading com.ibm.icu inside your shade config serves no obvious purpose...
> > > > > >
> > > > > > Best,
> > > > > > tison.
> > > > > >
> > > > > >
> > > > > > tison wrote on Thu, Apr 23, 2020 at 3:34 PM:
> > > > > >
> > > > > For this problem I suggest you file a JIRA and provide a reproducible program.

Re: About job submission in Flink 1.10 standalone mode

2020-04-23 Thread
>> After adding that <transformers combine.children="append"> configuration, the corresponding TableFactory services file does contain the KafkaFactory entries. The program still fails, but the error changed to a jar conflict, no longer the earlier cannot-load error.
But having to configure all of this every time does not seem very user friendly.

org.codehaus.janino.CompilerFactory cannot be cast to
org.codehaus.commons.compiler.ICompilerFactory





<artifactSet>
    <includes combine.children="append">
        <include>org.apache.flink:flink-table-common</include>
        <include>org.apache.flink:flink-table-api-java</include>
        <include>org.apache.flink:flink-table-api-java-bridge_2.11</include>
        <include>org.apache.flink:flink-table-planner-blink_2.11</include>
        <include>org.apache.flink:flink-connector-kafka-0.11_2.11</include>
        <include>org.apache.flink:flink-connector-kafka-0.9_2.11</include>
        <include>org.apache.flink:flink-connector-kafka-0.10_2.11</include>
        <include>org.apache.flink:flink-connector-kafka-base_2.11</include>
        <include>org.apache.flink:flink-jdbc_2.11</include>
        <include>org.apache.flink:flink-json</include>
    </includes>
</artifactSet>

<transformers combine.children="append">
    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
    <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
        <projectName>Apache Flink</projectName>
        <encoding>UTF-8</encoding>
    </transformer>
    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
        <mainClass>com.akulaku.data.main.StreamMain</mainClass>
    </transformer>
</transformers>




On Thu, Apr 23, 2020 at 4:07 PM Jingsong Li  wrote:

> Hi 张,
>
> Try adding this [1]:
>
> <transformers combine.children="append">
>     <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
>     <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
>         <projectName>Apache Flink</projectName>
>         <encoding>UTF-8</encoding>
>     </transformer>
> </transformers>
>
>
> [1]https://github.com/apache/flink/blob/master/pom.xml#L1654
>
> Best,
> Jingsong Lee
>
> On Thu, Apr 23, 2020 at 3:56 PM 宇张  wrote:
>
> > With the configuration below, the KafkaFactory entry is produced if the flink-json
> > module is not included; once flink-json is added, only the JsonRowFormatFactory class
> > remains and the Kafka factory no longer shows up, so perhaps something conflicts.
> > But my earlier Flink 1.9 project also could not print the KafkaFactory class, only
> > GenericInMemoryCatalogFactory, and comparing 1.9 with 1.10, the release notes mention the class loading strategy changed.
> >
> > org.apache.flink:flink-connector-kafka-0.11_2.11
> > org.apache.flink:flink-connector-kafka-base_2.11
> > org.apache.flink:flink-json
> >
> >
> > On Thu, Apr 23, 2020 at 3:43 PM Jingsong Li 
> > wrote:
> >
> > > > If so, the client's classloading policy sounds fine; the ClassLoader used by
> > > SPI loading seems to be the problem. FileSystem resolution had a similar ClassLoader bug before.
> > >
> > > @tison Either way, the SPI file inside the jar must contain the Kafka classes, otherwise SPI cannot find them.
> > >
> > > @宇张 I suggest reading [1] carefully; that pom does produce a correct SPI file.
> > >
> > > [1]
> > >
> > >
> >
> https://github.com/apache/flink/blob/master/flink-table/flink-table-uber-blink/pom.xml#L104
> > >
> > > Best,
> > > Jingsong Lee
> > >
> > > On Thu, Apr 23, 2020 at 3:35 PM tison  wrote:
> > >
> > > > Also, shading com.ibm.icu inside your shade config serves no obvious purpose...
> > > >
> > > > Best,
> > > > tison.
> > > >
> > > >
> > > > tison wrote on Thu, Apr 23, 2020 at 3:34 PM:
> > > >
> > > > > For this problem I suggest filing a JIRA and providing a reproducible program. If
> > > > > this is Flink standalone session mode and the compile failure throws the above exception on the
> > > > > client, it should not depend on whether the jar sits under lib/. It sounds odd from your description; a local reproduction would make it easier to judge.
> > > > >
> > > > > Best,
> > > > > tison.
> > > > >
> > > > >
> > > > > 宇张 wrote on Thu, Apr 23, 2020 at 11:53 AM:
> > > > >
> > > > >> Do you mean that with the uber jar not in lib/, the user program can load
> > > > >> KafkaTableSourceSinkFactory
> > > > >> through the thread context ClassLoader (with child-first class loading)?
> > > > >> >> Yes.
> > > > >>
> > > > >> On Thu, Apr 23, 2020 at 11:42 AM tison 
> > wrote:
> > > > >>
> > > > >> > >> After getting the ClassLoader, check whether the KafkaTableSourceSinkFactory class can be obtained.
> > > > >> > >> It can.
> > > > >> >
> > > > >> > Do you mean that with the uber jar not in lib/, the user program can load
> > > > >> > KafkaTableSourceSinkFactory
> > > > >> > through the thread context ClassLoader (with child-first class loading)?
> > > > >> >
> > > > >> > If so, the client's classloading policy sounds fine; the ClassLoader used by
> > > > >> > SPI loading seems to be the problem.
> > > > >> > FileSystem resolution had a similar ClassLoader bug before.
> > > > >> >
> > > > >> > Best,
> > > > >> > tison.
> > > > >> >
> > > > >> >
> > > > >> > 宇张 wrote on Thu, Apr 23, 2020 at 11:36 AM:
> > > > >> >
> > > > >> > > I tried adding it; the program still fails with the same exception as above. My shade configuration:
> > > > >> > >
> > > > >> > > <plugin>
> > > > >> > >     <groupId>org.apache.maven.plugins</groupId>
> > > > >> > >     <artifactId>maven-shade-plugin</artifactId>
> > > > >> > >     <executions>
> > > > >> > >         <execution>
> > > > >> > >             <phase>p

Re: About job submission in Flink 1.10 standalone mode

2020-04-23 Thread
With the configuration below, the KafkaFactory entry is produced if the flink-json module is not included; once flink-json
is added, only the JsonRowFormatFactory class remains and the Kafka factory no longer shows up, so perhaps something conflicts.
But my earlier Flink 1.9 project also could not print the KafkaFactory class, only
GenericInMemoryCatalogFactory, and comparing 1.9 with 1.10, the release notes mention the class loading strategy changed.

org.apache.flink:flink-connector-kafka-0.11_2.11
org.apache.flink:flink-connector-kafka-base_2.11
org.apache.flink:flink-json


On Thu, Apr 23, 2020 at 3:43 PM Jingsong Li  wrote:

> > If so, the client's classloading policy sounds fine; the ClassLoader used by
> SPI loading seems to be the problem. FileSystem resolution had a similar ClassLoader bug before.
>
> @tison Either way, the SPI file inside the jar must contain the Kafka classes, otherwise SPI cannot find them.
>
> @宇张 I suggest reading [1] carefully; that pom does produce a correct SPI file.
>
> [1]
>
> https://github.com/apache/flink/blob/master/flink-table/flink-table-uber-blink/pom.xml#L104
>
> Best,
> Jingsong Lee
>
> On Thu, Apr 23, 2020 at 3:35 PM tison  wrote:
>
> > Also, shading com.ibm.icu inside your shade config serves no obvious purpose...
> >
> > Best,
> > tison.
> >
> >
> > tison wrote on Thu, Apr 23, 2020 at 3:34 PM:
> >
> > > For this problem I suggest filing a JIRA and providing a reproducible program. If
> > > this is Flink standalone session mode and the compile failure throws the above exception on the
> > > client, it should not depend on whether the jar sits under lib/. It sounds odd from your description; a local reproduction would make it easier to judge.
> > >
> > > Best,
> > > tison.
> > >
> > >
> > > 宇张 wrote on Thu, Apr 23, 2020 at 11:53 AM:
> > >
> > >> Do you mean that with the uber jar not in lib/, the user program can load
> > >> KafkaTableSourceSinkFactory
> > >> through the thread context ClassLoader (with child-first class loading)?
> > >> >> Yes.
> > >>
> > >> On Thu, Apr 23, 2020 at 11:42 AM tison  wrote:
> > >>
> > >> > >> After getting the ClassLoader, check whether the KafkaTableSourceSinkFactory class can be obtained.
> > >> > >> It can.
> > >> >
> > >> > Do you mean that with the uber jar not in lib/, the user program can load
> > >> > KafkaTableSourceSinkFactory
> > >> > through the thread context ClassLoader (with child-first class loading)?
> > >> >
> > >> > If so, the client's classloading policy sounds fine; the ClassLoader used by
> > >> > SPI loading seems to be the problem.
> > >> > FileSystem resolution had a similar ClassLoader bug before.
> > >> >
> > >> > Best,
> > >> > tison.
> > >> >
> > >> >
> > >> > 宇张 wrote on Thu, Apr 23, 2020 at 11:36 AM:
> > >> >
> > >> > > I tried adding it; the program still fails with the same exception as above. My shade configuration:
> > >> > >
> > >> > > <plugin>
> > >> > >     <groupId>org.apache.maven.plugins</groupId>
> > >> > >     <artifactId>maven-shade-plugin</artifactId>
> > >> > >     <executions>
> > >> > >         <execution>
> > >> > >             <phase>package</phase>
> > >> > >             <goals>
> > >> > >                 <goal>shade</goal>
> > >> > >             </goals>
> > >> > >             <configuration>
> > >> > >                 <transformers>
> > >> > >                     <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
> > >> > >                         <mainClass>com.akulaku.data.main.StreamMain</mainClass>
> > >> > >                     </transformer>
> > >> > >                 </transformers>
> > >> > >                 <filters>
> > >> > >                     <filter>
> > >> > >                         <artifact>*:*</artifact>
> > >> > >                         <excludes>
> > >> > >                             <exclude>META-INF/*.SF</exclude>
> > >> > >                             <exclude>META-INF/*.DSA</exclude>
> > >> > >                             <exclude>META-INF/*.RSA</exclude>
> > >> > >                         </excludes>
> > >> > >                     </filter>
> > >> > >                 </filters>
> > >> > >                 <artifactSet>
> > >> > >                     <includes>
> > >> > >                         <include>org.apache.flink:flink-table-common</include>
> > >> > >                         <include>org.apache.flink:flink-table-api-java</include>
> > >> > >                         <include>org.apache.flink:flink-table-api-java-bridge_2.11</include>
> > >> > >                         <include>org.apache.flink:flink-table-planner-blink_2.11</include>
> > >> > >                         <include>org.apache.flink:flink-connector-kafka-0.11_2.11</include>
> > >> > >                         <include>org.apache.flink:flink-connector-kafka-base_2.11</include>
> > >> > >                         <include>org.apache.flink:flink-json</include>

Re: About job submission in Flink 1.10 standalone mode

2020-04-22 Thread
Do you mean that with the uber jar not in lib/, the user program can load
KafkaTableSourceSinkFactory
through the thread context ClassLoader (with child-first class loading)?
>> Yes.

On Thu, Apr 23, 2020 at 11:42 AM tison  wrote:

> >> After getting the ClassLoader, check whether the KafkaTableSourceSinkFactory class can be obtained.
> >> It can.
>
> Do you mean that with the uber jar not in lib/, the user program can load
> KafkaTableSourceSinkFactory
> through the thread context ClassLoader (with child-first class loading)?
>
> If so, the client's classloading policy sounds fine; the ClassLoader used by SPI loading seems to be the problem.
> FileSystem resolution had a similar ClassLoader bug before.
>
> Best,
> tison.
>
>
> 宇张 wrote on Thu, Apr 23, 2020 at 11:36 AM:
>
> > I tried adding it; the program still fails with the same exception as above. My shade configuration:
> >
> > <plugin>
> >     <groupId>org.apache.maven.plugins</groupId>
> >     <artifactId>maven-shade-plugin</artifactId>
> >     <executions>
> >         <execution>
> >             <phase>package</phase>
> >             <goals>
> >                 <goal>shade</goal>
> >             </goals>
> >             <configuration>
> >                 <transformers>
> >                     <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
> >                         <mainClass>com.akulaku.data.main.StreamMain</mainClass>
> >                     </transformer>
> >                 </transformers>
> >                 <filters>
> >                     <filter>
> >                         <artifact>*:*</artifact>
> >                         <excludes>
> >                             <exclude>META-INF/*.SF</exclude>
> >                             <exclude>META-INF/*.DSA</exclude>
> >                             <exclude>META-INF/*.RSA</exclude>
> >                         </excludes>
> >                     </filter>
> >                 </filters>
> >                 <artifactSet>
> >                     <includes>
> >                         <include>org.apache.flink:flink-table-common</include>
> >                         <include>org.apache.flink:flink-table-api-java</include>
> >                         <include>org.apache.flink:flink-table-api-java-bridge_2.11</include>
> >                         <include>org.apache.flink:flink-table-planner-blink_2.11</include>
> >                         <include>org.apache.flink:flink-connector-kafka-0.11_2.11</include>
> >                         <include>org.apache.flink:flink-connector-kafka-base_2.11</include>
> >                         <include>org.apache.flink:flink-json</include>
> >                     </includes>
> >                 </artifactSet>
> >                 <relocations>
> >                     <relocation>
> >                         <pattern>com.ibm.icu</pattern>
> >                         <shadedPattern>org.apache.flink.table.shaded.com.ibm.icu</shadedPattern>
> >                     </relocation>
> >                 </relocations>
> >             </configuration>
> >         </execution>
> >     </executions>
> > </plugin>
> >
> >
> > On Thu, Apr 23, 2020 at 10:53 AM Jingsong Li 
> > wrote:
> >
> > > Hi,
> > >
> > > Flink's connector discovery goes through the Java SPI service discovery
> > > mechanism, so if the file under services does not contain the Kafka entries, they will not be loaded.
> > >
> > > > And at runtime both packaging methods can load the KafkaFactory class file
> > >
> > > The class file alone is useless; nothing references it.
> > >
> > > Try the approach in [1]? Add combine.children
> > >
> > > [1]
> > >
> > >
> >
> https://github.com/apache/flink/blob/master/flink-table/flink-table-uber-blink/pom.xml#L104
> > >
> > > Best,
> > > Jingsong Lee
> > >
> > > On Thu, Apr 23, 2020 at 10:37 AM 宇张  wrote:
> > >
> > > > I tried the shade packaging approach and it still fails at runtime. The error log is
> > > > identical to the one from assembly packaging, i.e. the error mentioned above, and the
> > > > META-INF/services/org.apache.flink.table.factories.TableFactory files produced by shade
> > > > and assembly have identical content. At runtime both packagings can load the KafkaFactory
> > > > class file, so this does not look like a packaging problem; it looks more like a bug.
> > > > My Maven plugin configuration is below:
> > > >
> > > > <plugin>
> > > >     <groupId>org.apache.maven.plugins</groupId>
> > > >     <artifactId>maven-shade-plugin</artifactId>
> > > >     <executions>
> > > >         <execution>
> > > >             <phase>package</phase>
> > > >             <goals>
> > > >                 <goal>shade</goal>
> > > >             </goals>
> > > >             <configuration>
> > > >                 <transformers>
> > > >                     <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransfo

Re: About job submission in Flink 1.10 standalone mode

2020-04-22 Thread
I tried adding it; the program still fails with the same exception as above. My shade configuration:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>com.akulaku.data.main.StreamMain</mainClass>
                    </transformer>
                </transformers>
                <filters>
                    <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
                <artifactSet>
                    <includes>
                        <include>org.apache.flink:flink-table-common</include>
                        <include>org.apache.flink:flink-table-api-java</include>
                        <include>org.apache.flink:flink-table-api-java-bridge_2.11</include>
                        <include>org.apache.flink:flink-table-planner-blink_2.11</include>
                        <include>org.apache.flink:flink-connector-kafka-0.11_2.11</include>
                        <include>org.apache.flink:flink-connector-kafka-base_2.11</include>
                        <include>org.apache.flink:flink-json</include>
                    </includes>
                </artifactSet>
                <relocations>
                    <relocation>
                        <pattern>com.ibm.icu</pattern>
                        <shadedPattern>org.apache.flink.table.shaded.com.ibm.icu</shadedPattern>
                    </relocation>
                </relocations>
            </configuration>
        </execution>
    </executions>
</plugin>

On Thu, Apr 23, 2020 at 10:53 AM Jingsong Li  wrote:

> Hi,
>
> Flink's connector discovery goes through the Java SPI service discovery mechanism, so if the file under services does not contain the Kafka entries, they will not be loaded.
>
> > And at runtime both packaging methods can load the KafkaFactory class file
>
> The class file alone is useless; nothing references it.
>
> Try the approach in [1]? Add combine.children
>
> [1]
>
> https://github.com/apache/flink/blob/master/flink-table/flink-table-uber-blink/pom.xml#L104
>
> Best,
> Jingsong Lee
>
> On Thu, Apr 23, 2020 at 10:37 AM 宇张  wrote:
>
> > I tried the shade packaging approach and it still fails at runtime. The error log is
> > identical to the one from assembly packaging, i.e. the error mentioned above, and the
> > META-INF/services/org.apache.flink.table.factories.TableFactory files produced by shade
> > and assembly have identical content. At runtime both packagings can load the KafkaFactory
> > class file, so this does not look like a packaging problem; it looks more like a bug.
> > My Maven plugin configuration is below:
> >
> > <plugin>
> >     <groupId>org.apache.maven.plugins</groupId>
> >     <artifactId>maven-shade-plugin</artifactId>
> >     <executions>
> >         <execution>
> >             <phase>package</phase>
> >             <goals>
> >                 <goal>shade</goal>
> >             </goals>
> >             <configuration>
> >                 <transformers>
> >                     <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
> >                         <mainClass>com.akulaku.data.main.StreamMain</mainClass>
> >                     </transformer>
> >                 </transformers>
> >                 <filters>
> >                     <filter>
> >                         <artifact>*:*</artifact>
> >                         <excludes>
> >                             <exclude>META-INF/*.SF</exclude>
> >                             <exclude>META-INF/*.DSA</exclude>
> >                             <exclude>META-INF/*.RSA</exclude>
> >                         </excludes>
> >                     </filter>
> >                 </filters>
> >             </configuration>
> >         </execution>
> >     </executions>
> > </plugin>
> >
> >
> > On Wed, Apr 22, 2020 at 8:00 PM Jingsong Li 
> > wrote:
> >
> > > Hi,
> > >
> > > If org.apache.flink.table.factories.TableFactory does not contain
> > > KafkaTableSourceSinkFactory, the packaging is broken. I am not sure how the 1.9 job ever
> > > ran: if none of the jars' META-INF/services files contain KafkaTableSourceSinkFactory, it
> > > should not have been able to run either.
> > >
> > > I recommend packaging with shade; shade merges the META-INF/services files.
> > >
> > > Best,
> > > Jingsong Lee
> > >
> > > On Wed, Apr 22, 2020 at 7:31 PM 宇张  wrote:
> > >
> > > > >> Also check the content of org.apache.flink.table.factories.TableFactory: does it
> > > > contain KafkaTableSourceSinkFactory?
> > > > I checked my earlier Flink 1.9 project; its application jar does not contain this entry
> > > > either, yet it loads fine at runtime. By that comparison this does not look like a Maven
> > > > packaging problem.
> > > >
> > > > On Wed, Apr 22, 2020 at 7:22 PM 宇张  wrote:
> > > >
> > > > > >> Also check the content of org.apache.flink.table.factories.TableFactory: does it
> > > > > contain KafkaTableSourceSinkFactory?
> > > > > It is not there; only org.apache.flink.formats.json.JsonRowFormatFactory.
> > > > > >> After getting the ClassLoader, check whether the KafkaTableSourceSinkFactory class can be obtained.
> > > > > It can.
> > > > >
> > > > > So it looks like the mvn packaging is the problem:
> > > > > mvn clean package -DskipTests
>

Re: About job submission in Flink 1.10 standalone mode

2020-04-22 Thread
I tried the shade packaging approach and it still fails at runtime. The error log is identical to the one from assembly packaging, i.e. the error mentioned above, and the
META-INF/services/org.apache.flink.table.factories.TableFactory files produced by shade and assembly have identical content. At runtime both packagings can load the KafkaFactory class file, so this does not look like a packaging problem; it looks more like a bug.
My Maven plugin configuration is below:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>com.akulaku.data.main.StreamMain</mainClass>
                    </transformer>
                </transformers>
                <filters>
                    <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
            </configuration>
        </execution>
    </executions>
</plugin>

On Wed, Apr 22, 2020 at 8:00 PM Jingsong Li  wrote:

> Hi,
>
>
> If org.apache.flink.table.factories.TableFactory does not contain KafkaTableSourceSinkFactory, the packaging is broken. I am not sure how the 1.9 job ever ran: if none of the jars' META-INF/services files contain KafkaTableSourceSinkFactory, it should not have been able to run either.
>
> I recommend packaging with shade; shade merges the META-INF/services files.
>
> Best,
> Jingsong Lee
>
> On Wed, Apr 22, 2020 at 7:31 PM 宇张  wrote:
>
> >
> >
> >> Also check the content of org.apache.flink.table.factories.TableFactory: does it contain KafkaTableSourceSinkFactory?
> I checked my earlier Flink 1.9 project; its application jar does not contain this entry either, yet it loads fine at runtime. By that comparison this does not look like a Maven packaging problem.
> >
> > On Wed, Apr 22, 2020 at 7:22 PM 宇张  wrote:
> >
> > >
> > >
> >
> > > >> Also check the content of org.apache.flink.table.factories.TableFactory: does it contain KafkaTableSourceSinkFactory?
> > > It is not there; only org.apache.flink.formats.json.JsonRowFormatFactory.
> > > >> After getting the ClassLoader, check whether the KafkaTableSourceSinkFactory class can be obtained.
> > > It can.
> > >
> > > So it looks like the mvn packaging is the problem:
> > > mvn clean package -DskipTests
> > > with default dependency scopes
> > >
> > >
> > > On Wed, Apr 22, 2020 at 7:05 PM Jingsong Li 
> > > wrote:
> > >
> > >> Hi,
> > >>
> > >>
> > >>
> >
> 也确认下org.apache.flink.table.factories.TableFactory的内容,里面有没有KafkaTableSourceSinkFactory
> > >>
> > >> >
> 这个指的是打印当前线程的classloader嘛Thread.currentThread().getContextClassLoader()
> > >> 是的,拿到ClassLoader后看下能不能取到KafkaTableSourceSinkFactory的class
> > >>
> > >> Best,
> > >> Jingsong Lee
> > >>
> > >> On Wed, Apr 22, 2020 at 7:00 PM 宇张  wrote:
> > >>
> > >> > 看下你打包的 UberJar 里有没一个内容包括
> > >> > 1、下面这个文件是存在的
> > >> >
> > org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> > >> > 的文件
> > >> > META-INF/services/org.apache.flink.table.factories.TableFactory
> > >> > 2、flink版本1.10,Standalone模式启动服务(start-cluster.sh),flink
> > >> > run运行(/software/flink-1.10.0/bin/flink run -c
> com.data.main.StreamMain
> > >> > ./flink_1.10_test-1.0-jar-with-dependencies.jar)
> > >> > 3、再确认下"TableEnvironmentImpl.sqlQuery"调用时候的ThreadClassLoader?
> > >> >
> 这个指的是打印当前线程的classloader嘛Thread.currentThread().getContextClassLoader()
> > >> >
> > >> >
> > >> >
> > >> > On Wed, Apr 22, 2020 at 6:00 PM Jingsong Li  >
> > >> > wrote:
> > >> >
> > >> > > Hi,
> > >> > >
> > >> > > 先确认下你的Jar包里有没有 meta-inf-services的文件?里面确定有Kafka?
> > >> > >
> > >> > > 如果有,再确认下"TableEnvironmentImpl.sqlQuery"调用时候的ThreadClassLoader?
> > >> > > 因为现在默认是通过ThreadClassLoader来获取Factory的。
> > >> > >
> > >> > > Best,
> > >> > > Jingsong Lee
> > >> > >
> > >> > > On Wed, Apr 22, 2020 at 5:30 PM 宇张  wrote:
> > >> > >
> > >> > > > 我这面使用Standalone模式运行Flink任务,但是Uber
> > >> > > > Jar里面的TableSourceFactory不能被加载,即使设置了classloader.resolve-order:
> > >> > > > child-first,只有放在lib目录才能加载得到,我看发布文档跟改了类加载策略,但是我不知道为什么Uber
> > >> > > Jar里面的Factory不能被加载
> > >> > > > Flink Client respects Classloading Policy (FLINK-13749
> > >> > > > <https://issues.apache.org/jira/browse/FLINK-13749>)
> > >> > > > <
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> https://ci.apache.org/proj

Re: About job submission in Flink 1.10 standalone mode

2020-04-22 Thread
>> Also check the content of org.apache.flink.table.factories.TableFactory: does it contain KafkaTableSourceSinkFactory?
I checked my earlier Flink 1.9 project; its application jar does not contain this entry either, yet it loads fine at runtime. By that comparison this does not look like a Maven packaging problem.

On Wed, Apr 22, 2020 at 7:22 PM 宇张  wrote:

>
> >> Also check the content of org.apache.flink.table.factories.TableFactory: does it contain KafkaTableSourceSinkFactory?
> It is not there; only org.apache.flink.formats.json.JsonRowFormatFactory.
> >> After getting the ClassLoader, check whether the KafkaTableSourceSinkFactory class can be obtained.
> It can.
>
> So it looks like the mvn packaging is the problem:
> mvn clean package -DskipTests
> with default dependency scopes
>
>
> On Wed, Apr 22, 2020 at 7:05 PM Jingsong Li 
> wrote:
>
>> Hi,
>>
>>
>> Also check the content of org.apache.flink.table.factories.TableFactory: does it contain KafkaTableSourceSinkFactory.
>>
>> > Does this mean printing the current thread's classloader, Thread.currentThread().getContextClassLoader()?
>> Yes; after getting the ClassLoader, check whether the KafkaTableSourceSinkFactory class can be obtained.
>>
>> Best,
>> Jingsong Lee
>>
>> On Wed, Apr 22, 2020 at 7:00 PM 宇张  wrote:
>>
>> > >> Check whether your packaged uber jar contains a file whose content includes
>> > 1. This file exists:
>> > org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>> > inside the file
>> > META-INF/services/org.apache.flink.table.factories.TableFactory
>> > 2. Flink 1.10, standalone mode started with start-cluster.sh, run with flink
>> > run (/software/flink-1.10.0/bin/flink run -c com.data.main.StreamMain
>> > ./flink_1.10_test-1.0-jar-with-dependencies.jar)
>> > 3. >> Also confirm the thread ClassLoader when "TableEnvironmentImpl.sqlQuery" is called?
>> > Does this mean printing the current thread's classloader, Thread.currentThread().getContextClassLoader()?
>> >
>> >
>> >
>> > On Wed, Apr 22, 2020 at 6:00 PM Jingsong Li 
>> > wrote:
>> >
>> > > Hi,
>> > >
>> > > First confirm whether your jar contains the META-INF/services file, and whether it definitely includes Kafka.
>> > >
>> > > If it does, also confirm the thread ClassLoader when "TableEnvironmentImpl.sqlQuery" is called,
>> > > because factories are obtained through the thread ClassLoader by default now.
>> > >
>> > > Best,
>> > > Jingsong Lee
>> > >
>> > > On Wed, Apr 22, 2020 at 5:30 PM 宇张  wrote:
>> > >
>> > > > I am running Flink jobs in standalone mode, but the uber
>> > > > jar's TableSourceFactory cannot be loaded, even with classloader.resolve-order:
>> > > > child-first set; it only loads from the lib directory. The release notes mention the class loading strategy changed, but I do not know why the uber
>> > > jar's factory cannot be loaded.
>> > > > Flink Client respects Classloading Policy (FLINK-13749
>> > > > <https://issues.apache.org/jira/browse/FLINK-13749>)
>> > > > <
>> > > >
>> > >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/release-notes/flink-1.10.html#flink-client-respects-classloading-policy-flink-13749
>> > > > >
>> > > >
>> > > > The Flink client now also respects the configured classloading
>> policy,
>> > > > i.e., parent-first or child-first classloading. Previously, only
>> > cluster
>> > > > components such as the job manager or task manager supported this
>> > > setting.
>> > > > This does mean that users might get different behaviour in their
>> > > programs,
>> > > > in which case they should configure the classloading policy
>> explicitly
>> > to
>> > > > use parent-first classloading, which was the previous (hard-coded)
>> > > > behaviour.
>> > > >
>> > > > Exception:
>> > > >
>> > > >   org.apache.flink.client.program.ProgramInvocationException: The
>> main
>> > > > method caused an error: findAndCreateTableSource failed.
>> > > > at
>> > > >
>> > > >
>> > >
>> >
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>> > > > at
>> > > >
>> > > >
>> > >
>> >
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>> > > > at
>> > >
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>> > > > at
>> > > >
>> > > >
>> > >
>> >
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>> > > > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>> > > > at
>> > > >
>> > > >
>> > >
>> >
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>> > 

Re: About job submission in Flink 1.10 standalone mode

2020-04-22 Thread
>> Also check the content of org.apache.flink.table.factories.TableFactory: does it contain KafkaTableSourceSinkFactory?
It is not there; only org.apache.flink.formats.json.JsonRowFormatFactory.
>> After getting the ClassLoader, check whether the KafkaTableSourceSinkFactory class can be obtained.
It can.

So it looks like the mvn packaging is the problem:
mvn clean package -DskipTests
with default dependency scopes


On Wed, Apr 22, 2020 at 7:05 PM Jingsong Li  wrote:

> Hi,
>
>
> Also check the content of org.apache.flink.table.factories.TableFactory: does it contain KafkaTableSourceSinkFactory.
>
> > Does this mean printing the current thread's classloader, Thread.currentThread().getContextClassLoader()?
> Yes; after getting the ClassLoader, check whether the KafkaTableSourceSinkFactory class can be obtained.
>
> Best,
> Jingsong Lee
>
> On Wed, Apr 22, 2020 at 7:00 PM 宇张  wrote:
>
> > >> Check whether your packaged uber jar contains a file whose content includes
> > 1. This file exists:
> > org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> > inside the file
> > META-INF/services/org.apache.flink.table.factories.TableFactory
> > 2. Flink 1.10, standalone mode started with start-cluster.sh, run with flink
> > run (/software/flink-1.10.0/bin/flink run -c com.data.main.StreamMain
> > ./flink_1.10_test-1.0-jar-with-dependencies.jar)
> > 3. >> Also confirm the thread ClassLoader when "TableEnvironmentImpl.sqlQuery" is called?
> > Does this mean printing the current thread's classloader, Thread.currentThread().getContextClassLoader()?
> >
> >
> >
> > On Wed, Apr 22, 2020 at 6:00 PM Jingsong Li 
> > wrote:
> >
> > > Hi,
> > >
> > > First confirm whether your jar contains the META-INF/services file, and whether it definitely includes Kafka.
> > >
> > > If it does, also confirm the thread ClassLoader when "TableEnvironmentImpl.sqlQuery" is called,
> > > because factories are obtained through the thread ClassLoader by default now.
> > >
> > > Best,
> > > Jingsong Lee
> > >
> > > On Wed, Apr 22, 2020 at 5:30 PM 宇张  wrote:
> > >
> > > > I am running Flink jobs in standalone mode, but the uber
> > > > jar's TableSourceFactory cannot be loaded, even with classloader.resolve-order:
> > > > child-first set; it only loads from the lib directory. The release notes mention the class loading strategy changed, but I do not know why the uber
> > > jar's factory cannot be loaded.
> > > > Flink Client respects Classloading Policy (FLINK-13749
> > > > <https://issues.apache.org/jira/browse/FLINK-13749>)
> > > > <
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/release-notes/flink-1.10.html#flink-client-respects-classloading-policy-flink-13749
> > > > >
> > > >
> > > > The Flink client now also respects the configured classloading
> policy,
> > > > i.e., parent-first or child-first classloading. Previously, only
> > cluster
> > > > components such as the job manager or task manager supported this
> > > setting.
> > > > This does mean that users might get different behaviour in their
> > > programs,
> > > > in which case they should configure the classloading policy
> explicitly
> > to
> > > > use parent-first classloading, which was the previous (hard-coded)
> > > > behaviour.
> > > >
> > > > Exception:
> > > >
> > > >   org.apache.flink.client.program.ProgramInvocationException: The main
> > > > method caused an error: findAndCreateTableSource failed.
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> > > > at
> > >
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> > > > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> > > > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> > > > Caused by: org.apache.flink.table.api.TableException:
> > > > findAndCreateTableSource failed.
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.factories.

Re: About job submission in Flink 1.10 standalone mode

2020-04-22 Thread
>> Check whether your packaged uber jar contains a file whose content includes
1. This file exists:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
inside the file
META-INF/services/org.apache.flink.table.factories.TableFactory
2. Flink 1.10, standalone mode started with start-cluster.sh, run with flink
run (/software/flink-1.10.0/bin/flink run -c com.data.main.StreamMain
./flink_1.10_test-1.0-jar-with-dependencies.jar)
3. >> Also confirm the thread ClassLoader when "TableEnvironmentImpl.sqlQuery" is called?
Does this mean printing the current thread's classloader, Thread.currentThread().getContextClassLoader()?
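
A small probe that checks both points at once, sketched with the class names from this thread (illustrative only; run it inside the job's main method so the thread context ClassLoader is the one the client actually uses):

import java.util.ServiceLoader;
import org.apache.flink.table.factories.TableFactory;

public class FactoryProbe {
    public static void main(String[] args) throws Exception {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();

        // 1. Can the factory class itself be loaded through the context ClassLoader?
        Class<?> clazz = Class.forName(
                "org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory",
                false, cl);
        System.out.println("loaded: " + clazz.getName());

        // 2. What does SPI discovery actually see? This reads the merged
        //    META-INF/services entries, which is how factories are found.
        for (TableFactory f : ServiceLoader.load(TableFactory.class, cl)) {
            System.out.println("SPI factory: " + f.getClass().getName());
        }
    }
}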



On Wed, Apr 22, 2020 at 6:00 PM Jingsong Li  wrote:

> Hi,
>
> First confirm whether your jar contains the META-INF/services file, and whether it definitely includes Kafka.
>
> If it does, also confirm the thread ClassLoader when "TableEnvironmentImpl.sqlQuery" is called,
> because factories are obtained through the thread ClassLoader by default now.
>
> Best,
> Jingsong Lee
>
> On Wed, Apr 22, 2020 at 5:30 PM 宇张  wrote:
>
> > I am running Flink jobs in standalone mode, but the uber
> > jar's TableSourceFactory cannot be loaded, even with classloader.resolve-order:
> > child-first set; it only loads from the lib directory. The release notes mention the class loading strategy changed, but I do not know why the uber
> jar's factory cannot be loaded.
> > Flink Client respects Classloading Policy (FLINK-13749
> > <https://issues.apache.org/jira/browse/FLINK-13749>)
> > <
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/release-notes/flink-1.10.html#flink-client-respects-classloading-policy-flink-13749
> > >
> >
> > The Flink client now also respects the configured classloading policy,
> > i.e., parent-first or child-first classloading. Previously, only cluster
> > components such as the job manager or task manager supported this
> setting.
> > This does mean that users might get different behaviour in their
> programs,
> > in which case they should configure the classloading policy explicitly to
> > use parent-first classloading, which was the previous (hard-coded)
> > behaviour.
> >
> > Exception:
> >
> >   org.apache.flink.client.program.ProgramInvocationException: The main
> > method caused an error: findAndCreateTableSource failed.
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> > at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> > at
> >
> >
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> > Caused by: org.apache.flink.table.api.TableException:
> > findAndCreateTableSource failed.
> > at
> >
> >
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
> > at
> >
> >
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
> > at
> >
> >
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
> > at
> >
> >
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
> > at
> >
> >
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
> > at
> >
> >
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
> > at
> >
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
> > at
> >
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
> > at
> >
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> > at
> >
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
> > at
> >
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
> > at
> >
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
> > at
> >
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToR

Re: JAR conflict issues in Flink 1.10

2020-04-22 Thread
OK. Going forward I will exclude the unneeded transitive dependencies in the project and keep a record of the jar conflicts I hit. Thanks for the explanation.
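
As an illustration of that kind of exclusion (the dependency that drags okio in is a placeholder; okio's coordinates are real):

<dependency>
    <groupId>some.group</groupId>
    <artifactId>some-artifact</artifactId>
    <version>1.0</version>
    <exclusions>
        <!-- Drop the transitive okio so a single version can be pinned explicitly. -->
        <exclusion>
            <groupId>com.squareup.okio</groupId>
            <artifactId>okio</artifactId>
        </exclusion>
    </exclusions>
</dependency>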

On Wed, Apr 22, 2020 at 2:16 PM tison  wrote:

> Can you share the concrete error? In general, the dependencies Flink itself needs are shaded in, and unneeded transitive dependencies should be excluded. Classes exposed as API
> generally need wrapping, or should use stable interfaces.
>
> This may be an engineering issue; could you list the specific JAR conflicts you hit, so we can see how to resolve them?
>
> Best,
> tison.
>
>
> 宇张 wrote on Wed, Apr 22, 2020 at 11:52 AM:
>
> > When using Flink 1.10, the most frequent problem is jar conflicts. flink-parent alone pulls in four versions of the okio package, and some packages cannot be handled with
> > <exclusions>. Does the community have any proposal to improve jar conflicts?
> >
>


About job submission in Flink 1.10 standalone mode

2020-04-22 Thread
I am running Flink jobs in standalone mode, but the uber
jar's TableSourceFactory cannot be loaded, even with classloader.resolve-order:
child-first set; it only loads from the lib directory. The release notes mention the class loading strategy changed, but I do not know why the factory inside the uber jar cannot be loaded.
Flink Client respects Classloading Policy (FLINK-13749
<https://issues.apache.org/jira/browse/FLINK-13749>)
<https://ci.apache.org/projects/flink/flink-docs-release-1.10/release-notes/flink-1.10.html#flink-client-respects-classloading-policy-flink-13749>
The Flink client now also respects the configured classloading policy,
i.e., parent-first or child-first classloading. Previously, only cluster
components such as the job manager or task manager supported this setting.
This does mean that users might get different behaviour in their programs,
in which case they should configure the classloading policy explicitly to
use parent-first classloading, which was the previous (hard-coded)
behaviour.
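
For context, the policy in question is a one-line setting in flink-conf.yaml (shown with the child-first value this report uses; parent-first restores the old hard-coded behaviour):

# Look up classes in the user jar before Flink's own classpath.
classloader.resolve-order: child-first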

Exception:

  org.apache.flink.client.program.ProgramInvocationException: The main
method caused an error: findAndCreateTableSource failed.
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: org.apache.flink.table.api.TableException:
findAndCreateTableSource failed.
at
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
at
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
at
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
at com.akulaku.data.main.StreamMain.main(StreamMain.java:87)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
... 8 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'
'format.type' expects 'csv', but is 'json'

The following 

JAR conflict issues in Flink 1.10

2020-04-21 Thread
When using Flink 1.10, the most frequent problem is jar conflicts. flink-parent alone pulls in four versions of the okio package, and some packages cannot be handled with
<exclusions>. Does the community have any proposal to improve jar conflicts?


Storing the Flink 1.10 catalog in Hive

2020-03-31 Thread
Hi,
We want to use Hive to store Flink catalog data. When saving or deleting metadata, how can we verify that we have permission to operate on the Hive metadata?
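
For reference, registering a HiveCatalog in Flink 1.10 looks roughly like the sketch below. The catalog name, database, conf dir, and Hive version are placeholders, and this does not by itself answer the permission question; any permission check would happen on the Hive metastore side:

import org.apache.flink.table.catalog.hive.HiveCatalog;

// tableEnv is an existing TableEnvironment.
HiveCatalog hive = new HiveCatalog(
        "myhive",          // catalog name
        "default",         // default database
        "/etc/hive/conf",  // directory containing hive-site.xml
        "2.3.4");          // Hive version
tableEnv.registerCatalog("myhive", hive);
tableEnv.useCatalog("myhive");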


Re: Flink 1.10 JSON parsing

2020-03-20 Thread
anslateToPlanInternal(StreamExecExchange.scala:44)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlanInternal(StreamExecLimit.scala:161)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlanInternal(StreamExecLimit.scala:51)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlan(StreamExecLimit.scala:51)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
at
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
at
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
at com.akulaku.data.main.StreamMain.main(StreamMain.java:58)


On Fri, Mar 20, 2020 at 4:43 PM Jark Wu  wrote:

> Hi,
>
> The images you sent are all broken... I suggest pasting the text directly, or uploading to an image host and pasting the link.
>
> 1. What error does DECIMAL throw?
> 2. If you keep the jsonSchema, you must guarantee the table schema and the JSON schema
> are consistent, i.e. not only must the table schema be correct, the JSON schema must be too.
> That adds a lot of extra cost, so the general advice is not to configure jsonSchema. In theory the table schema can express all the complex formats.
>
> Best,
> Jark
>
>
> On Fri, 20 Mar 2020 at 14:48, 宇张  wrote:
>
> > Hi,
> > Well, testing shows DECIMAL cannot be used, even DECIMAL(38, 18); switching to another type works. Not sure whether it is a bug.
> > [image: image.png]
> >
> > On Fri, Mar 20, 2020 at 2:17 PM 宇张  wrote:
> >
> >> Hi, I tried again. When the JSON data contains numeric types, even after changing the schema of data to
> >> ARRAY(ROW(...))
> >> and removing
> >> .jsonSchema(...), the program still fails; it works when there are no numeric types. From the error output the two structures do match, but validation apparently fails.
> >> [image: image.png]
> >>
> >>
> >> On Fri, Mar 20, 2020 at 12:08 PM 宇张  wrote:
> >>
> >>> Hi,
> >>> OK, I tried it. I changed the schema of data to
> >>> ARRAY(ROW(FIELD("tracking_number", STRING), FIELD("invoice_no",
> >>> STRING)))
> >>> and after also removing .jsonSchema(...), data parsing works fine. But if
> >>> .jsonSchema(...) is kept, it throws: Exception in thread "main"
> >>> org.apache.flink.table.api.ValidationException: Type
> >>> ARRAY<ROW<`tracking_number` STRING, `invoice_no` STRING>> of table field
> >>> 'data' does not match with the physical type ROW<`f0`
> >>> ROW<`tracking_number` STRING, `invoice_no` STRING>> of the 'data' field
> >>> of the TableSource return type.
> >>>
> >>> The reason I keep this jsonSchema is that I want to try storing the metadata of this kind of complex JSON source in Hive and then use it to infer the format of the statement below, because I do not know how to map the field info when defining the SQL below for such complex JSON. Or is there an example of SQL mapping for this kind of complex JSON? Thanks.
> >>> [image: image.png]
> >>>
> >>> On Fri, Mar 20, 2020 at 11:42 AM Jark Wu  wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> Looking at your data, "data" is an array type, so the schema of data needs to be changed to
> >>>> ARRAY(ROW(FIELD("tracking_number", STRING), FIELD("invoice_no",
> >>>> STRING)))
> >>>> Also, I suggest removing .jsonSchema(...); since 1.10, flink-json can infer the JSON
> >>>> schema from the table schema automatically.
> >>>>

Re: Flink 1.10 JSON parsing

2020-03-20 Thread
Hi,
Well, testing shows DECIMAL cannot be used, even DECIMAL(38, 18); switching to another type works. Not sure whether it is a bug.
[image: image.png]

On Fri, Mar 20, 2020 at 2:17 PM 宇张  wrote:

> Hi, I tried again. When the JSON data contains numeric types, even after changing the schema of data to
> ARRAY(ROW(...))
> and removing
> .jsonSchema(...), the program still fails; it works when there are no numeric types. From the error output the two structures do match, but validation apparently fails.
> [image: image.png]
>
>
> On Fri, Mar 20, 2020 at 12:08 PM 宇张  wrote:
>
>> Hi,
>> OK, I tried it. I changed the schema of data to
>> ARRAY(ROW(FIELD("tracking_number", STRING), FIELD("invoice_no", STRING)))
>> and after also removing .jsonSchema(...), data parsing works fine. But if
>> .jsonSchema(...) is kept, it throws: Exception in thread "main"
>> org.apache.flink.table.api.ValidationException: Type
>> ARRAY<ROW<`tracking_number` STRING, `invoice_no` STRING>> of table field
>> 'data' does not match with the physical type ROW<`f0` ROW<`tracking_number`
>> STRING, `invoice_no` STRING>> of the 'data' field of the TableSource return
>> type.
>>
>> The reason I keep this jsonSchema is that I want to try storing the metadata of this kind of complex JSON source in Hive and then use it to infer the format of the statement below, because I do not know how to map the field info when defining the SQL below for such complex JSON. Or is there an example of SQL mapping for this kind of complex JSON? Thanks.
>> [image: image.png]
>>
>> On Fri, Mar 20, 2020 at 11:42 AM Jark Wu  wrote:
>>
>>> Hi,
>>>
>>> Looking at your data, "data" is an array type, so the schema of data needs to be changed to
>>> ARRAY(ROW(FIELD("tracking_number", STRING), FIELD("invoice_no",
>>> STRING)))
>>> Also, I suggest removing .jsonSchema(...); since 1.10, flink-json can infer the JSON
>>> schema from the table schema automatically.
>>>
>>> Best,
>>> Jark
>>>
>>> On Fri, 20 Mar 2020 at 11:34, 宇张  wrote:
>>>
>>> > Hi:
>>> > 1. When parsing JSON data, why is decimal used here instead of bigint?
>>> > [image: image.png]
>>> > 2. When using connect, parsing JSON array elements throws an exception. Is this misuse or a bug?
>>> >
>>> >
>>> json:{"business":"riskt","data":[{"tracking_number":"0180024020920","invoice_no":"2020021025"}],"database":"installmentdb","table":"t_sales_order","ts":1581576074069,"type":"UPDATE","putRowNum":1}
>>> >
>>> >
>>> jsonSchema:{"type":"object","properties":{"business":{"type":"string"},"data":{"type":"array","items":[{"type":"object","properties":{"tracking_number":{"type":"string"},"invoice_no":{"type":"string"}}}]},"database":{"type":"string"},"table":{"type":"string"},"ts":{"type":"integer"},"type":{"type":"string"},"putRowNum":{"type":"integer"}}}
>>> > connect:
>>> >
>>> > streamTableEnv
>>> > .connect(
>>> > new Kafka()
>>> > .version("0.11")
>>> > .topic("mysql_binlog_test_str")
>>> > .startFromEarliest()
>>> > .property("zookeeper.connect",
>>> "localhost:2181")
>>> > .property("bootstrap.servers",
>>> "localhost:9092")
>>> > )
>>> > .withFormat(
>>> > new Json()
>>> >
>>>  
>>> .jsonSchema("{\"type\":\"object\",\"properties\":{\"business\":{\"type\":\"string\"},\"data\":{\"type\":\"array\",\"items\":[{\"type\":\"object\",\"properties\":{\"tracking_number\":{\"type\":\"string\"},\"invoice_no\":{\"type\":\"string\"}}}]},\"database\":{\"type\":\"string\"},\"table\":{\"type\":\"string\"},\"ts\":{\"type\":\"integer\"},\"type\":{\"type\":\"string\"},\"putRowNum\":{\"type\":\"integer\"}}}")
>>> > )
>>> > .withSchema(
>>> > new Schema()
>>> > .field("business", DataTypes.STRING())
>>> > .field("data",
>>> DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.ROW(
>>> > DataTypes.FIELD("tracking_number",
>>>

Re: Flink 1.10 JSON Parsing

2020-03-20 Thread
hi, I tried again. When the JSON data contains numeric types, the program
still fails to run, even after changing the schema of data to
ARRAY(ROW(...)) and removing .jsonSchema(...); when there are no numeric
types it works. From the error output the two structures do match, but
validation apparently fails.
[image: image.png]
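One way to narrow down whether JSON parsing or TableSource validation is failing: run the sample record through flink-json's row deserializer directly, bypassing the table stack. A minimal sketch assuming Flink 1.10's flink-json is on the classpath; the class name and the choice of Types.LONG for the numeric fields are my own.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.types.Row;

import java.nio.charset.StandardCharsets;

public class JsonParseCheck {
    public static void main(String[] args) throws Exception {
        // Row type mirroring the table schema; Types.LONG stands in for the numeric fields.
        TypeInformation<Row> rowType = Types.ROW_NAMED(
                new String[]{"business", "data", "database", "table", "ts", "type", "putRowNum"},
                Types.STRING,
                Types.OBJECT_ARRAY(Types.ROW_NAMED(
                        new String[]{"tracking_number", "invoice_no"},
                        Types.STRING, Types.STRING)),
                Types.STRING, Types.STRING, Types.LONG, Types.STRING, Types.LONG);

        JsonRowDeserializationSchema deser =
                new JsonRowDeserializationSchema.Builder(rowType).build();

        String json = "{\"business\":\"riskt\",\"data\":[{\"tracking_number\":\"0180024020920\","
                + "\"invoice_no\":\"2020021025\"}],\"database\":\"installmentdb\","
                + "\"table\":\"t_sales_order\",\"ts\":1581576074069,\"type\":\"UPDATE\",\"putRowNum\":1}";

        // Prints the parsed Row if raw JSON deserialization succeeds; if this works,
        // the failure upthread is in schema validation, not in JSON parsing itself.
        System.out.println(deser.deserialize(json.getBytes(StandardCharsets.UTF_8)));
    }
}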


On Fri, Mar 20, 2020 at 12:08 PM 宇张  wrote:

> hi,
> OK, I gave it a try. After changing the schema of data to
> ARRAY(ROW(FIELD("tracking_number", STRING), FIELD("invoice_no", STRING)))
> and removing .jsonSchema(...), the data now parses fine; but if
> .jsonSchema(...) is kept, the following exception is thrown: Exception in thread "main"
> org.apache.flink.table.api.ValidationException: Type
> ARRAY<ROW<`tracking_number` STRING, `invoice_no` STRING>> of table field
> 'data' does not match with the physical type ROW<`f0` ROW<`tracking_number`
> STRING, `invoice_no` STRING>> of the 'data' field of the TableSource return
> type.
>
> The reason I kept this jsonSchema is that I want to store the metadata of this kind of complex JSON source in Hive and then use it to infer the format of the statement below, because I don't know how the field mapping works when defining the SQL below for the complex JSON above. Or is there an example of SQL mapping for this kind of complex JSON? Thanks.
> [image: image.png]
>
> On Fri, Mar 20, 2020 at 11:42 AM Jark Wu  wrote:
>
>> Hi,
>>
>> Looking at your data, "data" is an array type, so the schema of data
>> needs to be changed to ARRAY(ROW(FIELD("tracking_number", STRING),
>> FIELD("invoice_no", STRING))).
>> Also, I'd suggest removing .jsonSchema(...); since 1.10, flink-json can
>> infer the JSON schema from the table schema automatically.
>>
>> Best,
>> Jark
>>
>> On Fri, 20 Mar 2020 at 11:34, 宇张  wrote:
>>
>> > hi:
>> > 1. When parsing JSON data, why is decimal used here instead of bigint?
>> > [image: image.png]
>> > 2. When using connect, parsing a JSON array element throws an
>> > exception. Is this caused by misuse, or is it a bug?
>> >
>> >
>> json:{"business":"riskt","data":[{"tracking_number":"0180024020920","invoice_no":"2020021025"}],"database":"installmentdb","table":"t_sales_order","ts":1581576074069,"type":"UPDATE","putRowNum":1}
>> >
>> >
>> jsonSchema:{"type":"object","properties":{"business":{"type":"string"},"data":{"type":"array","items":[{"type":"object","properties":{"tracking_number":{"type":"string"},"invoice_no":{"type":"string"}}}]},"database":{"type":"string"},"table":{"type":"string"},"ts":{"type":"integer"},"type":{"type":"string"},"putRowNum":{"type":"integer"}}}
>> > connect:
>> >
>> > streamTableEnv
>> > .connect(
>> > new Kafka()
>> > .version("0.11")
>> > .topic("mysql_binlog_test_str")
>> > .startFromEarliest()
>> > .property("zookeeper.connect", "localhost:2181")
>> > .property("bootstrap.servers", "localhost:9092")
>> > )
>> > .withFormat(
>> > new Json()
>> >
>>  
>> .jsonSchema("{\"type\":\"object\",\"properties\":{\"business\":{\"type\":\"string\"},\"data\":{\"type\":\"array\",\"items\":[{\"type\":\"object\",\"properties\":{\"tracking_number\":{\"type\":\"string\"},\"invoice_no\":{\"type\":\"string\"}}}]},\"database\":{\"type\":\"string\"},\"table\":{\"type\":\"string\"},\"ts\":{\"type\":\"integer\"},\"type\":{\"type\":\"string\"},\"putRowNum\":{\"type\":\"integer\"}}}")
>> > )
>> > .withSchema(
>> > new Schema()
>> > .field("business", DataTypes.STRING())
>> > .field("data",
>> DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.ROW(
>> > DataTypes.FIELD("tracking_number",
>> DataTypes.STRING()),
>> > DataTypes.FIELD("invoice_no",
>> DataTypes.STRING())
>> > .field("database", DataTypes.STRING())
>> > .field("table", DataTypes.STRING())
>> > .field("ts", DataTypes.DECIMAL(38, 18))

Re: Flink 1.10 JSON Parsing

2020-03-19 Thread
hi,
OK, I gave it a try. After changing the schema of data to
ARRAY(ROW(FIELD("tracking_number", STRING), FIELD("invoice_no", STRING)))
and removing .jsonSchema(...), the data now parses fine; but if
.jsonSchema(...) is kept, the following exception is thrown: Exception in thread "main"
org.apache.flink.table.api.ValidationException: Type
ARRAY<ROW<`tracking_number` STRING, `invoice_no` STRING>> of table field
'data' does not match with the physical type ROW<`f0` ROW<`tracking_number`
STRING, `invoice_no` STRING>> of the 'data' field of the TableSource return
type.
The reason I kept this jsonSchema is that I want to store the metadata of this kind of complex JSON source in Hive and then use it to infer the format of the statement below, because I don't know how the field mapping works when defining the SQL below for the complex JSON above. Or is there an example of SQL mapping for this kind of complex JSON? Thanks.
[image: image.png]
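On the field-mapping question: a sketch of how the DDL for this nested JSON could look, registered via sqlUpdate. The table name is a placeholder and the connector properties simply mirror the connect(...) call upthread; illustrative only, not a verified setup.

streamTableEnv.sqlUpdate(
        "CREATE TABLE binlog_test (\n" +
        "  business STRING,\n" +
        // a JSON array of objects maps to ARRAY<ROW<...>>
        "  `data` ARRAY<ROW<tracking_number STRING, invoice_no STRING>>,\n" +
        "  `database` STRING,\n" +
        "  `table` STRING,\n" +
        "  ts BIGINT,\n" +
        "  `type` STRING,\n" +
        "  putRowNum BIGINT\n" +
        ") WITH (\n" +
        "  'connector.type' = 'kafka',\n" +
        "  'connector.version' = '0.11',\n" +
        "  'connector.topic' = 'mysql_binlog_test_str',\n" +
        "  'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
        "  'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
        "  'format.type' = 'json'\n" +
        ")");
// Array elements are 1-indexed in SQL, e.g.:
// SELECT `data`[1].tracking_number, `data`[1].invoice_no FROM binlog_test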

On Fri, Mar 20, 2020 at 11:42 AM Jark Wu  wrote:

> Hi,
>
> Looking at your data, "data" is an array type, so the schema of data
> needs to be changed to ARRAY(ROW(FIELD("tracking_number", STRING),
> FIELD("invoice_no", STRING))).
> Also, I'd suggest removing .jsonSchema(...); since 1.10, flink-json can
> infer the JSON schema from the table schema automatically.
>
> Best,
> Jark
>
> On Fri, 20 Mar 2020 at 11:34, 宇张  wrote:
>
> > hi:
> > 1. When parsing JSON data, why is decimal used here instead of bigint?
> > [image: image.png]
> > 2. When using connect, parsing a JSON array element throws an
> > exception. Is this caused by misuse, or is it a bug?
> >
> >
> json:{"business":"riskt","data":[{"tracking_number":"0180024020920","invoice_no":"2020021025"}],"database":"installmentdb","table":"t_sales_order","ts":1581576074069,"type":"UPDATE","putRowNum":1}
> >
> >
> jsonSchema:{"type":"object","properties":{"business":{"type":"string"},"data":{"type":"array","items":[{"type":"object","properties":{"tracking_number":{"type":"string"},"invoice_no":{"type":"string"}}}]},"database":{"type":"string"},"table":{"type":"string"},"ts":{"type":"integer"},"type":{"type":"string"},"putRowNum":{"type":"integer"}}}
> > connect:
> >
> > streamTableEnv
> > .connect(
> > new Kafka()
> > .version("0.11")
> > .topic("mysql_binlog_test_str")
> > .startFromEarliest()
> > .property("zookeeper.connect", "localhost:2181")
> > .property("bootstrap.servers", "localhost:9092")
> > )
> > .withFormat(
> > new Json()
> >
>  
> .jsonSchema("{\"type\":\"object\",\"properties\":{\"business\":{\"type\":\"string\"},\"data\":{\"type\":\"array\",\"items\":[{\"type\":\"object\",\"properties\":{\"tracking_number\":{\"type\":\"string\"},\"invoice_no\":{\"type\":\"string\"}}}]},\"database\":{\"type\":\"string\"},\"table\":{\"type\":\"string\"},\"ts\":{\"type\":\"integer\"},\"type\":{\"type\":\"string\"},\"putRowNum\":{\"type\":\"integer\"}}}")
> > )
> > .withSchema(
> > new Schema()
> > .field("business", DataTypes.STRING())
> > .field("data",
> DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.ROW(
> > DataTypes.FIELD("tracking_number",
> DataTypes.STRING()),
> > DataTypes.FIELD("invoice_no",
> DataTypes.STRING())
> > .field("database", DataTypes.STRING())
> > .field("table", DataTypes.STRING())
> > .field("ts", DataTypes.DECIMAL(38, 18))
> > .field("type", DataTypes.STRING())
> > .field("putRowNum", DataTypes.DECIMAL(38, 18))
> > )
> > .createTemporaryTable("Test");
> >
> > Exception: Caused by: java.io.IOException: Failed to deserialize JSON object.
> >
> > at
> >
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
> > at
> >
> org.apache.flink.formats.json.JsonRo

Flink 1.10 JSON Parsing

2020-03-19 Thread
hi:
1. When parsing JSON data, why is decimal used here instead of bigint?
[image: image.png]
2. When using connect, parsing a JSON array element throws an exception. Is this caused by misuse, or is it a bug?
json:{"business":"riskt","data":[{"tracking_number":"0180024020920","invoice_no":"2020021025"}],"database":"installmentdb","table":"t_sales_order","ts":1581576074069,"type":"UPDATE","putRowNum":1}
jsonSchema:{"type":"object","properties":{"business":{"type":"string"},"data":{"type":"array","items":[{"type":"object","properties":{"tracking_number":{"type":"string"},"invoice_no":{"type":"string"}}}]},"database":{"type":"string"},"table":{"type":"string"},"ts":{"type":"integer"},"type":{"type":"string"},"putRowNum":{"type":"integer"}}}
connect:

streamTableEnv
.connect(
new Kafka()
.version("0.11")
.topic("mysql_binlog_test_str")
.startFromEarliest()
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
)
.withFormat(
new Json()

.jsonSchema("{\"type\":\"object\",\"properties\":{\"business\":{\"type\":\"string\"},\"data\":{\"type\":\"array\",\"items\":[{\"type\":\"object\",\"properties\":{\"tracking_number\":{\"type\":\"string\"},\"invoice_no\":{\"type\":\"string\"}}}]},\"database\":{\"type\":\"string\"},\"table\":{\"type\":\"string\"},\"ts\":{\"type\":\"integer\"},\"type\":{\"type\":\"string\"},\"putRowNum\":{\"type\":\"integer\"}}}")
)
.withSchema(
new Schema()
.field("business", DataTypes.STRING())
.field("data",
DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.ROW(
DataTypes.FIELD("tracking_number",
DataTypes.STRING()),
DataTypes.FIELD("invoice_no",
DataTypes.STRING())
.field("database", DataTypes.STRING())
.field("table", DataTypes.STRING())
.field("ts", DataTypes.DECIMAL(38, 18))
.field("type", DataTypes.STRING())
.field("putRowNum", DataTypes.DECIMAL(38, 18))
)
.createTemporaryTable("Test");

Exception: Caused by: java.io.IOException: Failed to deserialize JSON object.

at
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
at
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:146)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
Caused by: java.lang.ClassCastException:
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode
cannot be cast to
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:411)
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
... 7 more
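The ClassCastException at the bottom (ArrayNode cannot be cast to ObjectNode) is consistent with what the replies elsewhere in this thread point out: "data" in the JSON is an array, while the schema above declares it as a nested ROW. A sketch of just that field changed accordingly, per the replies; untested here:

// "data" holds a JSON array of objects, so declare it as an ARRAY of ROWs
// instead of ROW(FIELD("f0", ROW(...))):
.field("data", DataTypes.ARRAY(DataTypes.ROW(
        DataTypes.FIELD("tracking_number", DataTypes.STRING()),
        DataTypes.FIELD("invoice_no", DataTypes.STRING()))))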


Re: flink 1.9.1 state keeps growing

2019-11-27 Thread
I'm using Flink's Blink Table API, with state retention configured as:
streamTableEnv.getConfig().setIdleStateRetentionTime(Time.minutes(15),
Time.minutes(20));
I expected expired state to be cleaned up automatically, so that total state size fluctuates within a bounded range, but expired state is not being cleaned up, state keeps growing, and the job eventually runs out of memory. Earlier, a stream subscribing to a single topic, using a subquery in SQL with a processing-time window aggregation at the outermost level, also failed to clean up expired state (I'm not sure whether that case was caused by misuse).
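For comparison with the Table API setting above: the explicit per-state TTL that Congxian asks about below is configured roughly like this in the DataStream API. A minimal sketch against the Flink 1.9 API; the descriptor name and state type are chosen purely for illustration.

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

// Explicit state TTL for DataStream state (illustrative names):
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.minutes(20))   // time-to-live per state entry
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .cleanupFullSnapshot()          // drop expired entries when taking full snapshots
        .build();

ValueStateDescriptor<String> descriptor =
        new ValueStateDescriptor<>("last-value", String.class);
descriptor.enableTimeToLive(ttlConfig);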

On Wed, Nov 27, 2019 at 8:18 PM Congxian Qiu  wrote:

> Hi
>
> Are you using TTL state? How are you using it, and what is the expected behavior?
>
> Best,
> Congxian
>
>
> 谷歌-akulaku  wrote on Wed, Nov 27, 2019 at 5:54 PM:
>
> > Hello, I'm using FlinkKafkaConsumer011 to subscribe to a topic list. After setting an expiration time, expired state is not cleaned up. Is there any way to fix this?
> > Also, with a union of two streams, expired state is not cleaned up either, while the single-stream, single-topic case cleans up fine. Is this a bug?
> >
> >
> >
> > Sent from Mail for Windows 10
> >
> >
>