RE: RE: Flink Kafka Sink + Schema Registry + Message Headers

2024-02-01 Thread Kirti Dhar Upadhyay K via user
Thanks Jiabao and Yaroslav for your quick responses. Regards, Kirti Dhar From: Yaroslav Tkachenko Sent: 01 February 2024 21:42 Cc: user@flink.apache.org Subject: Re: RE: Flink Kafka Sink + Schema Registry + Message Headers The schema registry support is provided

Re: RE: Flink Kafka Sink + Schema Registry + Message Headers

2024-02-01 Thread Yaroslav Tkachenko
ser wrote: >> > Hi Jiabao, >> > >> > Thanks for the reply. >> > >> > Currently I am using Flink 1.16.1 and I am not able to find any >> HeaderProvider setter method in class KafkaRecordSerializationSchemaBuilder. >> > Although on github I found thi

Re: RE: Flink Kafka Sink + Schema Registry + Message Headers

2024-02-01 Thread Yaroslav Tkachenko
> > Hi Jiabao, > > > > Thanks for the reply. > > > > Currently I am using Flink 1.16.1 and I am not able to find any > HeaderProvider setter method in class KafkaRecordSerializationSchemaBuilder. > > Although on github I found this support here: > https://githu

RE: RE: Flink Kafka Sink + Schema Registry + Message Headers

2024-02-01 Thread Jiabao Sun
ng Flink 1.16.1 and I am not able to find any > HeaderProvider setter method in class KafkaRecordSerializationSchemaBuilder. > Although on github I found this support here: > https://github.com/apache/flink-connector-kafka/blob/v3.1/flink-connector-kafka/src/main/java/org/apache/fl

RE: Flink Kafka Sink + Schema Registry + Message Headers

2024-02-01 Thread Kirti Dhar Upadhyay K via user
-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java But this doesn't seem released yet. Can you please point me towards the correct Flink version? Also, any help on question 1 regarding Schema Registry? Regards, Kirti Dhar -Original Message- From

RE: Flink Kafka Sink + Schema Registry + Message Headers

2024-01-31 Thread Jiabao Sun
Hi Kirti, Kafka Sink supports sending messages with headers. You should implement a HeaderProvider to extract headers from the input element. KafkaSink sink = KafkaSink.builder() .setBootstrapServers(brokers) .setRecordSerializer(KafkaRecordSerializationSchema.builder
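
For readers landing on this thread: a minimal sketch of the header support discussed above, assuming flink-connector-kafka 3.1+ (where the setHeaderProvider method from the GitHub link quoted in this thread is available); the broker address, topic and header contents are placeholders.

```java
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.<String>builder()
                .setTopic("output-topic")
                .setValueSerializationSchema(new SimpleStringSchema())
                // derive Kafka headers from each outgoing element
                .setHeaderProvider(element -> new RecordHeaders(new Header[]{
                        new RecordHeader("origin", element.getBytes(StandardCharsets.UTF_8))}))
                .build())
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();
```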

Flink Kafka Sink + Schema Registry + Message Headers

2024-01-31 Thread Kirti Dhar Upadhyay K via user
Hi Mates, I have the below queries regarding the Flink Kafka Sink. 1. Does Kafka Sink support schema registry? If yes, is there any documentation on configuring it? 2. Does Kafka Sink support sending messages (ProducerRecord) with headers? Regards, Kirti Dhar

Re: Kafka Sink and Kafka transaction timeout

2023-10-02 Thread Tzu-Li (Gordon) Tai
?pageId=255071710 On Mon, Oct 2, 2023 at 2:47 PM Lorenzo Nicora wrote: > Hi team > > In Kafka Sink docs [1], with EXACTLY_ONCE it is recommended to set: > transaction_timeout > maximum_checkpoint duration + > maximum_restart_duration. > > I understand transaction_timeout >

Kafka Sink and Kafka transaction timeout

2023-10-02 Thread Lorenzo Nicora
Hi team In the Kafka Sink docs [1], with EXACTLY_ONCE it is recommended to set: transaction_timeout > maximum_checkpoint_duration + maximum_restart_duration. I understand transaction_timeout > maximum_checkpoint_duration But why add maximum_restart_duration? If the application recover
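
For context, a sketch of where those knobs live on the KafkaSink builder (assuming Flink 1.15+; broker, topic and prefix are placeholders). The transaction opened before a failure must still be alive when the restarted job commits it on recovery, which is why the restart duration is added on top of the checkpoint duration; the broker-side ceiling transaction.max.timeout.ms (15 minutes by default) must be raised to match.

```java
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

// transaction.timeout.ms must cover max checkpoint duration + max restart duration
Properties producerProps = new Properties();
producerProps.setProperty("transaction.timeout.ms", String.valueOf(15 * 60 * 1000));

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092")
        .setKafkaProducerConfig(producerProps)
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("my-job") // unique per job sharing the cluster
        .setRecordSerializer(KafkaRecordSerializationSchema.<String>builder()
                .setTopic("output-topic")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .build();
```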

Re: kafka sink

2023-07-30 Thread nick toker
torStateFromStream=StateObjectCollection{[]}, > keyedStateFromBackend=StateObjectCollection{[]}, keyedStateFromStream= > StateObjectCollection{[]}, inputChannelState=StateObjectCollection{[]}, > resultSubpartitionState=StateObjectCollection{[]}, stateSize=291, > checkpointedSize=291} from job man

Re: kafka sink

2023-07-24 Thread nick toker
; Hi nick, > > Is there any error log? That may help to analyze the root cause. > > On Sun, Jul 23, 2023 at 9:53 PM nick toker > wrote: > >> hello >> >> >> we replaced the deprecated kafka producer with kafka sink >> and from time to time when we submit a job it gets st

Re: kafka sink

2023-07-23 Thread Shammon FY
Hi nick, Is there any error log? That may help to analyze the root cause. On Sun, Jul 23, 2023 at 9:53 PM nick toker wrote: > hello > > > we replaced the deprecated kafka producer with kafka sink > and from time to time when we submit a job it gets stuck for 5 min in > initializing (

kafka sink

2023-07-23 Thread nick toker
hello we replaced the deprecated kafka producer with kafka sink and from time to time when we submit a job it gets stuck for 5 min in initializing (on the sink operators) we verified that the transaction prefix is unique it did not happen when we used the kafka producer What can be the reason?

Re: Flink Kafka Sink abnormal timestamps

2023-02-27 Thread haishui
<18765295...@163.com> wrote: > > hi, while using version 1.16.0 I ran into abnormally large kafka sink timestamps; below are a normal and an abnormal record > Normal: > { > "partition": 0, > "offset": 16, > "msg": "x", > "timespan": 1677487065330, > "date

Re: Kafka Sink Kafka Producer metrics?

2023-02-07 Thread Andrew Otto
>>> Kafka Source will emit KafkaConsumer metrics >>> <https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-consumer-metrics> >>> . >>> >>> It looks like Kafka Sink >>> <https://nightlies.a

Re: Kafka Sink Kafka Producer metrics?

2023-02-06 Thread Mason Chen
t; > On Mon, Feb 6, 2023 at 11:49 AM Andrew Otto wrote: > >> Hi! >> >> Kafka Source will emit KafkaConsumer metrics >> <https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-consumer-metrics> >> . >> >> It

Re: Kafka Sink Kafka Producer metrics?

2023-02-06 Thread Mason Chen
AM Andrew Otto wrote: > Hi! > > Kafka Source will emit KafkaConsumer metrics > <https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-consumer-metrics> > . > > It looks like Kafka Sink > <https://nightlies.apache.org/flink/

Kafka Sink Kafka Producer metrics?

2023-02-06 Thread Andrew Otto
Hi! Kafka Source will emit KafkaConsumer metrics <https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-consumer-metrics> . It looks like Kafka Sink <https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#monitorin

Extremely long startup time for exactly_once kafka sink

2023-01-31 Thread Bobby Richard
When enabling exactly_once on my kafka sink, I am seeing extremely long initialization times (over 1 hour), especially after restoring from a savepoint. In the logs I see the job constantly initializing thousands of kafka producers like this: 2023-01-31 14:39:58,150 INFO

Re: Strange issue with exactly once checkpoints and the kafka sink

2022-11-16 Thread Salva Alcántara
for the kafka sink to automatically adapt: setTransactionalIdPrefix("XYZ") // just in case transactions are required. Should the kafka sink automatically adapt to the checkpointing.mode (that is, use the same guarantee), or on the contrary should I explicitly set both guara

Strange issue with exactly once checkpoints and the kafka sink

2022-11-06 Thread Salva Alcántara
://github.com/apache/flink/blob/f494be6956e850d4d1c9fd50b79e5a8dd5b53e47/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L66 [2]: https://github.com/apache/flink/blob/f494be6956e850d4d1c9fd50b79e5a8dd5b53e47/flink-connectors/flink

Re: Re: Re: Flink Kafka Sink Error: org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 0

2022-08-26 Thread gulugulucxg
Thanks for the reply; the issue on my side is fixed now, changing the clients version to 2.4.1 did it. On 2022-08-26 16:20:27, "Weihua Hu" wrote: > You can try upgrading to 2.5+ > Best, > Weihua > On Thu, Aug 25, 2022 at 6:41 PM gulugulucxg wrote: >> Hi, the cluster version is 1.1.1, which is quite old; is that the cause, and what version should we upgrade to? >> On 2022-08-25 18:31:06, "Weihua Hu" wrote: >> > kafka

Re: Re: Flink Kafka Sink Error: org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 0

2022-08-26 Thread Weihua Hu
You can try upgrading to 2.5+ Best, Weihua On Thu, Aug 25, 2022 at 6:41 PM gulugulucxg wrote: > Hi, the cluster version is 1.1.1, which is quite old; is that the cause, and what version should we upgrade to? > On 2022-08-25 18:31:06, "Weihua Hu" wrote: > > What version is the kafka cluster? It looks like the cluster version is a bit old > > Best, > > Weihua > > On Thu, Aug 25, 2022 at 3:41 PM gulugulucxg wrote: > >> Hi experts: > >>

Re: Re: Flink Kafka Sink Error: org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 0

2022-08-25 Thread gulugulucxg
Hi, the cluster version is 1.1.1, which is quite old; is that the cause, and what version should we upgrade to? On 2022-08-25 18:31:06, "Weihua Hu" wrote: > What version is the kafka cluster? It looks like the cluster version is a bit old > Best, > Weihua > On Thu, Aug 25, 2022 at 3:41 PM gulugulucxg wrote: >> Hi experts: >> After specifying EXACTLY_ONCE writes to kafka, the job starts fine when launched directly, but starting it from a savepoint always fails. kafka-clients version is 2.5.0; Flink and related dependencies are all 1.12.4, >>

Re: Flink Kafka Sink Error: org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 0

2022-08-25 Thread Weihua Hu
What version is the kafka cluster? It looks like the cluster version is a bit old. Best, Weihua On Thu, Aug 25, 2022 at 3:41 PM gulugulucxg wrote: > Hi experts: > After specifying EXACTLY_ONCE writes to kafka, the job starts fine when launched directly, but starting it from a savepoint always fails. kafka-clients version is 2.5.0; Flink and related dependencies are all 1.12.4, > The exception is: > 2022-08-25 10:42:44 > >

Flink Kafka Sink Error: org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 0

2022-08-25 Thread gulugulucxg
Hi experts: After specifying EXACTLY_ONCE writes to kafka, the job starts fine when launched directly, but starting it from a savepoint always fails. kafka-clients version is 2.5.0; Flink and related dependencies are all 1.12.4. The exception is: 2022-08-25 10:42:44 org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 0 The relevant code: Properties

Re: Is kafka key supported in kafka sink table

2022-05-19 Thread Shengkai Fang
8...@163.com> wrote: > >> Hi dear engineer, >> >> Flink sql supports kafka sink table, not sure whether it supports kafka >> key in kafka sink table? As I want to specify kafka key when inserting >> data into kafka sink table. >> Thanks for your answer in advance. >> >> >> >> Thanks && Regards, >> Hunk >> >> >> >> >

Re: Kafka Sink Key and Value Avro Schema Usage Issues

2022-05-19 Thread Peter Schrott
// TODO Auto-generated catch block > > } catch (Exception e) { > > } > > return keyOut.toByteArray(); > > } > > From: Ghiya, Jay (GE Healthcare) > Sent: 18 May 2022 21:51 >

RE: Kafka Sink Key and Value Avro Schema Usage Issues

2022-05-18 Thread Ghiya, Jay (GE Healthcare)
2022 21:51 To: user@flink.apache.org Cc: d...@flink.apache.org; Pandiaraj, Satheesh kumar (GE Healthcare) ; Kumar, Vipin (GE Healthcare) Subject: Kafka Sink Key and Value Avro Schema Usage Issues Hi Team, This is regarding Flink Kafka Sink. We have a usecase where we have headers and both key

Kafka Sink Key and Value Avro Schema Usage Issues

2022-05-18 Thread Ghiya, Jay (GE Healthcare)
Hi Team, This is regarding the Flink Kafka Sink. We have a use case with headers and both key and value as Avro schemas. Below is what we expected to be the intuitive way to set the avro kafka key and value: KafkaSink.<...>builder() .setBootstrapServers(cloudkafkaBroker
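
A sketch of one way to combine a schema-registry-backed Avro value with a custom key serializer on the KafkaSink builder, assuming a Flink version where ConfluentRegistryAvroSerializationSchema.forSpecific is available (flink-avro-confluent-registry module); MyValue is a hypothetical Avro-generated class, and the URLs and subject are placeholders.

```java
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;

// MyValue is a hypothetical Avro-generated SpecificRecord with a getId() field.
SerializationSchema<MyValue> keySchema =
        element -> element.getId().toString().getBytes(StandardCharsets.UTF_8);

KafkaSink<MyValue> sink = KafkaSink.<MyValue>builder()
        .setBootstrapServers("broker:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.<MyValue>builder()
                .setTopic("events")
                .setKeySerializationSchema(keySchema)
                // looks up / registers the value schema under subject "events-value"
                .setValueSerializationSchema(ConfluentRegistryAvroSerializationSchema
                        .forSpecific(MyValue.class, "events-value", "http://schema-registry:8081"))
                .build())
        .build();
```

Note that setKeySerializationSchema receives the same element type as the value, so the key serializer has to extract the key bytes from the element itself.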

Re: Is kafka key supported in kafka sink table

2022-05-17 Thread Dhavan Vaidya
"top" level columns of your sink table (i.e., fields inside Row are not supported, at least in PyFlink). Thanks! On Mon, 16 May 2022 at 19:33, wang <24248...@163.com> wrote: > Hi dear engineer, > > Flink sql supports kafka sink table, not sure whether it supports kafka &g

Is kafka key supported in kafka sink table

2022-05-16 Thread wang
Hi dear engineer, Flink sql supports kafka sink table, not sure whether it supports kafka key in kafka sink table? As I want to specify kafka key when inserting data into kafka sink table. Thanks for your answer in advance. Thanks && Regards, Hunk

Re: Kafka Sink Error in sending Row Type data in Flink-(version 1.14.4)

2022-04-27 Thread Dian Fu
Hi Harshit, I should have already replied to you in an earlier thread[1] for the same question. It seems that you have missed that. Please double check if that reply is helpful for you. Regards, Dian [1] https://lists.apache.org/thread/cm6r569spq67249dxw57q8lxh0mk3f7y On Wed, Apr 27, 2022 at

Kafka Sink Error in sending Row Type data in Flink-(version 1.14.4)

2022-04-27 Thread harshit.varsh...@iktara.ai
Dear Team, I am new to pyflink and request for your support in issue I am facing with Pyflink. I am using Pyflink version 1.14.4 & using reference code from pyflink github. I am getting following error . grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that

Re: KAFKA SINK ERROR IN Flink-(version 1.14.4)

2022-04-23 Thread Dian Fu
Hi Harshit, Could you try to update the following line `ds = ds.map(lambda x: ','.join([str(value) for value in x]))` as following: `ds = ds.map(lambda x: ','.join([str(value) for value in x]), output_type=Types.STRING())` The reason is that if the output type is not specified, it will be

KAFKA SINK ERROR IN Flink-(version 1.14.4)

2022-04-22 Thread harshit.varsh...@iktara.ai
Dear Team, I am new to pyflink and request for your support in issue I am facing with Pyflink. I am using Pyflink version 1.14.4 & using reference code from pyflink getting started pages. I am getting following error . py4j.protocol.Py4JJavaError: An error occurred while calling

Re: how to set kafka sink ssl properties

2022-03-22 Thread HG
Hello Matthias and others I am trying to configure a Kafka Sink with SSL properties as shown further below. But in the logs I see warnings: 2022-03-21 12:30:17,108 WARN org.apache.kafka.clients.admin.AdminClientConfig [] - The configuration 'group.id' was supplied but isn't a known

Re: how to set kafka sink ssl properties

2022-03-18 Thread Qingsheng Ren
ing? Is the > ssl.truststore.location accessible from the Flink nodes? > > Matthias > > On Thu, Mar 17, 2022 at 4:00 PM HG wrote: > Hi all, > I am probably not the smartest but I cannot find how to set ssl-properties > for a Kafka Sink. > My assumption

Re: how to set kafka sink ssl properties

2022-03-17 Thread HG
u, Mar 17, 2022 at 4:00 PM HG wrote: > >> Hi all, >> I am probably not the smartest but I cannot find how to set >> ssl-properties for a Kafka Sink. >> My assumption was that it would be just like the Kafka Consumer >> >> KafkaSource source = KafkaSource.builder()

Re: how to set kafka sink ssl properties

2022-03-17 Thread Matthias Pohl
Could you share more details on what's not working? Is the ssl.truststore.location accessible from the Flink nodes? Matthias On Thu, Mar 17, 2022 at 4:00 PM HG wrote: > Hi all, > I am probably not the smartest but I cannot find how to set ssl-properties > for a Kafka Sink. > M

how to set kafka sink ssl properties

2022-03-17 Thread HG
Hi all, I am probably not the smartest but I cannot find how to set ssl-properties for a Kafka Sink. My assumption was that it would be just like the Kafka Consumer KafkaSource source = KafkaSource.builder() .setProperties(kafkaProps) .setProperty("ssl.truststore
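
For the archives, a sketch of the sink-side equivalent (assuming a Flink version with KafkaSink; paths and passwords are placeholders). The producer accepts the same security.* / ssl.* client properties as the consumer, and, as Matthias asks in this thread, the truststore path must be readable on every Flink node.

```java
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

Properties sslProps = new Properties();
sslProps.setProperty("security.protocol", "SSL");
sslProps.setProperty("ssl.truststore.location", "/etc/kafka/truststore.jks");
sslProps.setProperty("ssl.truststore.password", "changeit");

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker:9093")
        .setKafkaProducerConfig(sslProps) // forwarded to the underlying KafkaProducer
        .setRecordSerializer(KafkaRecordSerializationSchema.<String>builder()
                .setTopic("output-topic")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .build();
```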

Re: Query regarding Kafka Source and Kafka Sink in 1.14.3

2022-01-24 Thread Caizhi Weng
Hi! All properties you set by calling KafkaSource.builder().setProperty() will also be given to KafkaConsumer (see [1]). However these two properties are specific to Flink and Kafka does not know them, so Kafka will produce a warning message. These messages are harmless as long as the properties
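
A sketch of the builder usage being discussed (assuming Flink 1.14 APIs; broker, topic and group id are placeholders). Both keys are Flink-side options, so the "not a known config" warning from ConsumerConfig is expected and harmless, as explained above.

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("input-topic")
        .setGroupId("my-group")
        // Flink-specific options, forwarded into the KafkaConsumer properties
        .setProperty("client.id.prefix", "my-app")
        .setProperty("partition.discovery.interval.ms", "60000")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
```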

Query regarding Kafka Source and Kafka Sink in 1.14.3

2022-01-23 Thread Mahima Agarwal
Hi Team, I am trying to set the following properties in Kafka Source API in flink 1.14.3 version. -> client.id.prefix -> partition.discovery.interval.ms But I am getting the below mentioned warning in taskmanager logs: 1. WARN org.apache.kafka.clients.consumer.ConsumerConfig [] -

Losing data from kafka sink to hbase

2021-04-07 Thread 夜思流年梦
Dear developers: I have found that sinking from kafka to hbase loses data; with the same sql, sinking via jdbc loses nothing. The table DDL and job sql are below. flink version 1.12. Source table (ingested with canal-json): create table rt_ods.ods_za_log_member_base_info( MemberId bigint COMMENT '用户ID', NickName string COMMENT '用户昵称', proctime as PROCTIME

flink sql kafka source sinking to mysql: primary key conflicts cause appends

2021-02-19 Thread Yu Wang
kafka data format: {"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613802148092,"end_time":1613803144969,"process_time":"2021-02-20 06:39:05.088"}

Re: flink-kafka-sink

2021-01-12 Thread r pp
hi, what exactly had no effect? cxx <1156531...@qq.com> wrote on Thu, Jan 7, 2021 at 9:53 AM: > I consume one record from kafka, split the message, and send the pieces to a downstream kafka, but this cannot be guaranteed to happen in one transaction. > For example: I split one record into 10 and an exception is thrown at the fifth, but the first four have already been sent to the downstream kafka. > I set the transaction id, isolation level, client > id, enable.idempotence, max.in.flight.requests.per.connection, retries > but there was no effect. > > -- >

flink-kafka-sink

2021-01-06 Thread cxx
I consume one record from kafka, split the message, and send the pieces to a downstream kafka, but this cannot be guaranteed to happen in one transaction. For example: I split one record into 10 and an exception is thrown at the fifth, but the first four have already been sent to the downstream kafka. I set the transaction id, isolation level, client id, enable.idempotence, max.in.flight.requests.per.connection, retries, but there was no effect. -- Sent from: http://apache-flink.147419.n8.nabble.com/
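
For reference, with the connector of that era the usual way to make such writes atomic is Flink's own transactional producer (Semantic.EXACTLY_ONCE) plus checkpointing, rather than raw producer properties; a minimal sketch assuming Flink 1.11/1.12-style APIs, with placeholder broker and topic:

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Records written between two checkpoints form one Kafka transaction, committed
// only when the checkpoint completes, so a failure mid-split rolls back the batch.
env.enableCheckpointing(60_000);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");
props.setProperty("transaction.timeout.ms", "900000");

FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
        "output-topic",
        (KafkaSerializationSchema<String>) (element, timestamp) ->
                new ProducerRecord<>("output-topic", element.getBytes(StandardCharsets.UTF_8)),
        props,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
```

Downstream consumers must read with isolation.level=read_committed to hide the rolled-back records.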

Re: Reply: Problem with flink sql consuming kafka and sinking to mysql

2021-01-05 Thread air23
It turns out that flink sql consuming kafka commits offsets to kafka first, whether or not parsing succeeded, when in fact parsing had failed. On 2021-01-06 14:01:34, "Evan" wrote: > flinksql apparently cannot do what you describe at the moment > > From: air23 > Sent: 2021-01-06 12:29 > To: user-zh > Subject: Problem with flink sql consuming kafka and sinking to mysql > Hello. While running a job I found that if flink sql consuming kafka hits an error

Reply: Problem with flink sql consuming kafka and sinking to mysql

2021-01-05 Thread Evan
flinksql apparently cannot do what you describe at the moment From: air23 Sent: 2021-01-06 12:29 To: user-zh Subject: Problem with flink sql consuming kafka and sinking to mysql Hello. While running a job I found that if flink sql consuming kafka hits an error and is restarted, the record that caused the error is lost. I use scan.startup.mode' = 'group-offsets'. Shouldn't it restart consumption from the failed record? How can I configure it so that no data is lost? CREATE TABLE source1 ( id BIGINT

Problem with flink sql consuming kafka and sinking to mysql

2021-01-05 Thread air23
Hello. While running a job I found that if flink sql consuming kafka hits an error and is restarted, the record that caused the error is lost. I use scan.startup.mode' = 'group-offsets'. Shouldn't it restart consumption from the failed record? How can I configure it so that no data is lost? CREATE TABLE source1 ( id BIGINT , username STRING , password STRING , AddTime TIMESTAMP , origin_table STRING METADATA FROM

Re: Error when using Kafka Sink: AppendStreamTableSink doesn't support consuming update changes

2020-08-12 Thread xiao cai
Dear Leonard Xu: I will keep an eye on that issue, thank you very much for the answer. Original message From: Leonard Xu To: user-zh Sent: Wed, Aug 12, 2020 16:05 Subject: Re: Error when using Kafka Sink: AppendStreamTableSink doesn't support consuming update changes Hi, Group by and left join both produce retract messages, and such messages need an UpsertStreamTableSink to be handled; the Kafka connector's current implementation is

Re: Error when using Kafka Sink: AppendStreamTableSink doesn't support consuming update changes

2020-08-12 Thread Leonard Xu
On Aug 12, 2020 at 15:58, xiao cai wrote: > > Hi Jark: > Version: 1.11.0 > Problem: flink-sql; after a group by and left join, writing the data to a kafka sink fails at the SQL validation stage with: > AppendStreamTableSink doesn't support consuming update changes which is > produced by node GroupAggregate > > I would like the upsert operation to also be applied to the kafka si

Reply: Error when using Kafka Sink: AppendStreamTableSink doesn't support consuming update changes

2020-08-12 Thread xuzh
Should update-mode be switched to update? -- Original message -- From: "user-zh"

Error when using Kafka Sink: AppendStreamTableSink doesn't support consuming update changes

2020-08-12 Thread xiao cai
Hi Jark: Version: 1.11.0 Problem: flink-sql; after a group by and left join, writing the data to a kafka sink fails at the SQL validation stage with: AppendStreamTableSink doesn't support consuming update changes which is produced by node GroupAggregate I would like the upsert operation to also be applied to the kafka sink at SQL validation time, or to have the upsert complete first and then write to kafka. The sql being executed: create table kafka_table_1

Re: ProducerRecord with Kafka Sink for 1.8.0

2020-05-14 Thread Nick Bendtner
te: > >> > > >> > Hi Gary, > >> > Thanks for the info. I am aware this feature is available in 1.9.0 > onwards. Our cluster is still very old and has CICD challenges, I was > hoping not to bloat up the application jar by packaging even flink-core > with it.

Re: ProducerRecord with Kafka Sink for 1.8.0

2020-05-14 Thread Gary Yao
e in 1.9.0 onwards. Our cluster is still very old and has CICD challenges, I was hoping not to bloat up the application jar by packaging even flink-core with it. If it's not possible to do this with an older version without writing our own kafka sink implementation similar to the flink provided version

Re: ProducerRecord with Kafka Sink for 1.8.0

2020-05-12 Thread Nick Bendtner
m aware this feature is available in 1.9.0 > onwards. Our cluster is still very old and has CICD challenges, I was > hoping not to bloat up the application jar by packaging even flink-core > with it. If it's not possible to do this with an older version without writing > our own kafka sink i

Re: ProducerRecord with Kafka Sink for 1.8.0

2020-05-12 Thread Gary Yao
> possible to do this with older version without writing our own kafka sink > implementation similar to the flink provided version in 1.9.0 then I think we > will pack flink-core 1.9.0 with the application and follow the approach that > you suggested. Thanks again for getting back

Re: ProducerRecord with Kafka Sink for 1.8.0

2020-05-12 Thread Nick Bendtner
own kafka sink implementation similar to the flink provided version in 1.9.0 then I think we will pack flink-core 1.9.0 with the application and follow the approach that you suggested. Thanks again for getting back to me so quickly. Best, Nick On Tue, May 12, 2020 at 3:37 AM Gary Yao wrote: >

Re: ProducerRecord with Kafka Sink for 1.8.0

2020-05-12 Thread Gary Yao
/flink/streaming/connectors/kafka/KafkaSerializationSchema.html On Mon, May 11, 2020 at 10:59 PM Nick Bendtner wrote: > > Hi guys, > I use 1.8.0 version for flink-connector-kafka. Do you have any > recommendations on how to produce a ProducerRecord from a kafka sink. Looking >

ProducerRecord with Kafka Sink for 1.8.0

2020-05-11 Thread Nick Bendtner
Hi guys, I use the 1.8.0 version of flink-connector-kafka. Do you have any recommendations on how to produce a ProducerRecord from a kafka sink? Looking to add support for kafka headers, therefore thinking about ProducerRecord. If you have any thoughts it's highly appreciated. Best, Nick.

Two questions about the kafka sink

2020-03-30 Thread whirly
Hi everyone: I have run into a few questions while using the flink kafka sink. 1. The kafka sink does not provide an ActionRequestFailureHandler the way the elasticsearch sink does; what should be done when an exception occurs? It is also unclear which exceptions can occur at all. The callback in FlinkKafkaProducer's open() looks like this: onCompletion only receives RecordMetadata and Exception and cannot access the Record, and the callback is private, so it cannot be overridden by subclassing. if (logFailuresOnly

Re: Re: Does flink-1.10 sql-client support kafka sink?

2020-03-11 Thread wangl...@geekplus.com.cn
and flink-sql-connector-kafka_2.11-1.9.1.jar in a flink-1.10 environment. Thanks, Lei wangl...@geekplus.com.cn From: wangl...@geekplus.com.cn Date: 2020-03-11 14:51 To: Jark Wu CC: Arvid Heise; user Subject: Re: Re: Does flink-1.10 sql-client support kafka sink? Hi Jark, I have tried to use CREATE

Re: Re: Does flink-1.10 sql-client support kafka sink?

2020-03-11 Thread Jark Wu
te: 2020-03-11 11:13 > To: wangl...@geekplus.com.cn > CC: Arvid Heise; user > Subject: Re: Re: Does flink-1.10 sql-client support kafka sink? > Hi Lei, > > CREATE TABLE DDL [1][2] is the recommended way to register a table since > 1.9. And the yaml way might be depr

Re: Re: Does flink-1.10 sql-client support kafka sink?

2020-03-11 Thread wangl...@geekplus.com.cn
the result correctly Thanks, Lei From: Jark Wu Date: 2020-03-11 11:13 To: wangl...@geekplus.com.cn CC: Arvid Heise; user Subject: Re: Re: Does flink-1.10 sql-client support kafka sink? Hi Lei, CREATE TABLE DDL [1][2] is the recommended way to register a table since 1.9. And the yaml way might

Re: Re: Kafka sink only supports append mode?

2020-03-10 Thread Jark Wu
Wu > Date: 2020-03-09 19:25 > To: wangl...@geekplus.com.cn > CC: user > Subject: Re: Kafka sink only supports append mode? > Hi Lei, > > Yes. Currently, Kafka sink only supports append mode. Other update modes > (e.g. upsert mode / retract mode) are on the agenda. > For now, you

Re: Re: Does flink-1.10 sql-client support kafka sink?

2020-03-10 Thread Jark Wu
> schema: "ROW(out_order_code STRING, input_date BIGINT, owner_code > STRING, status INT)" > > under the format label. > > From: Arvid Heise > Date: 2020-03-10 20:51 > To: wangl...@geekplus.com.cn > CC: user > Subject: Re: Does flink-1.10 sql-clien

Re: Re: Does flink-1.10 sql-client support kafka sink?

2020-03-10 Thread wangl...@geekplus.com.cn
nt support kafka sink? Hi Lei, yes Kafka as a sink is supported albeit only for appends (no deletions/updates yet) [1]. An example is a bit hidden in the documentation [2]: tables: - name: MyTableSink type: sink-table update-mode: append connector: property-version: 1 t

Re: Does flink-1.10 sql-client support kafka sink?

2020-03-10 Thread Arvid Heise
Hi Lei, yes Kafka as a sink is supported albeit only for appends (no deletions/updates yet) [1]. An example is a bit hidden in the documentation [2]: tables: - name: MyTableSink type: sink-table update-mode: append connector: property-version: 1 type: kafka

Re: Re: Kafka sink only supports append mode?

2020-03-10 Thread wangl...@geekplus.com.cn
is not appendable. It confused me. Thanks, Lei From: Jark Wu Date: 2020-03-09 19:25 To: wangl...@geekplus.com.cn CC: user Subject: Re: Kafka sink only supports append mode? Hi Lei, Yes. Currently, Kafka sink only supports append mode. Other update modes (e.g. upsert mode / retract mode) are on the agenda

Does flink-1.10 sql-client support kafka sink?

2020-03-10 Thread wangl...@geekplus.com.cn
I have configured source table successfully using the following configuration: - name: out_order type: source update-mode: append schema: - name: out_order_code type: STRING - name: input_date type: BIGINT - name: owner_code type: STRING connector:

Re: Kafka sink only supports append mode?

2020-03-09 Thread Jark Wu
Hi Lei, Yes. Currently, Kafka sink only supports append mode. Other update modes (e.g. upsert mode / retract mode) are on the agenda. For now, you can customize a KafkaTableSink by implementing the UpsertStreamTableSink interface, where you will get Tuple2 records, and the Boolean represents insert

Kafka sink only supports append mode?

2020-03-09 Thread wangl...@geekplus.com.cn
I wrote a simple program reading from kafka using sql and sinking to kafka. But only 'update-mode' = 'append' is supported for the sink table, and the query sql must have no group statement. Is only append mode supported for the kafka sink? Thanks, Lei

Re: Partitioning based on key flink kafka sink

2019-11-06 Thread vino yang
Hi Vishwas, You should pay attention to the other args. The constructor provided by you has a `KeyedSerializationSchema` arg, while the comments of the constructor which made you confused only has a `SerializationSchema` arg. That's their difference. Best, Vino Vishwas Siravara 于2019年11月6日周三

Partitioning based on key flink kafka sink

2019-11-05 Thread Vishwas Siravara
Hi all, I am using flink 1.7.0 and using this constructor FlinkKafkaProducer(String topicId, KeyedSerializationSchema serializationSchema, Properties producerConfig) From the doc it says this constructor uses the fixed partitioner. I want to partition based on key, so I tried to use this public
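
Following vino's pointer, a sketch of the four-argument constructor variant (assuming Flink 1.7-era APIs; MyEvent and its fields are illustrative), where passing Optional.empty() as the custom partitioner leaves partitioning to the record key instead of Flink's FlinkFixedPartitioner:

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

// MyEvent is illustrative.
class MyEvent {
    String key;
    String payload;
}

KeyedSerializationSchema<MyEvent> schema = new KeyedSerializationSchema<MyEvent>() {
    @Override
    public byte[] serializeKey(MyEvent e) {
        return e.key.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(MyEvent e) {
        return e.payload.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(MyEvent e) {
        return null; // null -> use the default topic given to the producer
    }
};

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");

// With Optional.empty() Flink installs no FlinkFixedPartitioner, so the
// serialized key decides the Kafka partition (standard key hashing).
FlinkKafkaProducer<MyEvent> producer =
        new FlinkKafkaProducer<>("events", schema, props, Optional.empty());
```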

Re: about Kafka sink and 2PC function

2019-10-18 Thread Andrey Zagrebin
Hi, This is the contract of 2PC transactions. Multiple commit retries should result in only one commit which actually happens in the external system. The external system has to support deduplication of committed transactions, e.g. by some unique id. Best, Andrey > On 10 Oct 2019, at 07:15,

about Kafka sink and 2PC function

2019-10-09 Thread 121476...@qq.com
After reading about FlinkKafkaProducer011 and the 2PC function in FLINK, I understand: when snapshotState() runs, the currentTransaction is pre-committed and added to the State. When the checkpoint is done and notifyCheckpointComplete() is called, the producer commits the currentTransaction to the brokers. When initializeState() runs, it restores from the State.
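
That lifecycle is exactly what Flink's TwoPhaseCommitSinkFunction (the base class of FlinkKafkaProducer011) encodes; a minimal sketch with a hypothetical Txn handle, where a real sink would wrap e.g. a Kafka transaction:

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

// Hypothetical transaction handle.
class Txn implements Serializable {
    final List<String> buffered = new ArrayList<>();
}

class TwoPhaseDemoSink extends TwoPhaseCommitSinkFunction<String, Txn, Void> {

    TwoPhaseDemoSink() {
        super(new KryoSerializer<>(Txn.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
    }

    @Override
    protected Txn beginTransaction() {
        return new Txn(); // opened at start and again after each checkpoint
    }

    @Override
    protected void invoke(Txn txn, String value, Context context) {
        txn.buffered.add(value); // writes go into the current transaction
    }

    @Override
    protected void preCommit(Txn txn) {
        // called from snapshotState(): flush so data is durable before the barrier completes
    }

    @Override
    protected void commit(Txn txn) {
        // called from notifyCheckpointComplete(); may be retried, so it must be
        // idempotent, which is the deduplication contract Andrey describes above
        txn.buffered.clear();
    }

    @Override
    protected void abort(Txn txn) {
        txn.buffered.clear(); // discard on failure/restore
    }
}
```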

Re: Throttling/effective back-pressure on a Kafka sink

2019-05-30 Thread Derek VerLee
Hi We’ve got a job producing to a Kafka sink. The Kafka topics have a retention of 2 weeks. When doing a complete replay, it seems like Flink isn’t able to back-pressure or throttle the amount of messages

Re: how to count kafka sink number

2019-05-13 Thread Konstantin Knauf
Hi Chong, to my knowledge, neither Flink's built-in metrics nor the metrics of the Kafka producer itself give you this number directly. If your sink is chained (no serialization, no network) to another Flink operator, you could take the numRecordsOut of this operator instead. It will tell you how

how to count kafka sink number

2019-05-12 Thread jszhouch...@163.com
hi, i have a flink sql job: it reads records from kafka, then uses a table function to do some transformation, then produces to kafka. i have found that in the flink web UI the records received of the first subTask is always 0, and the records sent of the last subTask is 0 as well. i want to count how many

Re: Throttling/effective back-pressure on a Kafka sink

2019-03-28 Thread Konstantin Knauf
. Feel free to send these logs to me directly, if you don't want to share them on the list. Best, Konstantin On Thu, Mar 28, 2019 at 2:04 PM Marc Rooding wrote: > Hi > > We’ve got a job producing to a Kafka sink. The Kafka topics have a > retention of 2 weeks. When doing a com

Throttling/effective back-pressure on a Kafka sink

2019-03-28 Thread Marc Rooding
Hi We’ve got a job producing to a Kafka sink. The Kafka topics have a retention of 2 weeks. When doing a complete replay, it seems like Flink isn’t able to back-pressure or throttle the amount of messages going to Kafka, causing the following error

Re: [Blink] sql client kafka sink failure

2019-02-26 Thread Zhenghua Gao
> org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C$1.class > org/apa

Re: [Blink] sql client kafka sink failure

2019-02-25 Thread 张洪涛
> org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C$ChecksumFactory.class > org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C$Java9ChecksumFactory.class

Re: Re: Re: [Blink] sql client kafka sink failure

2019-02-24 Thread Becket Qin
class > org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C.class > org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/PureJavaCrc32C.class

Re: Re: Re: [Blink] sql client kafka sink failure

2019-02-24 Thread Zhenghua Gao
aCrc32C.class On 2019-02-22 18:03:18, "Zhenghua Gao" wrote: > Can you check whether the corresponding jar contains this class, as follows (assuming your blink package is at /tmp/blink): > cd /tmp/blink/opt/co

Re: Re: Re: [Blink] sql client kafka sink failure

2019-02-24 Thread 张洪涛
whether the corresponding jar contains this class, as follows (assuming your blink package is at /tmp/blink): > cd /tmp/blink/opt/connectors/kafka011 > jar -tf flink-connector-kafka-0.11_2.11-*.jar | grep Crc32C > On Fri, Feb 22, 2019 at 2:56 PM 张洪涛 wrote: >>

Re: Re: [Blink] sql client kafka sink failure

2019-02-22 Thread Becket Qin
> On 2019-02-22 18:03:18, "Zhenghua Gao" wrote: > > Can you check whether the corresponding jar contains this class, as follows (assuming your blink package is at /tmp/blink): > > cd /tmp/blink/opt/connectors/kafka011 > > jar -tf flink-connector-kafka-0.11_2.11-*.jar | grep Crc32C > > On Fri, Feb 22, 2019 at 2

Re: Re: [Blink] sql client kafka sink failure

2019-02-22 Thread 张洪涛
> >> Hi all! >> I am testing the Blink sql client kafka sink connector, but writes fail. My steps are below. >> Environment: blink standalone mode >> 1. Configure the environment and start the sql client >> 2. Create the kafka sink table

[Blink] sql client kafka sink failure

2019-02-21 Thread 张洪涛
Hi all! I am testing the Blink sql client kafka sink connector, but writes fail. Here are my steps. Environment: blink standalone mode 1. Configure the environment and start the sql client 2. Create the kafka sink table: CREATE TABLE kafka_sink ( messageKey VARBINARY, messageValue VARBINARY, PRIMARY KEY (messageKey) ) with ( type='KAFKA011', topic='sink

Re: Problem when use kafka sink with EXACTLY_ONCE in IDEA

2019-01-02 Thread Till Rohrmann
fka/clients/producer/KafkaProducer.html Cheers, Till On Wed, Jan 2, 2019 at 5:02 AM Kaibo Zhou wrote: > Hi, > I encountered an error while running the kafka sink demo in IDEA. > > This is the complete code: > > import java.util.

Problem when use kafka sink with EXACTLY_ONCE in IDEA

2019-01-01 Thread Kaibo Zhou
Hi, I encountered an error while running the kafka sink demo in IDEA. This is the complete code: import java.util.Properties import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.runtime.state.filesystem.FsStateBackend import

Re: Duplicated messages in kafka sink topic using flink cancel-with-savepoint operation

2018-12-03 Thread Piotr Nowojski
: user@flink.apache.org > Subject: Re: Duplicated messages in kafka sink topic using flink > cancel-with-savepoint operation > > Hi Nastaran, > > When you are checking for duplicated messages, are you reading from kafka > using `read_committed` mode (this is not the d

Re: Duplicated messages in kafka sink topic using flink cancel-with-savepoint operation

2018-12-01 Thread Nastaran Motavali
messages have not been read so everything is OK. Kind regards, Nastaran Motavalli From: Piotr Nowojski Sent: Thursday, November 29, 2018 3:38:38 PM To: Nastaran Motavali Cc: user@flink.apache.org Subject: Re: Duplicated messages in kafka sink topic using flink

Re: Duplicated messages in kafka sink topic using flink cancel-with-savepoint operation

2018-11-29 Thread Piotr Nowojski
Hi Nastaran, When you are checking for duplicated messages, are you reading from kafka using `read_committed` mode (this is not the default value)? https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-producer-partitioning-scheme > Semantic.EXACTLY_ONCE: uses
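
In code, verifying exactly-once results requires the non-default isolation level on the consumer side; a plain Kafka consumer property:

```java
import java.util.Properties;

// Without this, a consumer also sees records from open or aborted Flink
// transactions, which can look like duplicates after a cancel-with-savepoint.
Properties consumerProps = new Properties();
consumerProps.setProperty("isolation.level", "read_committed"); // default: read_uncommitted
```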

Duplicated messages in kafka sink topic using flink cancel-with-savepoint operation

2018-11-28 Thread Nastaran Motavali
Hi, I have a flink streaming job implemented via java which reads some messages from a kafka topic, transforms them and finally sends them to another kafka topic. The version of flink is 1.6.2 and the kafka version is 011. I pass the Semantic.EXACTLY_ONCE parameter to the producer. The problem

How to handle exceptions in Kafka sink?

2018-09-13 Thread HarshithBolar
I have a Flink job that writes data into Kafka. The Kafka topic has maximum message size set to 5 MB, so if I try to write any record larger than 5 MB, it throws the following exception and brings the job down.
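
Two mitigations commonly used with the 0.11-era connector, sketched under stated assumptions (the `stream` variable and broker address are hypothetical; note that log-and-continue weakens delivery guarantees):

```java
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

// Option 1: drop records that exceed the topic's 5 MB cap before they reach the sink.
// "stream" is an assumed upstream DataStream<String>.
DataStream<String> safe = stream
        .filter(s -> s.getBytes(StandardCharsets.UTF_8).length < 5 * 1024 * 1024);

// Option 2: have the producer log async send failures instead of failing the job,
// so an oversized-record error is logged rather than rethrown.
FlinkKafkaProducer011<String> producer =
        new FlinkKafkaProducer011<>("broker:9092", "output-topic", new SimpleStringSchema());
producer.setLogFailuresOnly(true);
```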
