Hi Pierre,
Sorry for the late reply.
Your requirement is that your `Table` has a `field` in `Json` format whose
number of keys has reached 100k, and you want to use such a `field` as the
input/output of a `udf`, right? As for whether there is a limit on the number
of nested keys, I am not sure.
Hi Leonard,
> From my understanding, your case is not a pure deduplication case but
> want to both keep the previous record and current record, thus the
> deduplication query can not satisfy your requirement.
>
Indeed, that's what I came to realise during our discussion on this email
chain.
Hi Laurent,
With respect to Ververica Platform, we will support Flink 1.12 and add
"upsert-kafka" as a packaged connector in our next minor release, which we
are targeting for February.
Cheers,
Konstantin
On Thu, Nov 12, 2020 at 3:43 AM Jark Wu wrote:
> Hi Laurent,
>
> 1. Deduplicate with keeping
Hi, Laurent
>
> I'm not sure that would do what I want though. As far as I understand, the
> deduplication query will always remember any values it has seen. So if I
> have, for a specific primary key, the following values in another field: "a",
> "a", "b", "b", "a", "a", the deduplication
The failure reason isn't even given. How exactly does it fail to save... a timeout? Or what?
魏积乾 wrote on Fri, Nov 27, 2020 at 7:08 PM:
> flink-csv-1.11.2.jar
> flink-dist_2.11-1.11.2.jar
> flink-json-1.11.2.jar
> flink-shaded-zookeeper-3.4.14.jar
> flink-table_2.11-1.11.2.jar
> flink-table-blink_2.11-1.11.2.jar
> log4j-1.2-api-2.12.1.jar
> log4j-api-2.12.1.jar
>
The error reported is as follows:
19:59:56.128 [Flink Netty Client (8009) Thread 6] WARN
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop -
Unexpected exception in the selector loop.
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
epoll_wait(..) failed: Operation now in
Hi Leonard,
thank you for your answer.
I'm not sure that would do what I want though. As far as I understand, the
deduplication query will always remember any values it has seen. So if I
have, for a specific primary key, the following values in another field:
"a", "a", "b", "b", "a", "a", the
Hi Flavio,
others might have better ideas to solve this but I'll give it a try: Have
you considered extending FileOutputFormat to achieve what you need? That
approach (which is discussed in [1]) sounds like something you could do.
Another pointer I want to give is the DefaultRollingPolicy [2]. It
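As a rough illustration of the size-based rolling idea behind such a policy (plain Java with made-up names and thresholds, not Flink's actual API):

```java
public class SizeRollingSketch {
    // Roll over to a new part file once the current part exceeds either the
    // byte threshold or the line-count threshold, mirroring the kind of
    // checks a rolling policy performs on each write.
    static boolean shouldRoll(long currentBytes, long currentLines,
                              long maxPartBytes, long maxLines) {
        return currentBytes >= maxPartBytes || currentLines >= maxLines;
    }

    public static void main(String[] args) {
        // Illustrative thresholds: 128 MiB and 1,000,000 lines.
        long maxBytes = 128L * 1024 * 1024;
        long maxLines = 1_000_000L;
        System.out.println(shouldRoll(maxBytes + 1, 10, maxBytes, maxLines)); // true
        System.out.println(shouldRoll(1024, 10, maxBytes, maxLines));         // false
    }
}
```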
Thanks for the reply!
I just found the problem: in the pom dependency copied from the Maven website, the scope was set to test... changing it to compile fixed it.
cljb...@163.com
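For reference, the fix described above amounts to a pom change of this kind (the artifact shown is one of the dependencies named in this thread; the exact coordinates in the user's pom are an assumption):

```xml
<!-- The dependency copied from the website had <scope>test</scope>,
     which keeps it off the runtime classpath; compile (the default)
     makes it available at runtime. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.11.2</version>
    <scope>compile</scope>
</dependency>
```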
From: Jark Wu
Sent: 2020-11-27 19:14
To: user-zh
Subject: Re: flink sql cdc writes data to mysql, related classes not found
My guess is that the flink-json you added conflicts with the flink-json the framework has already bundled; perhaps the flink-json version you added is not 1.11.x?
On Fri, 27 Nov 2020 at 19:03,
My guess is that the flink-json you added conflicts with the flink-json the framework has already bundled; perhaps the flink-json version you added is not 1.11.x?
On Fri, 27 Nov 2020 at 19:03, cljb...@163.com wrote:
> The relevant dependencies have already been added; I don't know what causes the problem below. Please help!
> The dependencies added are:
> flink-connector-mysql-cdc
> flink-format-changelog-json
> flink-json
>
> The error message is as follows:
>
> java.util.ServiceConfigurationError:
>
flink-csv-1.11.2.jar
flink-dist_2.11-1.11.2.jar
flink-json-1.11.2.jar
flink-shaded-zookeeper-3.4.14.jar
flink-table_2.11-1.11.2.jar
flink-table-blink_2.11-1.11.2.jar
log4j-1.2-api-2.12.1.jar
log4j-api-2.12.1.jar
log4j-core-2.12.1.jar
log4j-slf4j-impl-2.12.1.jar
The relevant dependencies have already been added; I don't know what causes the problem below. Please help!
The dependencies added are:
flink-connector-mysql-cdc
flink-format-changelog-json
flink-json
The error message is as follows:
java.util.ServiceConfigurationError: org.apache.flink.table.factories.Factory:
Provider com.alibaba.ververica.cdc.formats.json.ChangelogJsonFormatFactory
could not be instantiated
at
I checked upstream and found that setting the parallelism to 1 on the source side eliminates the out-of-order data.
On 2020-11-27 17:44:23, "bulterman" <15618338...@163.com> wrote:
>Hi All,
>The Kafka messages contain a "turnover" field that increases over time. I wrote a UDAF that accumulates the difference between the current record's turnover and the previous record's, and found that the difference sometimes comes out negative.
>A tool shows the messages in the topic are in order, and the partition count is 1. Flink version 1.11.2.
Could you tell me specifically which jar was not upgraded? Or do any jars need to be copied from opt into lib?
On 2020-11-27 17:34:39, "魏积乾" wrote:
>I just ran into this too, after upgrading from 1.10: one job could save its checkpoints while another kept failing to. I checked the jars under lib, found some had not been upgraded, replaced them, and also changed state.checkpoints.dir in the config file.
>Hope this helps.
>
>Sent from my iPhone
>
>-- Original message --
>From: 王默 Sent:
Hi Timo,
Okay, then the aggregate function should look like this:
> public static class Agg extends AggregateFunction<Integer[], ArrayList<Integer>> {
>     @Override
>     public ArrayList<Integer> createAccumulator() {
>         return new ArrayList<>();
>     }
>     @Override
>     public Integer[] getValue(ArrayList<Integer>
Hello guys,
I have to write my batch data (Dataset) to a file format. Actually
what I need to do is:
1. split the data if it exceeds some size threshold (by line count or
max MB)
2. compress the output data (possibly without converting to the hadoop
format)
Are there any suggestions
Hi All,
The Kafka messages contain a "turnover" field that increases over time. I wrote a UDAF that accumulates the difference between the current record's turnover and the previous record's, and found that the difference sometimes comes out negative.
A tool shows the messages in the topic are in order, and the partition count is 1. Flink version 1.11.2.
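Negative differences are exactly what out-of-order arrival would produce for a monotonically increasing field. A plain-Java sketch of the difference logic (illustrative names, not the user's actual UDAF) shows the effect:

```java
import java.util.ArrayList;
import java.util.List;

public class TurnoverDiffSketch {
    // Accumulates (current turnover - previous turnover), as the UDAF does.
    // A record that is overtaken by a later one yields a negative difference.
    static List<Long> diffs(List<Long> turnovers) {
        List<Long> out = new ArrayList<>();
        Long prev = null;
        for (Long t : turnovers) {
            if (prev != null) {
                out.add(t - prev);
            }
            prev = t;
        }
        return out;
    }

    public static void main(String[] args) {
        // In-order input over a monotonically increasing field:
        // every difference is non-negative.
        System.out.println(diffs(List.of(100L, 150L, 210L))); // [50, 60]
        // The same records with two of them swapped (out of order):
        // a negative difference appears.
        System.out.println(diffs(List.of(100L, 210L, 150L))); // [110, -60]
    }
}
```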
Hi everyone, I have a recent project that writes data to HBase. I use the HBase API's BufferedMutator.flush, flushing every 500 records, but I have found that with this approach a dozen or so rows occasionally fail to be written, and those rows are then lost. Does anyone have suggestions?
What method should I use to write to HBase in real time, and how do I make sure no data is lost? Thank you all!
-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Thanks for the idea. state.checkpoints.dir is already set in the config file; I will check whether any jars were not upgraded.
On 2020-11-27 17:34:39, "魏积乾" wrote:
>I just ran into this too, after upgrading from 1.10: one job could save its checkpoints while another kept failing to. I checked the jars under lib, found some had not been upgraded, replaced them, and also changed state.checkpoints.dir in the config file.
>Hope this helps.
>
>Sent from my iPhone
>
>-- Original message --
Hi, I'd like to ask about a problem that has puzzled me for days.
In my project I use state to store some data for deduplication. After enabling checkpointing, the web UI shows that the operators with state fail to save their state to the checkpoint, which makes the whole checkpoint fail. Occasionally the first checkpoint succeeds, but every subsequent one fails.
For the StateBackend I have tried both MemoryStateBackend and FsStateBackend (FsStateBackend backed by HDFS); neither works.
And the taskmanager logs for the corresponding job id show no related exceptions at all.
The Flink version is 1.11.2.
I just ran into this too, after upgrading from 1.10: one job could save its checkpoints while another kept failing to. I checked the jars under lib, found some had not been upgraded, replaced them, and also changed state.checkpoints.dir in the config file.
Hope this helps.
Sent from my iPhone
-- Original message --
From: 王默
Hi, I'd like to ask about a problem that has puzzled me for days.
In my project I use state to store some data for deduplication. After enabling checkpointing, the web UI shows that the operators with state fail to save their state to the checkpoint, which makes the whole checkpoint fail. Occasionally the first checkpoint succeeds, but every subsequent one fails.
For the StateBackend I have tried both MemoryStateBackend and FsStateBackend (FsStateBackend backed by HDFS); neither works.
And the taskmanager logs for the corresponding job id show no related exceptions at all.
The Flink version is 1.11.2.
The most common cause of such issues is usually class loading.
You probably have added flink-connector-kafka to flink-dist/lib as well.
But the connector is only meant to be bundled with your job jar afaik.
Right now, you have the Kafka classes loaded in the user code classloader
and in the
Hi Xingbo,
thanks for sharing. This is very interesting.
Regards,
Niklas
> On 27. Nov 2020, at 03:05, Xingbo Huang wrote:
>
> Hi Niklas,
>
> Thanks a lot for supporting PyFlink. In fact, your requirement for multiple
> input and multiple output is essentially Table Aggregation Functions[1].
Hi Navneeth,
> I didn't quite understand how async io can be used here. It would be great
> if you can share some info on it.
You need to add an async operator in the middle of your pipeline in order
to enrich your input data. [1] and [2] will help you.
Also how are you propagating any changes to
That should be generated code; delete it and rebuild the package.
Here is the addSink code:
result.addSink(new FlinkKafkaProducer(
        DataSourceConfig.ResultTopic,
        new MyKafkaSerializationSchema(DataSourceConfig.ResultTopic),
        ConnectToKafka.getKafKaProducerProperties(),
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
        3)).setParallelism(1);
KafkaProducer configuration:
Hi,
first of all, we don't support ListTypeInfo in the Table API. Therefore, it
is treated as a RAW type. The exception during exception creation is a
bug that should be fixed in a future version. But the mismatch is valid:
ARRAY is not a list type info but `Types.OBJECT_ARRAY(Types.INT)`.
Can you
Thanks Jark! I have been doing performance tuning these past few days!
1. For this simple scenario I can work around the problem by adding the join key to the primary key of the sink table test_status, i.e. using the two columns id and status as the key, which makes the data eventually consistent. For more complex statements this feels hard to manage and I am hesitant to use it, mainly because I am not sure what effect this out-of-orderness has on other operators; it is easy to get wrong, and it really would be better handled inside the Flink framework. Jark, do you have any advice on using flink sql cdc here?
2. As for performance, Flink's default RocksDB parameters really do perform poorly!