This seems related more on kafka connect configuration than the connector 
itself. I guess you'll need to tune the options related to this like:

offset.flush.timeout.ms
offset.flush.interval.ms

--
Andrea Cosentino 
----------------------------------
Apache Camel PMC Chair
Apache Karaf Committer
Apache Servicemix PMC Member
Email: ancosen1...@yahoo.com
Twitter: @oscerd2
Github: oscerd






On Tuesday, January 5, 2021, 12:17:44 AM GMT+1, Arundhati Bhende 
<arundhati.bhe...@prudential.com> wrote: 





aws-s3 connector  -  not aws2-s3.  

On 1/4/21, 5:19 PM, "Andrea Cosentino" <anco...@gmail.com> wrote:

    Is this with aws2-s3 connector or aws2-s3?

    Il lun 4 gen 2021, 23:05 Arundhati Bhende <arundhati.bhe...@prudential.com>
    ha scritto:

    > Hi, I am testing the connector with different cleanup policies for the
    > Topic.
    >
    > If the topic cleanup.policy is set to "delete",  the connector works
    > correctly and I am able to access the message in the topic
    >
    > If the topic cleanup.policy is set to "compact", the connect Task fails
    > with the below error.
    >
    > I am trying to find out why this happens.  Can someone please explain?
    >
    >      trace: org.apache.kafka.connect.errors.ConnectException:
    > OffsetStorageWriter is already flushing
    >          at
    > 
org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:111)
    >          at
    > 
org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:438)
    >          at
    > 
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
    >          at
    > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
    >          at
    > org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    >          at
    > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    >          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    >          at
    > 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    >          at
    > 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    >          at java.lang.Thread.run(Thread.java:748)
    >
    > Thank you
    >
    >
    >

Reply via email to