Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-27 Thread Xingbo Huang
Hi Pierre, Sorry for the late reply. Your requirement is that your `Table` has a `field` in `Json` format whose number of keys has reached 100k, and you want to use such a `field` as the input/output of a `udf`, right? As to whether there is a limit on the number of nested keys, I am not quite sure.

Re: FlinkSQL kafka->dedup->kafka

2020-11-27 Thread Laurent Exsteens
Hi Leonard, > From my understanding, your case is not a pure deduplication case but > want to both keep the previous record and current record, thus the > deduplication query can not satisfy your requirement. > Indeed, that's what I came to realise during our discussion on this email chain.

Re: FlinkSQL kafka->dedup->kafka

2020-11-27 Thread Konstantin Knauf
Hi Laurent, With respect to Ververica Platform, we will support Flink 1.12 and add "upsert-kafka" as a packaged connector in our next minor release which we target for February. Cheers, Konstantin On Thu, Nov 12, 2020 at 3:43 AM Jark Wu wrote: > Hi Laurent, > > 1. Deduplicate with keeping
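
For reference, a minimal sketch of what such an upsert-kafka sink table could look like on Flink 1.12 (a sketch only: the table, topic, field names, and broker address are invented; the connector options follow the 1.12 documentation):

    // Sketch of an upsert-kafka sink table (Flink 1.12+); all names below
    // are invented for illustration.
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class UpsertKafkaSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());
            tEnv.executeSql(
                    "CREATE TABLE dedup_sink (" +
                    "  id STRING," +
                    "  val STRING," +
                    "  PRIMARY KEY (id) NOT ENFORCED" +
                    ") WITH (" +
                    "  'connector' = 'upsert-kafka'," +
                    "  'topic' = 'output-topic'," +
                    "  'properties.bootstrap.servers' = 'localhost:9092'," +
                    "  'key.format' = 'json'," +
                    "  'value.format' = 'json'" +
                    ")");
        }
    }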

Re: FlinkSQL kafka->dedup->kafka

2020-11-27 Thread Leonard Xu
Hi, Laurent > > I'm not sure that would do what I want though. As far as I understand, the > deduplication query will always remember any values it has seen. So if I > have, for a specific primary key, the following values in another field: "a", > "a", "b", "b", "a", "a", the deduplication

Re: Re: Reply: Stateful operators fail to save checkpoint

2020-11-27 Thread 赵一旦
The failure reason isn't even given; how exactly does it fail to save... A timeout? Or something else? On Fri, Nov 27, 2020 at 7:08 PM 魏积乾 wrote: > flink-csv-1.11.2.jar > flink-dist_2.11-1.11.2.jar > flink-json-1.11.2.jar > flink-shaded-zookeeper-3.4.14.jar > flink-table_2.11-1.11.2.jar > flink-table-blink_2.11-1.11.2.jar > log4j-1.2-api-2.12.1.jar > log4j-api-2.12.1.jar >

Flink job reports a netty error shortly after it starts running

2020-11-27 Thread 赵一旦
The error is as follows: 19:59:56.128 [Flink Netty Client (8009) Thread 6] WARN org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop - Unexpected exception in the selector loop. org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: epoll_wait(..) failed: Operation now in

Re: FlinkSQL kafka->dedup->kafka

2020-11-27 Thread Laurent Exsteens
Hi Leonard, thank you for your answer. I'm not sure that would do what I want though. As far as I understand, the deduplication query will always remember any values it has seen. So if I have, for a specific primary key, the following values in another field: "a", "a", "b", "b", "a", "a", the
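
For readers following the thread, a minimal sketch of the deduplication query being discussed (table and column names are invented). Partitioning by the primary key keeps one row per key; partitioning by (id, val) to detect value changes would, as described above, remember every value ever seen:

    // Sketch of the Flink SQL deduplication pattern under discussion;
    // source_table, id, val, and proctime are invented names.
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class DedupSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());
            // Keeps only the first row per key; detecting changes like
            // "a", "b", "a" would need PARTITION BY (id, val), which then
            // remembers every value ever seen, as described above.
            tEnv.executeSql(
                    "SELECT id, val FROM (" +
                    "  SELECT id, val, ROW_NUMBER() OVER (" +
                    "    PARTITION BY id ORDER BY proctime ASC) AS rn" +
                    "  FROM source_table" +
                    ") WHERE rn = 1");
        }
    }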

Re: Batch compressed file output

2020-11-27 Thread Matthias Pohl
Hi Flavio, others might have better ideas to solve this but I'll give it a try: Have you considered extending FileOutputFormat to achieve what you need? That approach (which is discussed in [1]) sounds like something you could do. Another pointer I want to give is the DefaultRollingPolicy [2]. It
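
To illustrate the first pointer, a rough sketch of a FileOutputFormat subclass that gzip-compresses its output (untested; the class name is invented and error handling is omitted):

    // Sketch only: a FileOutputFormat subclass that gzips each task's
    // output file. Names are invented; not tested.
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.zip.GZIPOutputStream;
    import org.apache.flink.api.common.io.FileOutputFormat;

    public class GzipTextOutputFormat extends FileOutputFormat<String> {
        private transient GZIPOutputStream gzip;

        @Override
        public void open(int taskNumber, int numTasks) throws IOException {
            super.open(taskNumber, numTasks); // opens the underlying 'stream'
            gzip = new GZIPOutputStream(this.stream);
        }

        @Override
        public void writeRecord(String record) throws IOException {
            gzip.write((record + "\n").getBytes(StandardCharsets.UTF_8));
        }

        @Override
        public void close() throws IOException {
            if (gzip != null) {
                gzip.finish(); // write the gzip trailer before closing the stream
            }
            super.close();
        }
    }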

Re: Re: flink sql cdc writes data to mysql, related class not found

2020-11-27 Thread cljb...@163.com
Thanks for the reply! I just found the problem: the pom dependencies copied from the Maven website had their scope set to test... Changing it to compile fixed it. cljb...@163.com From: Jark Wu Sent: 2020-11-27 19:14 To: user-zh Subject: Re: flink sql cdc writes data to mysql, related class not found It is probably a conflict between the flink-json you added and the flink-json already bundled in the framework; maybe the flink-json version you added is not 1.11.x? On Fri, 27 Nov 2020 at 19:03,

Re: flink sql cdc writes data to mysql, related class not found

2020-11-27 Thread Jark Wu
It is probably a conflict between the flink-json you added and the flink-json already bundled in the framework; maybe the flink-json version you added is not 1.11.x? On Fri, 27 Nov 2020 at 19:03, cljb...@163.com wrote: > The relevant dependencies have already been added; I don't know what causes the following problem. Please help! > The dependencies I added are: > flink-connector-mysql-cdc > flink-format-changelog-json > flink-json > > The error message is as follows: > > java.util.ServiceConfigurationError: >

Reply: Re: Reply: Stateful operators fail to save checkpoint

2020-11-27 Thread 魏积乾
flink-csv-1.11.2.jar flink-dist_2.11-1.11.2.jar flink-json-1.11.2.jar flink-shaded-zookeeper-3.4.14.jar flink-table_2.11-1.11.2.jar flink-table-blink_2.11-1.11.2.jar log4j-1.2-api-2.12.1.jar log4j-api-2.12.1.jar log4j-core-2.12.1.jar log4j-slf4j-impl-2.12.1.jar

flink sql cdc writes data to mysql, related class not found

2020-11-27 Thread cljb...@163.com
The relevant dependencies have already been added; I don't know what causes the following problem. Please help! The dependencies I added are: flink-connector-mysql-cdc flink-format-changelog-json flink-json The error message is as follows: java.util.ServiceConfigurationError: org.apache.flink.table.factories.Factory: Provider com.alibaba.ververica.cdc.formats.json.ChangelogJsonFormatFactory could not be instantiated at

Re: FLINK SQL consuming kafka messages out of order

2020-11-27 Thread bulterman
I checked the upstream and found that setting the parallelism to 1 on the source side eliminates the disorder. On 2020-11-27 17:44:23, "bulterman" <15618338...@163.com> wrote: >Hi All, >The kafka messages contain a "turnover" field that increases over time. I wrote a UDAF that accumulates the difference between the current record's turnover and the previous record's, and found the difference is sometimes negative. >A tool shows the messages in the topic are in order, and the number of partitions is 1. Flink version 1.11.2

Re: Reply: Stateful operators fail to save checkpoint

2020-11-27 Thread 王默
Could you tell me exactly which jar was not upgraded? Or are there jars that need to be copied from opt to lib? On 2020-11-27 17:34:39, "魏积乾" wrote: >I just ran into this too, after upgrading from 1.10. One job could save checkpoints while another kept failing to save. I then checked the jars under lib and found some had not been upgraded, so I replaced them; I also changed state.checkpoints.dir in the configuration file. >Hope this helps > > >Sent from my iPhone > > >-- Original message -- >From: 王默 Sent:

Re: queryLogicalType != sinkLogicalType when UDAF returns List

2020-11-27 Thread Dongwon Kim
Hi Timo, Okay, then the aggregate function should look like this: > public static class Agg extends AggregateFunction<Integer[], ArrayList<Integer>> { > @Override > public ArrayList<Integer> createAccumulator() { > return new ArrayList<>(); > } > @Override > public Integer[] getValue(ArrayList
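
Filling in the truncated snippet above for readability, the function presumably looks roughly like this (only the quoted signatures are from the thread; the accumulate method is an assumption):

    // Reconstruction of the truncated UDAF above; accumulate() is an
    // assumption, the rest follows the quoted snippet.
    import java.util.ArrayList;
    import org.apache.flink.table.functions.AggregateFunction;

    public static class Agg extends AggregateFunction<Integer[], ArrayList<Integer>> {
        @Override
        public ArrayList<Integer> createAccumulator() {
            return new ArrayList<>();
        }

        @Override
        public Integer[] getValue(ArrayList<Integer> acc) {
            return acc.toArray(new Integer[0]);
        }

        public void accumulate(ArrayList<Integer> acc, Integer value) {
            acc.add(value);
        }
    }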

Batch compressed file output

2020-11-27 Thread Flavio Pompermaier
Hello guys, I have to write my batch data (DataSet) to a file format. Actually what I need to do is: 1. split the data if it exceeds some size threshold (by line count or max MB) 2. compress the output data (possibly without converting to the hadoop format) Are there any suggestions

FLINK SQL consuming kafka messages out of order

2020-11-27 Thread bulterman
Hi All, The kafka messages contain a "turnover" field that increases over time. I wrote a UDAF that accumulates the difference between the current record's turnover and the previous record's, and found the difference is sometimes negative. A tool shows the messages in the topic are in order, and the number of partitions is 1. Flink version 1.11.2

Data loss when Flink writes to HBase in real time

2020-11-27 Thread bradyMk
Hi all, I recently have a project that writes data to HBase. I use the BufferedMutator.flush method from the HBase API, flushing every 500 records, but I found that this occasionally fails to write a dozen or so rows, and in that case those rows are lost. Does anyone have suggestions? What method should be used to write to HBase in real time, and how can I make sure no data is lost? Thanks, everyone~ - Best Wishes -- Sent from: http://apache-flink.147419.n8.nabble.com/
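
One possible direction (a sketch under assumptions: the table name and the handling logic are invented) is to register an ExceptionListener on the BufferedMutator, so that mutations that exhaust their retries are reported and can be re-buffered rather than silently lost:

    // Sketch: make failed mutations visible via an ExceptionListener
    // instead of losing them silently. Names and handling are invented.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.BufferedMutator;
    import org.apache.hadoop.hbase.client.BufferedMutatorParams;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.util.Bytes;

    Configuration conf = HBaseConfiguration.create();
    BufferedMutator.ExceptionListener listener = (e, mutator) -> {
        for (int i = 0; i < e.getNumExceptions(); i++) {
            // Each failed mutation is available here: log it, or re-buffer it.
            System.err.println("Failed row: " + Bytes.toString(e.getRow(i).getRow()));
        }
    };
    BufferedMutatorParams params =
            new BufferedMutatorParams(TableName.valueOf("my_table")).listener(listener);
    try (Connection connection = ConnectionFactory.createConnection(conf);
         BufferedMutator mutator = connection.getBufferedMutator(params)) {
        // mutate(...) and flush() as before; failures now reach the listener
        mutator.flush();
    }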

Re: Reply: Stateful operators fail to save checkpoint

2020-11-27 Thread 王默
Thanks for the idea. state.checkpoints.dir is already set in the configuration file; I will check whether any jar was not upgraded. On 2020-11-27 17:34:39, "魏积乾" wrote: >I just ran into this too, after upgrading from 1.10. One job could save checkpoints while another kept failing to save. I then checked the jars under lib and found some had not been upgraded, so I replaced them; I also changed state.checkpoints.dir in the configuration file. >Hope this helps > > >Sent from my iPhone > > >-- Original message --

Stateful operators fail to save checkpoint

2020-11-27 Thread 王默
Hi, I would like to ask about a problem that has troubled me for several days. In my project I use state to keep some data for deduplication. After enabling checkpointing, I see on the web UI that the stateful operators cannot save their state data to the checkpoint, causing the whole checkpoint to fail; occasionally the first checkpoint succeeds, but all subsequent ones fail. For the StateBackend I tried both MemoryStateBackend and FsStateBackend, neither works; FsStateBackend uses hdfs. And in the logs of the taskmanager corresponding to the jobid there is no related exception information at all. The flink version used is 1.11.2

Reply: Stateful operators fail to save checkpoint

2020-11-27 Thread 魏积乾
I just ran into this too, after upgrading from 1.10. One job could save checkpoints while another kept failing to save. I then checked the jars under lib and found some had not been upgraded, so I replaced them; I also changed state.checkpoints.dir in the configuration file. Hope this helps Sent from my iPhone -- Original message -- From: 王默

Re: Duplication error on Kafka Connector Libraries

2020-11-27 Thread Arvid Heise
The most common cause of such issues is usually class loading. You probably have added the flink-connector-kafka also to flink-dist/libs. But the connector is only meant to be bundled with your job jar afaik. Right now, you have the Kafka classes loaded in the user code classloader and in the

Re: PyFlink Table API and UDF Limitations

2020-11-27 Thread Niklas Wilcke
Hi Xingbo, thanks for sharing. This is very interesting. Regards, Niklas > On 27. Nov 2020, at 03:05, Xingbo Huang wrote: > > Hi Niklas, > > Thanks a lot for supporting PyFlink. In fact, your requirement for multiple > input and multiple output is essentially Table Aggregation Functions[1].

Re: Caching

2020-11-27 Thread Dongwon Kim
Hi Navneeth, > I didn't quite understand how async io can be used here. It would be great > if you can share some info on it. You need to add an async operator in the middle of your pipeline in order to enrich your input data. [1] and [2] will help you. Also how are you propagating any changes to
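
To make the async I/O suggestion concrete, a minimal sketch (the external client and its lookup() method are hypothetical; the timeout and capacity values are arbitrary):

    // Sketch of enrichment via async I/O; MyAsyncClient and lookup() are
    // hypothetical placeholders for a real async-capable client.
    import java.util.Collections;
    import java.util.concurrent.TimeUnit;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

    public class EnrichFunction extends RichAsyncFunction<String, Tuple2<String, String>> {
        private transient MyAsyncClient client; // hypothetical client

        @Override
        public void open(Configuration parameters) {
            client = new MyAsyncClient();
        }

        @Override
        public void asyncInvoke(String key, ResultFuture<Tuple2<String, String>> resultFuture) {
            // lookup() is assumed to return a CompletableFuture<String>
            client.lookup(key).thenAccept(value ->
                    resultFuture.complete(Collections.singleton(Tuple2.of(key, value))));
        }
    }

    // Wiring it into the pipeline (timeout 1s, at most 100 in-flight requests):
    DataStream<Tuple2<String, String>> enriched = AsyncDataStream.unorderedWait(
            input, new EnrichFunction(), 1000, TimeUnit.MILLISECONDS, 100);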

Re: Error when executing mvn build

2020-11-27 Thread renjiyun
This should be generated code; delete it and package again. -- Sent from: http://apache-flink.147419.n8.nabble.com/

FlinkKafkaProducer seems to reconnect the producer on every transaction commit, printing a large amount of logs

2020-11-27 Thread Wz
Here is the addSink code: result.addSink(new FlinkKafkaProducer(DataSourceConfig.ResultTopic, new MyKafkaSerializationSchema(DataSourceConfig.ResultTopic), ConnectToKafka.getKafKaProducerProperties(), FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 3)).setParallelism(1); The KafkaProducer configuration:

Re: queryLogicalType != sinkLogicalType when UDAF returns List

2020-11-27 Thread Timo Walther
Hi, first of all we don't support ListTypeInfo in Table API. Therefore, it is treated as a RAW type. The exception during exception creation is a bug that should be fixed in a future version. But the mismatch is valid: ARRAY<INT> is not a list type info but `Types.OBJECT_ARRAY(Types.INT)`. Can you
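
For the Agg function earlier in the thread, declaring that type explicitly could look like this (a sketch; this assumes the legacy type-inference stack used by aggregate functions in 1.11):

    // Sketch: declare the UDAF result type so it maps to an object array
    // rather than a RAW type (assumes the legacy 1.11 function stack).
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;

    @Override
    public TypeInformation<Integer[]> getResultType() {
        return Types.OBJECT_ARRAY(Types.INT);
    }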

Re: flink 1.11.2 cdc: sink ordering problem for cdc sql result tables; restoring a job from a savepoint under a different parallelism leads to incorrect sink results!!

2020-11-27 Thread jindy_liu
Thanks jark! I have been doing performance tuning these past few days! 1. For this simple scenario, I can currently work around it by adding a join key to the primary key of the test_status sink table, i.e. using the two columns id and status as the key, which makes the data eventually consistent; it is a workaround, but it gives consistency. For more complex statements this feels hard to get right and I hardly dare use it, mainly because it is unclear what impact the disorder has on other operators; it is very error-prone, and it really would be better handled inside the flink framework. Does jark have any suggestions on using flink sql cdc here? 2. As for performance, the default rocksdb parameters in flink indeed perform very poorly!