Hi Pere,

Following is my configuration. In this test, I want to flush every 1000 records or every 5 minutes, whichever comes first.
{"connector.class":"io.confluent.connect.s3.S3SinkConnector","s3.region":"us-east-1","topics.dir":"test_topics","flush.size":"1000","schema.compatibility":"NONE","topics":"sinmaj-test","tasks.max":"1","s3.part.size":"5242880","locale":"en","format.class":"io.confluent.connect.s3.format.json.JsonFormat","task.class":"io.confluent.connect.s3.S3SinkTask","partitioner.class":"io.confluent.connect.storage.partitioner.FieldPartitioner","schema.generator.class":"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator","name":"s3-test","storage.class":"io.confluent.connect.s3.storage.S3Storage","s3.bucket.name":"test-bucket","rotate.schedule.interval.ms":"300000","partition.field.name":"externalTenantId", "timestamp.extractor":"Wallclock"} I get the following error. I am using version 3.3.0 2019-02-03 17:59:01,261] ERROR Commit of WorkerSinkTask{id=s3-test-0} offsets threw an unexpected exception: (org.apache.kafka.connect.runtime.WorkerSinkTask:205) java.lang.NullPointerException at io.confluent.connect.s3.S3SinkTask.preCommit(S3SinkTask.java:193) at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:305) at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:486) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:152) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) [2019-02-03 17:59:01,262] ERROR Task s3-test-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148) java.lang.NullPointerException at 
io.confluent.connect.s3.S3SinkTask.close(S3SinkTask.java:206) at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:323) at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:486) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:152) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) [2019-02-03 17:59:01,262] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:149) Thanks, -Manu -----Original Message----- From: Pere Urbón Bayes <pere.ur...@gmail.com> Sent: Sunday, February 03, 2019 12:50 PM To: users@kafka.apache.org Subject: Re: Kafka connect FieldPartitioner with scheduled rotation EXTERNAL Hi Manu, can you share your s3 connector config as well the exception you are getting? with only this info, I do need more details to understand your issue. Keep in mind the option you are using "rotate.schedule.interval.ms" from the docs says: > This configuration is useful when you have to commit your data based > on current server time, for example at the beginning of every hour. The default value -1 means that this feature is disabled. Cheers Missatge de Manu Jacob <manu.ja...@sas.com> del dia dg., 3 de febr. 2019 a les 16:47: > Hi, > > I want to use s3 connect by portioning with FieldPartitioner and > partition.field.name set to a non timestamp based field. I want to > commit and flush based on both size and time. 
> I am getting an exception when I use the option
> "rotate.schedule.interval.ms". Is it possible to rotate with
> FieldPartitioner? I tried to set the timestamp.extractor (with Record
> and Wallclock), but it looks like it is honored only for the
> time-based partitioner.
>
> -Manu

--
Pere Urbon-Bayes
Software Architect
http://www.purbon.com
https://twitter.com/purbon
https://www.linkedin.com/in/purbon/
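[Editor's note: for comparison with the failing config above, this is a minimal sketch of how scheduled rotation is typically paired with a time-based partitioner in the Confluent S3 sink. It is an illustration only, not a verified fix for the NullPointerException on version 3.3.0; the bucket, topic, and name values are placeholders, and the TimeBasedPartitioner-specific keys (path.format, partition.duration.ms, locale, timezone) are assumed from the storage partitioner docs.]

```json
{
  "name": "s3-test-timebased",
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "topics": "example-topic",
  "s3.bucket.name": "example-bucket",
  "s3.region": "us-east-1",
  "tasks.max": "1",

  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
  "partition.duration.ms": "3600000",
  "locale": "en",
  "timezone": "UTC",
  "timestamp.extractor": "Wallclock",

  "flush.size": "1000",
  "rotate.schedule.interval.ms": "300000"
}
```

With this shape, flush.size covers the size-based commit and rotate.schedule.interval.ms covers the time-based commit, which is the combination the question asks about, though only with a time-based partitioner rather than FieldPartitioner.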