Thank you.  And I do not need to use compact, but it so happened that the topic 
I was given to use for the POC was originally defined as compact and when I was 
getting many different errors which can now be attributed to what I was doing 
with the connector configuration that was wrong and I decided to create a new 
topic, but I used the same script that we had for creating topics, just changed 
the name of the topic.

Out of curiosity, I will try to explore the broker configuration, but otherwise 
my initial POC is done __

Thanks again

On 1/5/21, 11:04 AM, "Andrea Cosentino" <ancosen1...@yahoo.com> wrote:

    The options I reported are related to kafka broker configuration I guess, 
so they should be set at broker level and not in the connector config.

    By the way, without more information on your configuration and why you need 
compaction this is not so much we can do.

    --
    Andrea Cosentino 
    ----------------------------------
    Apache Camel PMC Chair
    Apache Karaf Committer
    Apache Servicemix PMC Member
    Email: ancosen1...@yahoo.com
    Twitter: @oscerd2
    Github: oscerd






    On Tuesday, January 5, 2021, 04:59:56 PM GMT+1, Arundhati Bhende 
<arundhati.bhe...@prudential.com> wrote: 





    Thanks.  I tried with those options with many combinations,  but kept 
getting same error.  Asking this to get better understanding.

    So, I used the same connector configuration as below.    I created the 
topic with cleanup.policy=compact and kept getting the error below, so I 
changed "only" the cleanup policy to "delete" and it worked.  Other 
configuration parameters for the topic were kept exactly the same.    So, 
trying to understand the reason for why the topic must be cleanup.policy = 
delete. 

    DATA=$( cat << EOF
    {
            "connector.class": 
"org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnector",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": 
"org.apache.kafka.connect.storage.StringConverter",
            "camel.source.maxPollDuration": "10000",
            "topics": "TEST-S3-SOURCE-DZONE-POC",
            "camel.source.path.bucketNameOrArn": " push-json-poc",
            "camel.component.aws-s3.region": "US_EAST_1",
            "tasks.max": "1",
            "camel.source.endpoint.useIAMCredentials": "true",
            "camel.source.endpoint.autocloseBody": "true"
    }
    EOF
    )

    Thanks



    On 1/5/21, 3:08 AM, "Andrea Cosentino" <ancosen1...@yahoo.com.INVALID> 
wrote:

        This seems related more on kafka connect configuration than the 
connector itself. I guess you'll need to tune the options related to this like:

        offset.flush.timeout.ms
        offset.flush.interval.ms

        --
        Andrea Cosentino 
        ----------------------------------
        Apache Camel PMC Chair
        Apache Karaf Committer
        Apache Servicemix PMC Member
        Email: ancosen1...@yahoo.com
        Twitter: @oscerd2
        Github: oscerd






        On Tuesday, January 5, 2021, 12:17:44 AM GMT+1, Arundhati Bhende 
<arundhati.bhe...@prudential.com> wrote: 





        aws-s3 connector  -  not aws2-s3.  

        On 1/4/21, 5:19 PM, "Andrea Cosentino" <anco...@gmail.com> wrote:

            Is this with aws2-s3 connector or aws2-s3?

            Il lun 4 gen 2021, 23:05 Arundhati Bhende 
<arundhati.bhe...@prudential.com>
            ha scritto:

            > Hi, I am testing the connector with different cleanup policies 
for the
            > Topic.
            >
            > If the topic cleanup.policy is set to "delete",  the connector 
works
            > correctly and I am able to access the message in the topic
            >
            > If the topic cleanup.policy is set to "compact", the connect Task 
fails
            > with the below error.
            >
            > I am trying to find out why this happens.  Can someone please 
explain?
            >
            >      trace: org.apache.kafka.connect.errors.ConnectException:
            > OffsetStorageWriter is already flushing
            >          at
            > 
org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:111)
            >          at
            > 
org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:438)
            >          at
            > 
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
            >          at
            > 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
            >          at
            > 
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
            >          at
            > 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            >          at 
java.util.concurrent.FutureTask.run(FutureTask.java:266)
            >          at
            > 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            >          at
            > 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            >          at java.lang.Thread.run(Thread.java:748)
            >
            > Thank you
            >
            >
            >



Reply via email to