Hi.

Following up on what you said: is it possible to implement the key-value
cache at the source connector level? I checked some of the documentation,
but even when I pass cache.max.bytes.buffering in my worker properties, I
run into the same problem; as far as I can tell, that setting belongs to
the Kafka Streams integration rather than Connect. Could you please shed
some more light on this? I am currently trying to get my source connector
up and running with Debezium fetching data from a Postgres DB, and any
further insight would really help.

Thanks & Regards
Divya Jain

On Sat, Jul 2, 2022 at 1:47 AM Malcolm McFarland <mmcfarl...@cavulus.com>
wrote:

> Hi Divya,
>
> Something that we've found useful in the past is a secondary key-value
> cache that stores large payloads under a key; you then pass only the key
> through Kafka, and the consumer uses it to retrieve the payload on the
> other end. This usually leads to much better performance from Kafka's
> perspective.
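A minimal sketch of that claim-check idea, using an in-memory dict as a stand-in for the external store (in practice this would be Redis, S3, or similar); all names below are illustrative, not part of any real connector API:

```python
import uuid

# Stand-in for an external key-value store (Redis, S3, etc.).
external_store = {}

def produce_claim_check(payload: bytes) -> str:
    """Store the large payload externally and return only the small key.

    The key, not the payload, is what gets sent through Kafka, so the
    record stays far below any broker message-size limit."""
    key = str(uuid.uuid4())
    external_store[key] = payload
    return key

def consume_claim_check(key: str) -> bytes:
    """Consumer side: resolve the key back into the full payload."""
    return external_store[key]

# The Kafka message now carries a ~36-byte key instead of megabytes.
big_payload = b"x" * 10_000_000
msg = produce_claim_check(big_payload)
assert len(msg) < 100
assert consume_claim_check(msg) == big_payload
```

The trade-off is an extra round trip to the external store on both sides, plus the need to expire stored payloads once consumed.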
>
> Cheers,
> Malcolm McFarland
> Cavulus
>
>
> On Fri, Jul 1, 2022 at 1:13 PM Divya Jain <truedeeds.divya1...@gmail.com>
> wrote:
>
> > Hi,
> >
> > That is already the maximum value that can be set; it cannot go any
> > higher. I am not sure how to solve this.
> >
> > Thanks
> > Divya Jain
> >
> >
> > On Sat, 2 Jul, 2022, 1:40 am M. Manna, <manme...@gmail.com> wrote:
> >
> > > Hi, you need to increase the record and message size limits, because
> > > your actual message payload is bigger than what's configured in the
> > > properties file.
> > >
> > > Regards,
> > >
> > > On Fri, 1 Jul 2022 at 20:24, Divya Jain <truedeeds.divya1...@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I am facing this issue:
> > > > [2022-07-01 19:01:05,548] INFO Topic 'postgres.public.content_history' already exists. (org.apache.kafka.connect.runtime.WorkerSourceTask:423)
> > > > [2022-07-01 19:01:05,641] INFO WorkerSourceTask{id=smtip-de-content2-source-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:488)
> > > > [2022-07-01 19:01:05,642] INFO WorkerSourceTask{id=smtip-de-content2-source-connector-0} flushing 155 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:505)
> > > > [2022-07-01 19:01:06,034] INFO WorkerSourceTask{id=smtip-de-content2-source-connector-0} Finished commitOffsets successfully in 393 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:586)
> > > > [2022-07-01 19:01:06,575] WARN [Producer clientId=connector-producer-smtip-de-content2-source-connector-0] Got error produce response in correlation id 610 on topic-partition postgres.public.content_history-1, splitting and retrying (2147483647 attempts left). Error: MESSAGE_TOO_LARGE (org.apache.kafka.clients.producer.internals.Sender:582)
> > > > [2022-07-01 19:01:06,843] ERROR WorkerSourceTask{id=smtip-de-content2-source-connector-0} failed to send record to postgres.public.content_history: (org.apache.kafka.connect.runtime.WorkerSourceTask:372)
> > > > org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
> > > > [2022-07-01 19:01:06,927] WARN [Producer clientId=connector-producer-smtip-de-content2-source-connector-0] Got error produce response in correlation id 643 on topic-partition postgres.public.content_history-0, splitting and retrying (2147483647 attempts left). Error: MESSAGE_TOO_LARGE (org.apache.kafka.clients.producer.internals.Sender:582)
> > > > [2022-07-01 19:01:06,941] INFO The task will send records to topic 'postgres.public.content_attributes' for the first time. Checking whether topic exists (org.apache.kafka.connect.runtime.WorkerSourceTask:419)
> > > > [2022-07-01 19:01:06,943] INFO Topic 'postgres.public.content_attributes' already exists. (org.apache.kafka.connect.runtime.WorkerSourceTask:423)
> > > >
> > > >
> > > > The properties I am using in my worker.properties are below. I have
> > > > tried various combinations of them, but I am not sure how to fix the
> > > > issue. Please advise.
> > > >
> > > > offset.flush.timeout.ms=60000
> > > >
> > > > offset.flush.interval.ms=5000
> > > >
> > > > max.partition.fetch.bytes=2147483647
> > > >
> > > > max.request.size=2147483647
> > > >
> > > > max.message.bytes=2147483647
> > > >
> > > > message.max.bytes=2147483647
> > > >
> > > > replica.fetch.max.bytes=2147483647
> > > >
> > > > producer.security.protocol=SSL
> > > >
> > > > producer.max.request.size=2147483647
> > > >
> > > > connector.client.config.override.policy=All
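One likely source of confusion here: several of the keys above are broker or topic settings, not worker settings, so placing them in worker.properties has no effect. A sketch of where each limit typically has to be raised (the 20 MiB value is purely illustrative):

```
# worker.properties (Kafka Connect worker): raises the Connect
# producer's per-request limit, honored because the override
# policy is set to All
producer.max.request.size=20971520
connector.client.config.override.policy=All

# broker server.properties (NOT worker.properties): broker-wide
# cap on accepted record batch size, and the replica fetch size
# that must be at least as large
message.max.bytes=20971520
replica.fetch.max.bytes=20971520

# per-topic override, applied with the kafka-configs tool rather
# than any properties file:
#   kafka-configs.sh --alter --entity-type topics \
#     --entity-name postgres.public.content_history \
#     --add-config max.message.bytes=20971520
```

If the broker and topic limits stay at their defaults, raising only the producer-side values in worker.properties will still produce MESSAGE_TOO_LARGE, which matches the log above.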
> > > >
> > > > Thanks & Regards
> > > > Divya Jain
> > > >
> > >
> >
>
