Hi Pere,

My configuration follows. In this test, I want to flush every 1000 records 
and/or every 5 minutes.


{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "s3.region": "us-east-1",
  "topics.dir": "test_topics",
  "flush.size": "1000",
  "schema.compatibility": "NONE",
  "topics": "sinmaj-test",
  "tasks.max": "1",
  "s3.part.size": "5242880",
  "locale": "en",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "task.class": "io.confluent.connect.s3.S3SinkTask",
  "partitioner.class": "io.confluent.connect.storage.partitioner.FieldPartitioner",
  "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
  "name": "s3-test",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "s3.bucket.name": "test-bucket",
  "rotate.schedule.interval.ms": "300000",
  "partition.field.name": "externalTenantId",
  "timestamp.extractor": "Wallclock"
}

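For readability, the thresholds in the configuration above can be converted to friendlier units. This is a minimal Python sketch that uses only values copied from the config; the variable names are illustrative and not part of the connector.

```python
# Values copied from the connector configuration above
# (Kafka Connect configs are passed as strings).
config = {
    "flush.size": "1000",
    "rotate.schedule.interval.ms": "300000",
    "s3.part.size": "5242880",
}

# Convert the raw string values into readable units.
flush_records = int(config["flush.size"])
rotate_minutes = int(config["rotate.schedule.interval.ms"]) / 60_000
part_size_mib = int(config["s3.part.size"]) / (1024 * 1024)

print(f"flush after {flush_records} records")
print(f"scheduled rotation every {rotate_minutes:g} minutes")
print(f"S3 multipart part size: {part_size_mib:g} MiB")
```
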
I get the following error. I am using version 3.3.0.

[2019-02-03 17:59:01,261] ERROR Commit of WorkerSinkTask{id=s3-test-0} offsets threw an unexpected exception: (org.apache.kafka.connect.runtime.WorkerSinkTask:205)
java.lang.NullPointerException
        at io.confluent.connect.s3.S3SinkTask.preCommit(S3SinkTask.java:193)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:305)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:486)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:152)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2019-02-03 17:59:01,262] ERROR Task s3-test-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
java.lang.NullPointerException
        at io.confluent.connect.s3.S3SinkTask.close(S3SinkTask.java:206)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:323)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:486)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:152)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2019-02-03 17:59:01,262] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:149)

Thanks,
-Manu

-----Original Message-----
From: Pere Urbón Bayes <pere.ur...@gmail.com> 
Sent: Sunday, February 03, 2019 12:50 PM
To: users@kafka.apache.org
Subject: Re: Kafka connect FieldPartitioner with scheduled rotation

Hi Manu,
  Can you share your S3 connector config as well as the exception you are 
getting? With only this info, I need more details to understand your issue. 
Keep in mind what the docs say about the option you are using, 
"rotate.schedule.interval.ms":

> This configuration is useful when you have to commit your data based on
> current server time, for example at the beginning of every hour. The default
> value -1 means that this feature is disabled.
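
To illustrate what committing "based on current server time" could look like, here is a small Python sketch of a wall-clock-aligned schedule. Boundaries are counted from midnight, so a 5-minute interval fires at :00, :05, :10 and so on, regardless of when the previous commit happened. This is an illustration only, not the connector's code; the function name `next_scheduled_rotation` and the use of UTC are assumptions.

```python
from datetime import datetime, timedelta, timezone


def next_scheduled_rotation(now: datetime, interval_ms: int) -> datetime:
    """Return the first interval boundary strictly after `now`.

    Boundaries are counted from midnight in the timezone of `now`.
    Hypothetical helper for illustration; not part of any connector API.
    """
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    interval = timedelta(milliseconds=interval_ms)
    # Whole intervals already elapsed since midnight (timedelta // timedelta -> int).
    periods_done = (now - midnight) // interval
    return midnight + (periods_done + 1) * interval


# Timestamp taken from the log in this thread, assumed to be UTC for the example.
now = datetime(2019, 2, 3, 17, 59, 1, tzinfo=timezone.utc)
print(next_scheduled_rotation(now, 300_000))
```

With a 300000 ms interval and the timestamp above, the next boundary falls at 18:00:00 on the same day.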

Cheers

Message from Manu Jacob <manu.ja...@sas.com> on Sun, 3 Feb 2019 at 16:47:

> Hi,
>
> I want to use the S3 connector, partitioning with FieldPartitioner and 
> partition.field.name set to a non-timestamp field. I want to 
> commit and flush based on both size and time. I am getting an 
> exception when I use the option "rotate.schedule.interval.ms". Is it 
> possible to rotate on a schedule with FieldPartitioner? I tried to set 
> timestamp.extractor (with Record and Wallclock), but it looks like it 
> is honored only by the time-based partitioner.
>
> -Manu
>
>
>

--
Pere Urbon-Bayes
Software Architect
http://www.purbon.com
https://twitter.com/purbon
https://www.linkedin.com/in/purbon/
