Thanks Jiabao and Yaroslav for your quick responses.
Regards,
Kirti Dhar
From: Yaroslav Tkachenko
Sent: 01 February 2024 21:42
Cc: user@flink.apache.org
Subject: Re: RE: Flink Kafka Sink + Schema Registry + Message Headers
The schema registry support is provided
Kirti Dhar wrote:
> Hi Jiabao,
>
> Thanks for the reply.
>
> Currently I am using Flink 1.16.1 and I am not able to find any
> HeaderProvider setter method in the class KafkaRecordSerializationSchemaBuilder.
> Although on GitHub I found this support here:
> https://github.com/apache/flink-connector-kafka/blob/v3.1/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
>
> But this doesn't seem to be released yet. Can you please point me towards the
> correct Flink version?
>
> Also, any help on question 1 regarding the Schema Registry?
>
> Regards,
> Kirti Dhar
Hi Kirti,
Kafka Sink supports sending messages with headers.
You should implement a HeaderProvider to extract headers from the input element.
KafkaSink sink = KafkaSink.builder()
.setBootstrapServers(brokers)
.setRecordSerializer(KafkaRecordSerializationSchema.builder
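For reference, a minimal sketch of what the full builder call could look like with a
connector version that ships setHeaderProvider (e.g. the v3.x flink-connector-kafka sources
linked above); the event type, serializer, topic and header name below are made-up
placeholders, not part of the original reply:

import java.nio.charset.StandardCharsets;

import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.kafka.common.header.internals.RecordHeaders;

String brokers = "localhost:9092"; // placeholder

// MyEvent and MyEventSerializationSchema (a SerializationSchema<MyEvent>) are hypothetical.
KafkaSink<MyEvent> sink = KafkaSink.<MyEvent>builder()
        .setBootstrapServers(brokers)
        .setRecordSerializer(
                KafkaRecordSerializationSchema.<MyEvent>builder()
                        .setTopic("output-topic")
                        .setValueSerializationSchema(new MyEventSerializationSchema())
                        // HeaderProvider: build Kafka record headers from each element
                        .setHeaderProvider(event -> {
                            RecordHeaders headers = new RecordHeaders();
                            headers.add("trace-id",
                                    event.getTraceId().getBytes(StandardCharsets.UTF_8));
                            return headers;
                        })
                        .build())
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();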
Hi Mates,
I have the below queries regarding the Flink Kafka Sink.
1. Does Kafka Sink support schema registry? If yes, is there any
documentation on how to configure it?
2. Does Kafka Sink support sending messages (ProducerRecord) with headers?
Regards,
Kirti Dhar
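On question 1, one common approach (a sketch only, assuming the flink-avro-confluent-registry
module; not necessarily what was recommended in this thread) is to plug a Confluent Schema
Registry aware serialization schema into the sink's value serializer. MyRecord, the topic, the
subject and the registry URL are placeholders:

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;

// MyRecord stands for an Avro-generated SpecificRecord class.
KafkaSink<MyRecord> sink = KafkaSink.<MyRecord>builder()
        .setBootstrapServers("localhost:9092")
        .setRecordSerializer(
                KafkaRecordSerializationSchema.<MyRecord>builder()
                        .setTopic("my-topic")
                        // Writes the Confluent wire format and registers/looks up the schema
                        // under the given subject in the schema registry.
                        .setValueSerializationSchema(
                                ConfluentRegistryAvroSerializationSchema.forSpecific(
                                        MyRecord.class, "my-topic-value", "http://localhost:8081"))
                        .build())
        .build();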
On Mon, Oct 2, 2023 at 2:47 PM Lorenzo Nicora wrote:
Hi team
In the Kafka Sink docs [1], with EXACTLY_ONCE it is recommended to set:
transaction_timeout > maximum_checkpoint_duration + maximum_restart_duration.
I understand transaction_timeout > maximum_checkpoint_duration,
but why add maximum_restart_duration?
If the application recovers
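Purely as a reference for where that timeout lives, a sketch of raising transaction.timeout.ms
on the KafkaSink (the value, topic and id prefix are placeholders, not a recommendation; the
property is passed straight through to the Kafka producer and must not exceed
transaction.max.timeout.ms on the brokers, which defaults to 15 minutes):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

Properties producerProps = new Properties();
// the transaction_timeout from the recommendation above
producerProps.setProperty("transaction.timeout.ms", String.valueOf(15 * 60 * 1000));

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("my-job")
        .setKafkaProducerConfig(producerProps)
        .setRecordSerializer(
                KafkaRecordSerializationSchema.<String>builder()
                        .setTopic("my-topic")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
        .build();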
Hi nick,
Is there any error log? That may help to analyze the root cause.
On Sun, Jul 23, 2023 at 9:53 PM nick toker wrote:
hello
we replaced the deprecated kafka producer with the kafka sink,
and from time to time when we submit a job it gets stuck for 5 minutes in
initializing (on the sink operators).
we verified that the transaction prefix is unique.
it doesn't happen when we use the kafka producer.
What can be the reason?
<18765295...@163.com> wrote:
>
>Hi, when using version 1.16.0 I ran into a case where the kafka sink timestamp is abnormally large. Below are a normal and an abnormal record:
>Normal:
>{
> "partition": 0,
> "offset": 16,
> "msg": "x",
> "timespan": 1677487065330,
> "date&
Hi!
Kafka Source will emit KafkaConsumer metrics
<https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-consumer-metrics>
.
It looks like Kafka Sink
<https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#monitorin
When enabling exactly_once on my kafka sink, I am seeing extremely long
initialization times (over 1 hour), especially after restoring from a
savepoint. In the logs I see the job constantly initializing thousands of
kafka producers like this:
2023-01-31 14:39:58,150 INFO
for the kafka sink to automatically adapt
setTransactionalIdPrefix("XYZ") // just in case transactions are required
make the kafka sink automatically adapt to the checkpointing.mode (that is,
use the same guarantee), or should I on the contrary explicitly set both
guara
[1]: https://github.com/apache/flink/blob/f494be6956e850d4d1c9fd50b79e5a8dd5b53e47/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L66
[2]:
https://github.com/apache/flink/blob/f494be6956e850d4d1c9fd50b79e5a8dd5b53e47/flink-connectors/flink
Thanks for the reply, my issue has been fixed now - changing the kafka-clients version to
2.4.1 solved it.

On 2022-08-26 16:20:27, "Weihua Hu" wrote:
> You can try upgrading to 2.5+
>
> Best,
> Weihua

On Thu, Aug 25, 2022 at 6:41 PM gulugulucxg wrote:
> Hello, the kafka cluster version is 1.1.1, which is indeed quite old. Is that the cause,
> and which version would be suitable to upgrade to?

On 2022-08-25 18:31:06, "Weihua Hu" wrote:
> What is the version of the kafka cluster? It looks like the cluster version is a bit old.
>
> Best,
> Weihua

On Thu, Aug 25, 2022 at 3:41 PM gulugulucxg wrote:
> Hi all,
>
> After specifying EXACTLY_ONCE for writing to kafka, the job starts fine when launched
> directly, but it always fails when started from a savepoint. The kafka-clients version is
> 2.5.0, and the Flink version and related dependency versions are all 1.12.4.
>
> The exception is as follows:
>
> 2022-08-25 10:42:44
> org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write
> a non-default producerId at version 0
>
> The relevant code is as follows:
>
> Properties
> // TODO Auto-generated catch block
>
> } catch (Exception e) {
>
> }
>
> return keyOut.toByteArray();
> }
>
> *From:* Ghiya, Jay (GE Healthcare)
> *Sent:* 18 May 2022 21:51
To: user@flink.apache.org
Cc: d...@flink.apache.org; Pandiaraj, Satheesh kumar (GE Healthcare); Kumar, Vipin (GE Healthcare)
Subject: Kafka Sink Key and Value Avro Schema Usage Issues
Hi Team,
This is regarding the Flink Kafka Sink. We have a use case where we have headers and
both the key and the value as Avro schemas.
Below is the expectation, in terms of intuitiveness, for the Avro kafka key and value:
KafkaSink.>builder()
.setBootstrapServers(cloudkafkaBroker
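The snippet above is truncated; a rough sketch of one way to emit an Avro key, an Avro value
and headers from the KafkaSink is to implement KafkaRecordSerializationSchema directly. MyEvent,
MyKey and MyValue are hypothetical placeholder types (MyKey/MyValue being Avro-generated
SpecificRecord classes), and this writes plain Avro binary rather than the schema-registry wire
format:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class AvroKeyValueSerializationSchema implements KafkaRecordSerializationSchema<MyEvent> {

    private static final String TOPIC = "output-topic"; // placeholder topic

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            MyEvent element, KafkaSinkContext context, Long timestamp) {
        byte[] key = toAvroBytes(element.getKey());     // MyKey -> Avro binary
        byte[] value = toAvroBytes(element.getValue()); // MyValue -> Avro binary
        RecordHeaders headers = new RecordHeaders();
        headers.add("trace-id", element.getTraceId().getBytes(StandardCharsets.UTF_8));
        return new ProducerRecord<>(TOPIC, null, key, value, headers);
    }

    private static byte[] toAvroBytes(SpecificRecord record) {
        try {
            DatumWriter<SpecificRecord> writer = new SpecificDatumWriter<>(record.getSchema());
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            writer.write(record, encoder);
            encoder.flush();
            return out.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException("Avro serialization failed", e);
        }
    }
}

The schema would then be passed to
KafkaSink.<MyEvent>builder().setRecordSerializer(new AvroKeyValueSerializationSchema()).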
"top" level columns of your
sink table (i.e., fields inside Row are not supported, at least in PyFlink).
Thanks!
On Mon, 16 May 2022 at 19:33, wang <24248...@163.com> wrote:
Hi dear engineer,
Flink SQL supports a kafka sink table, but I am not sure whether it supports a kafka key in
the kafka sink table, as I want to specify the kafka key when inserting data into the kafka
sink table.
Thanks for your answer in advance.
Thanks && Regards,
Hunk
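For reference, in later Flink versions (1.12+) a key can be attached to a kafka sink table with
the 'key.format' / 'key.fields' connector options, which refer to top-level columns of the sink
table. A sketch (table, topic and column names are placeholders; this may not apply to the
version discussed in this thread):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

// 'key.fields' selects top-level sink columns to be serialized as the Kafka message key.
tEnv.executeSql(
        "CREATE TABLE kafka_sink (" +
        "  user_id STRING," +
        "  payload STRING" +
        ") WITH (" +
        "  'connector' = 'kafka'," +
        "  'topic' = 'my-topic'," +
        "  'properties.bootstrap.servers' = 'localhost:9092'," +
        "  'key.format' = 'raw'," +
        "  'key.fields' = 'user_id'," +
        "  'value.format' = 'json'" +
        ")");

// some_source is a placeholder for an already-registered source table.
tEnv.executeSql("INSERT INTO kafka_sink SELECT user_id, payload FROM some_source");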
Hi Harshit,
I should have already replied to you in an earlier thread[1] for the same
question. It seems that you have missed that. Please double check if that
reply is helpful for you.
Regards,
Dian
[1] https://lists.apache.org/thread/cm6r569spq67249dxw57q8lxh0mk3f7y
On Wed, Apr 27, 2022 at
Dear Team,
I am new to pyflink and request your support with an issue I am facing with
Pyflink. I am using Pyflink version 1.14.4 and using reference code from the
pyflink github.
I am getting the following error.
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC
that
Hi Harshit,
Could you try to update the following line `ds = ds.map(lambda x:
','.join([str(value) for value in x]))` as follows:
`ds = ds.map(lambda x: ','.join([str(value) for value in x]),
output_type=Types.STRING())`
The reason is that if the output type is not specified, it will be
Dear Team,
I am new to pyflink and request your support with an issue I am facing with
Pyflink. I am using Pyflink version 1.14.4 and using reference code from the
pyflink getting started pages.
I am getting the following error.
py4j.protocol.Py4JJavaError: An error occurred while calling
Hello Matthias and others
I am trying to configure a Kafka Sink with SSL properties as shown further
below.
But in the logs I see warnings:
2022-03-21 12:30:17,108 WARN org.apache.kafka.clients.admin.AdminClientConfig [] - The configuration 'group.id' was supplied but isn't a known
Could you share more details on what's not working? Is the
ssl.trustore.location accessible from the Flink nodes?
Matthias
On Thu, Mar 17, 2022 at 4:00 PM HG wrote:
Hi all,
I am probably not the smartest but I cannot find how to set ssl-properties
for a Kafka Sink.
My assumption was that it would be just like the Kafka Consumer
KafkaSource source = KafkaSource.builder()
.setProperties(kafkaProps)
.setProperty("ssl.truststore
Hi!
All properties you set by calling KafkaSource.builder().setProperty() will
also be given to KafkaConsumer (see [1]). However these two properties are
specific to Flink and Kafka does not know them, so Kafka will produce a
warning message. These messages are harmless as long as the properties
Hi Team,
I am trying to set the following properties in Kafka Source API in flink
1.14.3 version.
-> client.id.prefix
-> partition.discovery.interval.ms
But I am getting the below mentioned warning in taskmanager logs:
1. WARN org.apache.kafka.clients.consumer.ConsumerConfig [] -
Dear developers:
We have found that sinking from kafka to hbase loses data; with the same SQL, sinking via
jdbc does not lose any data. The table creation SQL and job SQL are as follows.
flink version 1.12
Source table: ingested using canal-json
create table rt_ods.ods_za_log_member_base_info(
MemberId bigint COMMENT 'user ID',
NickName string COMMENT 'user nickname',
proctime as PROCTIME
kafka data format:
{"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613802148092,"end_time":1613803144969,"process_time":"2021-02-20
06:39:05.088"}
hi, what exactly do you mean by "no effect"?

cxx <1156531...@qq.com> wrote on Thu, Jan 7, 2021 at 9:53 AM:
> I consume a record from kafka, split it into several messages, and then send them to a
> downstream kafka topic, but this cannot be guaranteed to happen within a single transaction.
> For example: I split one record into 10 messages, and an exception is thrown on the fifth
> one, but the first four have already been sent to the downstream kafka.
> I set the transactional id, the isolation level, the client id, enable.idempotence,
> max.in.flight.requests.per.connection and retries, but it had no effect.
--
Sent from: http://apache-flink.147419.n8.nabble.com/
It turns out that flink sql, when consuming from kafka, commits the offsets to kafka first
regardless of whether parsing succeeded, while the parsing had actually failed.

On 2021-01-06 14:01:34, "Evan" wrote:
> flinksql apparently cannot do what you describe at the moment
>
> From: air23
> Sent: 2021-01-06 12:29
> To: user-zh
> Subject: flink sql consuming kafka and sinking to mysql

Hello. While running a job I found that if flink sql consuming from kafka hits an error and
the job is then restarted, the record that caused the error is lost.
I am using 'scan.startup.mode' = 'group-offsets'.
In principle, shouldn't consumption resume from the failed record?
How can I configure it so that no data is lost?
CREATE TABLE source1 (
id BIGINT ,
username STRING ,
password STRING ,
AddTime TIMESTAMP ,
origin_table STRING METADATA FROM
Dear Leonard Xu:
I will keep an eye on that issue. Many thanks for the explanation.

Original message
From: Leonard Xu
To: user-zh
Sent: Wednesday, 12 August 2020, 16:05
Subject: Re: Error when using Kafka Sink: AppendStreamTableSink doesn't support consuming update
changes
Hi, group by and left join both produce retract messages, and such messages need an
UpsertStreamTableSink to handle them. The Kafka connector's current implementation is

Should update-mode be switched to the update mode instead?
--Original message--
From:
"user-zh"

Hi Jark:
Version: 1.11.0
Problem: in flink-sql, after the data goes through a group by and a left join and is written to
the kafka sink, validation fails with:
AppendStreamTableSink doesn't support consuming update changes which is
produced by node GroupAggregate

I would like the upsert operation to also be applied to the kafka sink during sql validation, or
to write to kafka only after the upsert has completed.
The SQL being executed is attached:
create table kafka_table_1
Hi Gary,
Thanks for the info. I am aware this feature is available from 1.9.0 onwards. Our cluster
is still very old and has CICD challenges, so I was hoping not to bloat up the application
jar by packaging even flink-core with it. If it's not possible to do this with the older
version without writing our own kafka sink implementation similar to the Flink-provided
version in 1.9.0, then I think we will pack flink-core 1.9.0 with the application and follow
the approach that you suggested. Thanks again for getting back to me so quickly.
Best,
Nick
On Tue, May 12, 2020 at 3:37 AM Gary Yao wrote:
/flink/streaming/connectors/kafka/KafkaSerializationSchema.html
On Mon, May 11, 2020 at 10:59 PM Nick Bendtner wrote:
Hi guys,
I use version 1.8.0 of flink-connector-kafka. Do you have any
recommendations on how to produce a ProducerRecord from a kafka
sink? I am looking to add support for kafka headers, therefore thinking about
ProducerRecord. If you have any thoughts, they are highly appreciated.
Best,
Nick.
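A rough sketch of the KafkaSerializationSchema referenced above (available from the 1.9
universal connector onwards), which lets the sink emit a full ProducerRecord including headers;
MyEvent, its accessors, the topic and the header name are placeholders:

import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class MyEventKafkaSchema implements KafkaSerializationSchema<MyEvent> {

    private static final String TOPIC = "output-topic"; // placeholder topic

    @Override
    public ProducerRecord<byte[], byte[]> serialize(MyEvent element, Long timestamp) {
        byte[] key = element.getId().getBytes(StandardCharsets.UTF_8);
        byte[] value = element.toJson().getBytes(StandardCharsets.UTF_8);
        RecordHeaders headers = new RecordHeaders();
        headers.add("source", "my-app".getBytes(StandardCharsets.UTF_8));
        return new ProducerRecord<>(TOPIC, null, key, value, headers);
    }
}

The schema is then passed to the FlinkKafkaProducer constructor that accepts a
KafkaSerializationSchema (together with a default topic, producer Properties and a Semantic).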
Hi everyone,
I've run into a few questions/uncertainties while using the flink kafka sink and would like
some advice.
1. The kafka sink does not provide anything like the elasticsearch sink's
ActionRequestFailureHandler; what should be done when an exception occurs? It is also unclear
exactly which exceptions can occur.
The callback in FlinkKafkaProducer's open() looks like this: onCompletion only receives the
RecordMetadata and the Exception, the Record itself cannot be obtained, and the callback is
private, so it cannot be overridden by subclassing.
if (logFailuresOnly
and
flink-sql-connector-kafka_2.11-1.9.1.jar in flink-1.10 environment.
Thanks,
Lei
wangl...@geekplus.com.cn
From: wangl...@geekplus.com.cn
Date: 2020-03-11 14:51
To: Jark Wu
CC: Arvid Heise; user
Subject: Re: Re: Does flink-1.10 sql-client support kafka sink?
Hi Jark,
I have tried to use CREATE
the result correctly
Thanks,
Lei
From: Jark Wu
Date: 2020-03-11 11:13
To: wangl...@geekplus.com.cn
CC: Arvid Heise; user
Subject: Re: Re: Does flink-1.10 sql-client support kafka sink?
Hi Lei,
CREATE TABLE DDL [1][2] is the recommended way to register a table since 1.9.
And the yaml way might
> schema: "ROW(out_order_code STRING, input_date BIGINT, owner_code
> STRING, status INT)"
>
> under format label.
>
> *From:* Arvid Heise
> *Date:* 2020-03-10 20:51
> *To:* wangl...@geekplus.com.cn
> *CC:* user
> *Subject:* Re: Does flink-1.10 sql-clien
Hi Lei,
yes Kafka as a sink is supported albeit only for appends (no
deletions/updates yet) [1].
An example is a bit hidden in the documentation [2]:
tables:
  - name: MyTableSink
    type: sink-table
    update-mode: append
    connector:
      property-version: 1
      type: kafka
is not appendable. It confused me.
Thanks,
Lei
From: Jark Wu
Date: 2020-03-09 19:25
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: Kafka sink only support append mode?
Hi Lei,
Yes. Currently, Kafka sink only supports append mode. Other update mode (e.g.
upsert mode / retract mode) is on the agenda
I have configured source table successfully using the following configuration:
- name: out_order
  type: source
  update-mode: append
  schema:
    - name: out_order_code
      type: STRING
    - name: input_date
      type: BIGINT
    - name: owner_code
      type: STRING
  connector:
Hi Lei,
Yes. Currently, Kafka sink only supports append mode. Other update mode
(e.g. upsert mode / retract mode) is on the agenda.
For now, you can customize a KafkaTableSink by implementing the
UpsertStreamTableSink interface, where you will get Tuple2 records,
and the Boolean represents insert
I wrote a simple program reading from kafka using sql and sinking to kafka.
But only 'update-mode' = 'append' is supported for the sink table, and the query
sql must have no group statement.
Is only append mode supported for the kafka sink?
Thanks,
Lei
Hi Vishwas,
You should pay attention to the other args.
The constructor provided by you has a `KeyedSerializationSchema` arg, while
the constructor whose comments confused you only has a
`SerializationSchema` arg. That's the difference between them.
Best,
Vino
Vishwas Siravara wrote on Wednesday, Nov 6, 2019:
Hi all,
I am using flink 1.7.0 and using this constructor
FlinkKafkaProducer(String topicId, KeyedSerializationSchema
serializationSchema, Properties producerConfig)
From the doc it says this constructor uses a fixed partitioner. I want to
partition based on key , so I tried to use this
public
Hi,
This is the contract of 2PC transactions.
Multiple commit retries should result in only one commit which actually happens
in the external system.
The external system has to support deduplication of committed transactions,
e.g. by some unique id.
Best,
Andrey
> On 10 Oct 2019, at 07:15,
After reading about FlinkKafkaProducer011 and the 2PC mechanism in Flink, my understanding is:
when snapshotState() is called, the current transaction is pre-committed and added to the state;
when the checkpoint is done and notifyCheckpointComplete() is called,
the producer commits the current transaction to the brokers;
when initializeState() is called, the state is restored.
Hi Chong,
to my knowledge, neither Flink's built-in metrics nor the metrics of the
Kafka producer itself give you this number directly. If your sink is
chained (no serialization, no network) to another Flink operator, you could
take the numRecordsOut of this operator instead. It will tell you how
hi
I have a flink sql job that reads records from kafka, uses a table function to do some
transformation, and then produces to kafka.
I have found that in the Flink web UI the records received of the first subtask is
always 0, and the records sent of the last subtask is 0 as well.
I want to count how many
. Feel free to send these logs to me directly, if
you don't want to share them on the list.
Best,
Konstantin
On Thu, Mar 28, 2019 at 2:04 PM Marc Rooding wrote:
Hi
We’ve got a job producing to a Kafka sink. The Kafka topics have a retention of
2 weeks. When doing a complete replay, it seems like Flink isn’t able to
back-pressure or throttle the amount of messages going to Kafka, causing the
following error
org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C$1.class
org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C$ChecksumFactory.class
org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C$Java9ChecksumFactory.class
org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C.class
org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/PureJavaCrc32C.class

On 2019-02-22 18:03:18, "Zhenghua Gao" wrote:
> Could you check whether that class is present in the corresponding jar? You can do it as
> follows (assuming your blink package is under /tmp/blink):
>
> cd /tmp/blink/opt/connectors/kafka011
> jar -tf flink-connector-kafka-0.11_2.11-*.jar | grep Crc32C
>
> On Fri, Feb 22, 2019 at 2:56 PM 张洪涛 wrote:
Hi all!

I am testing the Blink sql client kafka sink connector, but writes fail. Here are my steps:

Environment:
blink standalone mode

1. Configure the environment and start the sql client

2. Create the kafka sink table

CREATE TABLE kafka_sink (
  messageKey VARBINARY,
  messageValue VARBINARY,
  PRIMARY KEY (messageKey))
with (
  type = 'KAFKA011',
  topic = 'sink
fka/clients/producer/KafkaProducer.html
Cheers,
Till
On Wed, Jan 2, 2019 at 5:02 AM Kaibo Zhou wrote:
Hi,
I encountered an error while running the kafka sink demo in IDEA.
This is the complete code:
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import
messages have not been read so everything is OK.
Kind regards,
Nastaran Motavalli
From: Piotr Nowojski
Sent: Thursday, November 29, 2018 3:38:38 PM
To: Nastaran Motavali
Cc: user@flink.apache.org
Subject: Re: Duplicated messages in kafka sink topic using flink cancel-with-savepoint operation
Hi Nastaran,
When you are checking for duplicated messages, are you reading from kafka using
the `read_committed` isolation mode (this is not the default value)?
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-producer-partitioning-scheme
> Semantic.EXACTLY_ONCE: uses
Hi,
I have a flink streaming job implemented via java which reads some messages
from a kafka topic, transforms them and finally sends them to another kafka
topic.
The version of flink is 1.6.2 and the kafka version is 011. I pass the
Semantic.EXACTLY_ONCE parameter to the producer. The problem
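As a reference for that check, a sketch of consuming with read_committed isolation via the
Properties-based consumer configuration (topic, group id and broker address are placeholders):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "broker:9092");
consumerProps.setProperty("group.id", "duplicate-check");
// With the default read_uncommitted isolation, records from aborted or not-yet-committed
// transactions are also visible, which can look like duplicates when the producer side
// uses Semantic.EXACTLY_ONCE.
consumerProps.setProperty("isolation.level", "read_committed");

FlinkKafkaConsumer011<String> consumer =
        new FlinkKafkaConsumer011<>("output-topic", new SimpleStringSchema(), consumerProps);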
I have a Flink job that writes data into Kafka. The Kafka topic has maximum
message size set to 5 MB, so if I try to write any record larger than 5 MB,
it throws the following exception and brings the job down.