[ https://issues.apache.org/jira/browse/KAFKA-6133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Randall Hauch closed KAFKA-6133.
--------------------------------

> NullPointerException in S3 Connector when using rotate.interval.ms
> ------------------------------------------------------------------
>
>                 Key: KAFKA-6133
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6133
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.11.0.0
>            Reporter: Elizabeth Bennett
>
> I just tried out the new rotate.interval.ms feature in the S3 connector to do
> time-based flushing. I am getting this NPE on every event:
> {code}
> [2017-10-20 23:21:35,233] ERROR Task foo-to-S3-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask)
> java.lang.NullPointerException
>     at io.confluent.connect.s3.TopicPartitionWriter.rotateOnTime(TopicPartitionWriter.java:288)
>     at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:234)
>     at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:180)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:464)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
> [2017-10-20 23:21:35,233] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask)
> [2017-10-20 23:21:35,233] ERROR Task foo-to-S3-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
> org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:484)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
>
> I dug into the S3 Connect code a bit, and it looks like the
> {{rotate.interval.ms}} feature only works if you are using the
> TimeBasedPartitioner. It gets the TimestampExtractor from the
> TimeBasedPartitioner to determine the timestamp of each event and uses that
> timestamp for time-based flushing.
>
> I'm using a custom partitioner, but I'd still really like to use the
> {{rotate.interval.ms}} feature, with wall-clock time determining the
> flushing behavior.
>
> I'd be willing to work on fixing this issue, but I want to confirm that it is
> actually a bug, and not that the feature was deliberately designed to work
> only with the TimeBasedPartitioner. Even if it is the latter, it should
> probably not crash with an NPE.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
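The failure mode described in the report can be modelled with a minimal, hypothetical sketch. This is not the connector's actual code: `RotationSketch`, its `TimestampExtractor` stand-in, and the wall-clock fallback are all assumptions made for illustration. The unsafe variant dereferences a missing extractor in roughly the way the reported `rotateOnTime` frame appears to; the null-safe variant falls back to wall-clock time, which is the behavior the reporter asks for when using a custom partitioner.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical, simplified model of time-based rotation. It is NOT the real
 * io.confluent.connect.s3.TopicPartitionWriter; it only illustrates why a
 * missing timestamp extractor causes an NPE and one possible fallback.
 */
public class RotationSketch {

    /** Hypothetical stand-in for a per-record timestamp extractor. */
    interface TimestampExtractor {
        long extract(Object value);
    }

    private final TimestampExtractor extractor; // null when the partitioner is not time-based
    private final LongSupplier wallClock;       // injectable clock, e.g. System::currentTimeMillis
    private final long rotateIntervalMs;
    private long baseTimestamp = -1L;           // timestamp that opened the current file; -1 = none yet

    RotationSketch(TimestampExtractor extractor, LongSupplier wallClock, long rotateIntervalMs) {
        this.extractor = extractor;
        this.wallClock = wallClock;
        this.rotateIntervalMs = rotateIntervalMs;
    }

    /** Unsafe variant: dereferences the extractor unconditionally, so it throws an NPE when it is null. */
    boolean rotateOnTimeUnsafe(Object value) {
        long ts = extractor.extract(value);
        return shouldRotate(ts);
    }

    /** Null-safe variant: uses wall-clock time when no extractor is configured. */
    boolean rotateOnTime(Object value) {
        long ts = (extractor != null) ? extractor.extract(value) : wallClock.getAsLong();
        return shouldRotate(ts);
    }

    private boolean shouldRotate(long ts) {
        if (baseTimestamp < 0) {
            baseTimestamp = ts;   // first record opens the window
            return false;
        }
        if (ts - baseTimestamp >= rotateIntervalMs) {
            baseTimestamp = ts;   // assumption: the triggering record opens the next window
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        long[] now = {1_000L};
        RotationSketch writer = new RotationSketch(null, () -> now[0], 60_000L);
        System.out.println(writer.rotateOnTime("a")); // false: first record opens the window
        now[0] += 60_000L;
        System.out.println(writer.rotateOnTime("b")); // true: interval elapsed on the wall clock
        try {
            new RotationSketch(null, () -> now[0], 60_000L).rotateOnTimeUnsafe("c");
        } catch (NullPointerException e) {
            System.out.println("NPE, as in the report");
        }
    }
}
```

Injecting the clock as a `LongSupplier` keeps the wall-clock fallback deterministic in tests. Whether a real fix should fall back to wall-clock time or instead reject the configuration up front is exactly the design question the reporter raises.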