The options I reported are related to kafka broker configuration I guess, so 
they should be set at broker level and not in the connector config.

By the way, without more information on your configuration and why you need 
compaction this is not so much we can do.

--
Andrea Cosentino 
----------------------------------
Apache Camel PMC Chair
Apache Karaf Committer
Apache Servicemix PMC Member
Email: ancosen1...@yahoo.com
Twitter: @oscerd2
Github: oscerd






On Tuesday, January 5, 2021, 04:59:56 PM GMT+1, Arundhati Bhende 
<arundhati.bhe...@prudential.com> wrote: 





Thanks.  I tried with those options with many combinations,  but kept getting 
same error.  Asking this to get better understanding.

So, I used the same connector configuration as below.    I created the topic 
with cleanup.policy=compact and kept getting the error below, so I changed 
"only" the cleanup policy to "delete" and it worked.  Other configuration 
parameters for the topic were kept exactly the same.    So, trying to 
understand the reason for why the topic must be cleanup.policy = delete. 

DATA=$( cat << EOF
{
        "connector.class": 
"org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "camel.source.maxPollDuration": "10000",
        "topics": "TEST-S3-SOURCE-DZONE-POC",
        "camel.source.path.bucketNameOrArn": " push-json-poc",
        "camel.component.aws-s3.region": "US_EAST_1",
        "tasks.max": "1",
        "camel.source.endpoint.useIAMCredentials": "true",
        "camel.source.endpoint.autocloseBody": "true"
}
EOF
)

Thanks



On 1/5/21, 3:08 AM, "Andrea Cosentino" <ancosen1...@yahoo.com.INVALID> wrote:

    This seems related more on kafka connect configuration than the connector 
itself. I guess you'll need to tune the options related to this like:

    offset.flush.timeout.ms
    offset.flush.interval.ms

    --
    Andrea Cosentino 
    ----------------------------------
    Apache Camel PMC Chair
    Apache Karaf Committer
    Apache Servicemix PMC Member
    Email: ancosen1...@yahoo.com
    Twitter: @oscerd2
    Github: oscerd






    On Tuesday, January 5, 2021, 12:17:44 AM GMT+1, Arundhati Bhende 
<arundhati.bhe...@prudential.com> wrote: 





    aws-s3 connector  -  not aws2-s3.  

    On 1/4/21, 5:19 PM, "Andrea Cosentino" <anco...@gmail.com> wrote:

        Is this with aws2-s3 connector or aws2-s3?

        Il lun 4 gen 2021, 23:05 Arundhati Bhende 
<arundhati.bhe...@prudential.com>
        ha scritto:

        > Hi, I am testing the connector with different cleanup policies for the
        > Topic.
        >
        > If the topic cleanup.policy is set to "delete",  the connector works
        > correctly and I am able to access the message in the topic
        >
        > If the topic cleanup.policy is set to "compact", the connect Task 
fails
        > with the below error.
        >
        > I am trying to find out why this happens.  Can someone please explain?
        >
        >      trace: org.apache.kafka.connect.errors.ConnectException:
        > OffsetStorageWriter is already flushing
        >          at
        > 
org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:111)
        >          at
        > 
org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:438)
        >          at
        > 
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
        >          at
        > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
        >          at
        > org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        >          at
        > 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        >          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        >          at
        > 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        >          at
        > 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        >          at java.lang.Thread.run(Thread.java:748)
        >
        > Thank you
        >
        >
        >


Reply via email to