Thanks. I tried with those options with many combinations, but kept getting same error. Asking this to get better understanding.
So, I used the same connector configuration as below. I created the topic with cleanup.policy=compact and kept getting the error below, so I changed "only" the cleanup policy to "delete" and it worked. Other configuration parameters for the topic were kept exactly the same. So, trying to understand the reason for why the topic must be cleanup.policy = delete. DATA=$( cat << EOF { "connector.class": "org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnector", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.storage.StringConverter", "camel.source.maxPollDuration": "10000", "topics": "TEST-S3-SOURCE-DZONE-POC", "camel.source.path.bucketNameOrArn": " push-json-poc", "camel.component.aws-s3.region": "US_EAST_1", "tasks.max": "1", "camel.source.endpoint.useIAMCredentials": "true", "camel.source.endpoint.autocloseBody": "true" } EOF ) Thanks On 1/5/21, 3:08 AM, "Andrea Cosentino" <ancosen1...@yahoo.com.INVALID> wrote: This seems related more on kafka connect configuration than the connector itself. I guess you'll need to tune the options related to this like: offset.flush.timeout.ms offset.flush.interval.ms -- Andrea Cosentino ---------------------------------- Apache Camel PMC Chair Apache Karaf Committer Apache Servicemix PMC Member Email: ancosen1...@yahoo.com Twitter: @oscerd2 Github: oscerd On Tuesday, January 5, 2021, 12:17:44 AM GMT+1, Arundhati Bhende <arundhati.bhe...@prudential.com> wrote: aws-s3 connector - not aws2-s3. On 1/4/21, 5:19 PM, "Andrea Cosentino" <anco...@gmail.com> wrote: Is this with aws2-s3 connector or aws2-s3? Il lun 4 gen 2021, 23:05 Arundhati Bhende <arundhati.bhe...@prudential.com> ha scritto: > Hi, I am testing the connector with different cleanup policies for the > Topic. > > If the topic cleanup.policy is set to "delete", the connector works > correctly and I am able to access the message in the topic > > If the topic cleanup.policy is set to "compact", the connect Task fails > with the below error. > > I am trying to find out why this happens. Can someone please explain? > > trace: org.apache.kafka.connect.errors.ConnectException: > OffsetStorageWriter is already flushing > at > org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:111) > at > org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:438) > at > org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184) > at > org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > > Thank you > > >