[jira] [Created] (KAFKA-10376) TimeWindowedDeserializer does not deserialize the end timestamp of time window aggregate message keys.
B R created KAFKA-10376:
---
Summary: TimeWindowedDeserializer does not deserialize the end timestamp of time window aggregate message keys.
Key: KAFKA-10376
URL: https://issues.apache.org/jira/browse/KAFKA-10376
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.6.0
Reporter: B R

When a _Consumer_ is consuming messages containing Time Windowed aggregates, it has no way of obtaining the _end_ timestamps of the window contained in the key. Instead, it must either specify the window size (which it likely does not know) or receive end timestamps set to Long.MAX_VALUE, which renders the window essentially unbounded. An objective Consumer should not be expected to know the window size of the aggregate messages it is consuming. This is especially true if it is consuming several different topics with windows of varying size. This issue is peripherally related to KAFKA-4468, KAFKA-9649 and KAFKA-10366.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
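To make the problem concrete, here is a minimal sketch of why the end timestamp cannot be recovered from the key alone. It assumes the time-windowed key layout written by Kafka Streams' `TimeWindowedSerializer`: the serialized inner key followed by the 8-byte window *start* timestamp, with no end timestamp stored. The class and method names (`WindowedKeySketch`, `encode`, `decode`, `DecodedKey`) are hypothetical and are not the Kafka API. The only way this decoder can produce an end timestamp is to add a window size that the caller must supply.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch, not Kafka's implementation. It assumes a time-windowed key is
// serialized as [inner key bytes][8-byte big-endian window start timestamp].
public final class WindowedKeySketch {

    /** Decoded view of a windowed key. */
    public static final class DecodedKey {
        public final String key;
        public final long start;
        public final long end;

        DecodedKey(String key, long start, long end) {
            this.key = key;
            this.start = start;
            this.end = end;
        }
    }

    /** Serializes a String key plus the window start. The end is never written. */
    public static byte[] encode(String key, long windowStart) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(keyBytes.length + Long.BYTES)
                .put(keyBytes)
                .putLong(windowStart)
                .array();
    }

    /**
     * Decodes a windowed key. The end is derived from a window size the caller must
     * already know; passing Long.MAX_VALUE ("size unknown") makes start + size overflow,
     * and the end is clamped to Long.MAX_VALUE, i.e. an effectively unbounded window.
     */
    public static DecodedKey decode(byte[] data, long windowSize) {
        if (data.length < Long.BYTES) {
            throw new IllegalArgumentException("too short for a windowed key");
        }
        ByteBuffer buf = ByteBuffer.wrap(data);
        byte[] keyBytes = new byte[data.length - Long.BYTES];
        buf.get(keyBytes);
        long start = buf.getLong();
        long end = start + windowSize;
        if (end < start) {
            end = Long.MAX_VALUE; // overflow: treat as unbounded
        }
        return new DecodedKey(new String(keyBytes, StandardCharsets.UTF_8), start, end);
    }

    public static void main(String[] args) {
        byte[] bytes = encode("user-42", 1_000L);
        DecodedKey known = decode(bytes, 60_000L);        // consumer knows the window size
        DecodedKey unknown = decode(bytes, Long.MAX_VALUE); // consumer does not
        System.out.println(known.key + " [" + known.start + ", " + known.end + ")");
        System.out.println(unknown.key + " [" + unknown.start + ", " + unknown.end + ")");
    }
}
```

The sketch makes the reporter's point visible: nothing in the bytes says whether the window was 1 minute or 1 hour, so any "end" is an assumption layered on top by the consumer.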
[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext
[ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ning Zhang updated KAFKA-10370:
---
Description: In [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java], when we want the consumer to consume from certain offsets rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java#L63-L66] provides a way for external code (e.g. a SinkTask implementation) to supply the offsets used to rewind the consumer. In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first calls [rewind()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L615-L633] to (1) read the offsets from WorkerSinkTaskContext and, if they are not empty, (2) call consumer.seek(tp, offset) to rewind the consumer. As part of [WorkerSinkTask initialization|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L290-L307], when the [SinkTask starts|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L83-L88], we can supply specific offsets by calling +"context.offset(supplied_offsets);+" in the start() method, so that on the consumer's first poll, rewind() should move it to those offsets.
However, in practice we saw the following IllegalStateException when running consumer.seek(tp, offsets):
{code:java}
[2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
[2020-08-07 23:53:55,752] INFO [Consumer clientId=connector-consumer-MirrorSinkConnector-0, groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 (org.apache.kafka.clients.consumer.KafkaConsumer:1592)
[2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187)
java.lang.IllegalStateException: No current assignment for partition test-1
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385)
	at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
[2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:188)
{code}
As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution proposed in the attached PR (which has been initially verified) is to use *consumer.assign* with *consumer.seek*, instead of *consumer.subscribe*, to set the consumer's initial position when specific offsets are supplied externally through WorkerSinkTaskContext.

was: In [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java], when we want the consumer to consume from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java#L63-L66] provided a way to supply the offsets from external (e.g. implementation of SinkTask) to rewind the consumer. In the [poll()
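The failure above comes from `KafkaConsumer.seek` rejecting a partition that is not (yet) assigned to the consumer, which is what happens when offsets requested in `start()` are applied on the first poll before the group assignment has completed. The self-contained model below illustrates one way to respect that invariant. It is a hypothetical sketch, not the Connect runtime's code and not the fix in the attached PR: `PendingRewinds`, `request`, and `drainFor` are invented names, and partitions are plain strings such as `test-1`. Requested offsets are only released as seeks for partitions in the current assignment; the rest stay pending.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical model of offsets requested via context.offset(...), applied only to
// partitions the consumer currently owns. Not the WorkerSinkTask implementation.
public final class PendingRewinds {
    private final Map<String, Long> pending = new HashMap<>();

    /** Records a requested offset, e.g. one supplied from SinkTask.start(). */
    public void request(String partition, long offset) {
        pending.put(partition, offset);
    }

    /**
     * Returns the seeks that are safe for the given assignment and removes them from
     * the pending set. Unassigned partitions stay pending instead of triggering
     * "No current assignment for partition".
     */
    public Map<String, Long> drainFor(Set<String> assigned) {
        Map<String, Long> seeks = new HashMap<>();
        Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (assigned.contains(e.getKey())) {
                seeks.put(e.getKey(), e.getValue());
                it.remove();
            }
        }
        return seeks;
    }

    public boolean hasPending() {
        return !pending.isEmpty();
    }

    public static void main(String[] args) {
        PendingRewinds rewinds = new PendingRewinds();
        rewinds.request("test-1", 3L);
        // First poll before the group assignment: nothing is safe to seek yet.
        System.out.println(rewinds.drainFor(Set.of()));
        // Once test-1 is assigned, the deferred seek is released.
        System.out.println(rewinds.drainFor(Set.of("test-1")));
    }
}
```

In a real consumer, a natural point to release such deferred seeks would be after assignment, for example from `ConsumerRebalanceListener.onPartitionsAssigned`; that placement is an assumption for illustration, whereas the ticket itself proposes switching to `assign` plus `seek`.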
[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext
[ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ning Zhang updated KAFKA-10370:
---
Description: In [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java], when we want the consumer to consume from certain offsets rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java#L63-L66] provides a way for external code (e.g. a SinkTask implementation) to supply the offsets used to rewind the consumer. In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first calls [rewind()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L615-L633] to (1) read the offsets from WorkerSinkTaskContext and, if they are not empty, (2) call consumer.seek(tp, offset) to rewind the consumer. As part of [WorkerSinkTask initialization|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L290-L307], when the [SinkTask starts|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L83-L88], we can supply specific offsets by calling +"context.offset(supplied_offsets);+" in the start() method, so that on the consumer's first poll, rewind() should move it to those offsets.
However, in practice we saw the following IllegalStateException when running consumer.seek(tp, offsets):
{code:java}
[2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
[2020-08-07 23:53:55,752] INFO [Consumer clientId=connector-consumer-MirrorSinkConnector-0, groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 (org.apache.kafka.clients.consumer.KafkaConsumer:1592)
[2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187)
java.lang.IllegalStateException: No current assignment for partition test-1
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385)
	at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
[2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:188)
{code}
As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution that has been initially verified is to use *consumer.assign* with *consumer.seek*, instead of *consumer.subscribe*, in this case.

was: In WorkerSinkTask.java, when we want the consumer to consume from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] provided a way to supply the offsets from external (e.g. implementation of SinkTask) to rewind consumer. In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first call "rewind()" to (1) read the offsets from WorkerSinkTaskContext, if the offsets are not empty, (2) consumer.seek(tp, offset) to rewind the consumer. when SinkTask first initializes
[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext
[ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ning Zhang updated KAFKA-10370:
---
Description: In WorkerSinkTask.java, when we want the consumer to consume from certain offsets rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] provides a way for external code (e.g. a SinkTask implementation) to supply offsets to rewind the consumer. In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext and, if they are not empty, (2) call consumer.seek(tp, offset) to rewind the consumer. When the SinkTask first initializes (+start(Map props)+), we call +"context.offset(offsets);+", and then in step (2) above we saw the following IllegalStateException:
{code:java}
[2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
[2020-08-07 23:53:55,752] INFO [Consumer clientId=connector-consumer-MirrorSinkConnector-0, groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 (org.apache.kafka.clients.consumer.KafkaConsumer:1592)
[2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187)
java.lang.IllegalStateException: No current assignment for partition test-1
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385)
	at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
[2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:188)
{code}
As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution that has been initially verified is to use *consumer.assign* with *consumer.seek*, instead of *consumer.subscribe*, in this case.

was: In WorkerSinkTask.java, when we want the consumer to consume from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] provided a way to supply the offsets from external (e.g. implementation of SinkTask) to rewind customer.
In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first call "rewind()" to (1) read the offsets from WorkerSinkTaskContext, if the offsets are not empty, (2) consumer.seek(tp, offset) to rewind the consumer. when SinkTask first initializes (+start(Map props)+), we do +"context.offset(offsets);+" , then in above step (2), we saw the following IllegalStateException:
{code:java}
[2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
[2020-08-07 23:53:55,752] INFO [Consumer clientId=connector-consumer-MirrorSinkConnector-0, groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 (org.apache.kafka.clients.consumer.KafkaConsumer:1592)
[2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187)
java.lang.IllegalStateException: No current assignment for partition
[jira] [Commented] (KAFKA-10009) Add method for getting last record offset in kafka partition
[ https://issues.apache.org/jira/browse/KAFKA-10009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17173671#comment-17173671 ]

Werner Daehn commented on KAFKA-10009:
--

What would be needed is either:
* endOffsets() returns the last existing offset. This is a bit dangerous, as it might return offset 100, then a log compaction happens, and then we start reading.
* poll() indicates that it has retrieved the last record. That should be doable. Then we can call poll(100ms) in a loop until it tells us, via another getter, that there is no more data. If I am interested in the current records only, I stop polling at that point; all others simply continue calling poll(). And best of all: no side effects, and backward compatibility.

The one thing I don't know is whether poll even has a chance to get that information yet, or whether the broker must be changed as well.

I have seen many similar questions and no real solution, so this is a popular request.

> Add method for getting last record offset in kafka partition
>
>
> Key: KAFKA-10009
> URL: https://issues.apache.org/jira/browse/KAFKA-10009
> Project: Kafka
> Issue Type: New Feature
> Components: clients, consumer
> Reporter: Yuriy Badalyantc
> Priority: Major
>
> As far as I understand, at the current moment, there is no reliable way for
> getting offset of the last record in the partition using java client. There
> is {{endOffsets}} method in the consumer. And usually {{endOffsets - 1}}
> works fine. But in the case of transactional producer, topic may contain
> offsets without a record. And {{endOffsets - 1}} will point to the offset
> without record.
> This feature will help in situations when consumer application wants to
> consume the whole topic. Checking of beginning and last record offset will
> give lower and upper bounds for consuming. Of course, it is doable with the
> current consumer implementation, but I need to check {{position}} after each
> poll.
> Also, I believe that this feature may help with monitoring and operations.
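The reporter's point about `endOffsets - 1` can be shown with a toy partition. This is a simulation under stated assumptions, not consumer API code: the hypothetical `recordOffsets` array lists the offsets that hold data records, and a missing offset stands in for one occupied by a transaction control marker (or removed by compaction), which a consumer never receives as a record. The class and method names are invented for illustration.

```java
import java.util.Arrays;

// Toy model of one partition: which offsets hold data records, plus the end offset
// (one past the last offset written). Gaps model control markers or compaction.
public final class LastRecordOffsetSketch {

    /** The common guess: assumes a data record sits at endOffset - 1. */
    public static long naiveLastRecordOffset(long endOffset) {
        return endOffset - 1;
    }

    /** The real last data-record offset, or -1 if the partition holds no records. */
    public static long actualLastRecordOffset(long[] recordOffsets) {
        return Arrays.stream(recordOffsets).max().orElse(-1L);
    }

    public static void main(String[] args) {
        // Offsets 0-2 are records from one transaction; offset 3 is its commit marker,
        // so the end offset is 4 even though no record exists at offset 3.
        long[] recordOffsets = {0, 1, 2};
        long endOffset = 4;
        System.out.println("naive:  " + naiveLastRecordOffset(endOffset));
        System.out.println("actual: " + actualLastRecordOffset(recordOffsets));
    }
}
```

A consumer that waits for a record at the "naive" offset here would wait indefinitely, which is the gap the feature request is about.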
[jira] [Updated] (KAFKA-10009) Add method for getting last record offset in kafka partition
[ https://issues.apache.org/jira/browse/KAFKA-10009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Werner Daehn updated KAFKA-10009:
-
Priority: Major (was: Minor)

> Add method for getting last record offset in kafka partition
>
>
> Key: KAFKA-10009
> URL: https://issues.apache.org/jira/browse/KAFKA-10009
> Project: Kafka
> Issue Type: New Feature
> Components: clients, consumer
> Reporter: Yuriy Badalyantc
> Priority: Major
>
> As far as I understand, at the current moment, there is no reliable way for
> getting offset of the last record in the partition using java client. There
> is {{endOffsets}} method in the consumer. And usually {{endOffsets - 1}}
> works fine. But in the case of transactional producer, topic may contain
> offsets without a record. And {{endOffsets - 1}} will point to the offset
> without record.
> This feature will help in situations when consumer application wants to
> consume the whole topic. Checking of beginning and last record offset will
> give lower and upper bounds for consuming. Of course, it is doable with the
> current consumer implementation, but I need to check {{position}} after each
> poll.
> Also, I believe that this feature may help with monitoring and operations.
[jira] [Commented] (KAFKA-10009) Add method for getting last record offset in kafka partition
[ https://issues.apache.org/jira/browse/KAFKA-10009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17173661#comment-17173661 ]

Werner Daehn commented on KAFKA-10009:
--

It is actually worse than that. Log compaction or other causes can mean that the offsets are not a dense sequence of numbers. Without such functionality, how do you reliably know that you have read all data from the topic?
* Option 1: Execute a poll(1 second), and if it returns no data, conclude that there is no more data. But maybe the network was busy, so 1 second is not enough. 10 seconds? 1 minute? 10 minutes? I don't want to wait ten minutes just to decrease the probability that there is more data I have not received yet.
* Option 2: First call endOffsets(); then you know the high water mark is offset=100. So you poll until you receive the record with offset=99, and then you know you have the last record. But what if there is no record with offset=99? Again, you will wait forever.

> Add method for getting last record offset in kafka partition
>
>
> Key: KAFKA-10009
> URL: https://issues.apache.org/jira/browse/KAFKA-10009
> Project: Kafka
> Issue Type: New Feature
> Components: clients, consumer
> Reporter: Yuriy Badalyantc
> Priority: Minor
>
> As far as I understand, at the current moment, there is no reliable way for
> getting offset of the last record in the partition using java client. There
> is {{endOffsets}} method in the consumer. And usually {{endOffsets - 1}}
> works fine. But in the case of transactional producer, topic may contain
> offsets without a record. And {{endOffsets - 1}} will point to the offset
> without record.
> This feature will help in situations when consumer application wants to
> consume the whole topic. Checking of beginning and last record offset will
> give lower and upper bounds for consuming. Of course, it is doable with the
> current consumer implementation, but I need to check {{position}} after each
> poll.
> Also, I believe that this feature may help with monitoring and operations.
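Regarding Option 2: the workaround the issue description alludes to ("I need to check {{position}} after each poll") compares the consumer's *position*, not the offset of the last record seen, against a snapshot of the end offset taken up front. The sketch below models that loop on a simulated partition. `SimulatedPartition`, `poll`, `position`, and `readToEnd` are hypothetical stand-ins for `KafkaConsumer.poll`/`position` on a single partition, and the model assumes that the position advances past offsets that yield no records (markers, compacted gaps).

```java
import java.util.ArrayList;
import java.util.List;

// Simulated partition: data records at some offsets, gaps elsewhere (control markers,
// compacted-away offsets). A hypothetical stand-in for a consumer on one partition.
public final class ReadToEndSketch {

    public static final class SimulatedPartition {
        private final boolean[] hasRecord; // hasRecord[o] is true if offset o holds a data record
        private long position;             // next offset to fetch

        public SimulatedPartition(boolean[] hasRecord) {
            this.hasRecord = hasRecord.clone();
        }

        public long endOffset() {
            return hasRecord.length;
        }

        public long position() {
            return position;
        }

        /** Returns up to maxRecords record offsets; gaps are skipped but still advance the position. */
        public List<Long> poll(int maxRecords) {
            List<Long> batch = new ArrayList<>();
            while (position < hasRecord.length && batch.size() < maxRecords) {
                if (hasRecord[(int) position]) {
                    batch.add(position);
                }
                position++;
            }
            return batch;
        }
    }

    /** Reads until the position reaches the end offset captured before the loop. */
    public static List<Long> readToEnd(SimulatedPartition partition) {
        long end = partition.endOffset(); // snapshot, analogous to endOffsets(...)
        List<Long> seen = new ArrayList<>();
        while (partition.position() < end) {
            seen.addAll(partition.poll(2));
        }
        return seen;
    }

    public static void main(String[] args) {
        // Offsets 0..4 are records and offset 5 is a marker, so the end offset is 6:
        // waiting for a record at offset 5 (end - 1) would never finish, but this loop does.
        boolean[] log = {true, true, true, true, true, false};
        System.out.println(readToEnd(new SimulatedPartition(log)));
    }
}
```

The design choice is that the stop condition depends only on the position and a fixed end snapshot, so it terminates regardless of where gaps fall. Records appended after the snapshot are intentionally not waited for.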
[jira] [Updated] (KAFKA-10373) Kafka Reassign Partition is stuck with Java OutOfMemory error
[ https://issues.apache.org/jira/browse/KAFKA-10373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Randall Hauch updated KAFKA-10373:
--
Component/s: (was: KafkaConnect)
             core

> Kafka Reassign Partition is stuck with Java OutOfMemory error
> -
>
> Key: KAFKA-10373
> URL: https://issues.apache.org/jira/browse/KAFKA-10373
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 2.2.1
> Reporter: azher khan
> Priority: Major
>
> Hi Team,
> While trying to run the Kafka script to reassign partitions of an existing
> topic, we are seeing a Java OutOfMemory issue.
>
> The heap for Kafka is set to "-Xmx1G -Xms1G" on the kafka broker.
>
> {code:java}
> /opt/kafka/bin/kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 --reassignment-json-file topic_kafka_topic1_reassignment.json --bootstrap-server kafkabroker1:9092 --verify
> Status of partition reassignment:
> [2020-08-07 XX:XX:XX,] ERROR Uncaught exception in thread 'kafka-admin-client-thread | reassign-partitions-tool': (org.apache.kafka.common.utils.KafkaThread)
> java.lang.OutOfMemoryError: Java heap space
> at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
> at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
> at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
> at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
> at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:335)
> at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:296)
> at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:560)
> at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:496)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
> at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1116)
> at java.lang.Thread.run(Thread.java:748)
> Reassignment of partition kafka_topic1-0 is still in progress
> Reassignment of partition kafka_topic1-1 is still in progress
> Reassignment of partition kafka_topic1-2 is still in progress{code}
>
> Retried the above command after removing the "reassign_partitions" from
> zookeeper as suggested, but we are seeing the same error.
>
>
> {code:java}
> [zk: localhost:2181(CONNECTED) 5] delete /admin/reassign_partitions
> [zk: localhost:2181(CONNECTED) 7] ls /admin
> [delete_topics]
> {code}
>
> Would highly appreciate your advice,
> Thank you in advance,
>
> Regards,
> Azher Khan
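The trace shows the admin client thread running out of heap inside `NetworkReceive.readFrom` while calling `ByteBuffer.allocate`. Kafka's wire protocol frames every response with a 4-byte big-endian size prefix, and the receiving side allocates a buffer of that size before reading the body, so an unexpected or garbage prefix turns directly into a very large allocation. The sketch below only illustrates that arithmetic. It is not Kafka code, and the example bytes (the start of a TLS alert record, as a plaintext client might see when talking to a TLS listener) are a hypothetical scenario that the ticket does not confirm.

```java
import java.nio.ByteBuffer;

// Illustration only: how a 4-byte big-endian size prefix becomes an allocation size.
public final class FrameSizeSketch {

    /** Reads a size prefix the way a length-delimited protocol would. */
    public static int frameSize(byte[] firstFourBytes) {
        return ByteBuffer.wrap(firstFourBytes).getInt(); // ByteBuffer is big-endian by default
    }

    public static void main(String[] args) {
        // Hypothetical input: first bytes of a TLS alert record (type 0x15, version 0x0303).
        byte[] tlsRecordStart = {0x15, 0x03, 0x03, 0x00};
        int size = frameSize(tlsRecordStart);
        System.out.printf("would allocate %d bytes (~%d MiB)%n", size, size / (1024 * 1024));
    }
}
```

Note also that `kafka-reassign-partitions.sh` runs in its own JVM, so the broker's "-Xmx1G -Xms1G" setting does not govern the heap available to this tool.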
[jira] [Updated] (KAFKA-10375) Restore consumer fails with SSL handshake fail exception
[ https://issues.apache.org/jira/browse/KAFKA-10375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Levani Kokhreidze updated KAFKA-10375:
--
Attachment: stacktrace.txt

> Restore consumer fails with SSL handshake fail exception
>
>
> Key: KAFKA-10375
> URL: https://issues.apache.org/jira/browse/KAFKA-10375
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.6.0
> Reporter: Levani Kokhreidze
> Priority: Major
> Attachments: stacktrace.txt
>
>
> After upgrading to 2.6, we started getting "SSL handshake fail" exceptions.
> The curious thing is that it seems to affect only restore consumers. For mTLS,
> we use dynamic certificates that are being reloaded automatically every X
> minutes.
> We didn't have any issues with it up until upgrading to 2.6, and other stream
> processing jobs running Kafka 2.4 don't have similar problems.
> After restarting the Kafka Streams instance, the issue goes away.
>
> From the stacktrace, it's visible that the problem is:
> {code:java}
> Aug 07 10:36:12.478 | Caused by: java.security.cert.CertificateExpiredException: NotAfter: Fri Aug 07 07:45:16 GMT 2020
> {code}
> It seems that the restore consumer somehow gets stuck with the old certificate,
> which is not refreshed.
[jira] [Updated] (KAFKA-10375) Restore consumer fails with SSL handshake fail exception
[ https://issues.apache.org/jira/browse/KAFKA-10375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Levani Kokhreidze updated KAFKA-10375:
--
Attachment: (was: stacktrace.txt)

> Restore consumer fails with SSL handshake fail exception
>
>
> Key: KAFKA-10375
> URL: https://issues.apache.org/jira/browse/KAFKA-10375
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.6.0
> Reporter: Levani Kokhreidze
> Priority: Major
>
> After upgrading to 2.6, we started getting "SSL handshake fail" exceptions.
> The curious thing is that it seems to affect only restore consumers. For mTLS,
> we use dynamic certificates that are being reloaded automatically every X
> minutes.
> We didn't have any issues with it up until upgrading to 2.6, and other stream
> processing jobs running Kafka 2.4 don't have similar problems.
> After restarting the Kafka Streams instance, the issue goes away.
>
> From the stacktrace, it's visible that the problem is:
> {code:java}
> Aug 07 10:36:12.478 | Caused by: java.security.cert.CertificateExpiredException: NotAfter: Fri Aug 07 07:45:16 GMT 2020
> {code}
> It seems that the restore consumer somehow gets stuck with the old certificate,
> which is not refreshed.
[jira] [Created] (KAFKA-10375) Restore consumer fails with SSL handshake fail exception
Levani Kokhreidze created KAFKA-10375:
-
Summary: Restore consumer fails with SSL handshake fail exception
Key: KAFKA-10375
URL: https://issues.apache.org/jira/browse/KAFKA-10375
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.6.0
Reporter: Levani Kokhreidze
Attachments: stacktrace.txt

After upgrading to 2.6, we started getting "SSL handshake fail" exceptions. The curious thing is that it seems to affect only restore consumers. For mTLS, we use dynamic certificates that are being reloaded automatically every X minutes. We didn't have any issues with it up until upgrading to 2.6, and other stream processing jobs running Kafka 2.4 don't have similar problems. After restarting the Kafka Streams instance, the issue goes away.

From the stacktrace, it's visible that the problem is:
{code:java}
Aug 07 10:36:12.478 | Caused by: java.security.cert.CertificateExpiredException: NotAfter: Fri Aug 07 07:45:16 GMT 2020
{code}
It seems that the restore consumer somehow gets stuck with the old certificate, which is not refreshed.
[jira] [Commented] (KAFKA-9659) Kafka Streams / Consumer configured for static membership fails on "fatal exception: group.instance.id gets fenced"
[ https://issues.apache.org/jira/browse/KAFKA-9659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17173636#comment-17173636 ]

Levani Kokhreidze commented on KAFKA-9659:
--

Hi [~guozhang], just wanted to let you know that after upgrading to 2.6, we no longer see this issue.

> Kafka Streams / Consumer configured for static membership fails on "fatal
> exception: group.instance.id gets fenced"
> ---
>
> Key: KAFKA-9659
> URL: https://issues.apache.org/jira/browse/KAFKA-9659
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.5.0
> Reporter: Rohan Desai
> Assignee: Guozhang Wang
> Priority: Major
> Attachments: ksql-1.logs
>
>
> I'm running a KSQL query, which underneath is built into a Kafka Streams
> application. The application has been running without issue for a few days,
> until today, when all the streams threads exited with:
>
>
> {{[ERROR] 2020-03-05 00:57:58,776 [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5] Received fatal exception: group.instance.id gets fenced}}
> {{[ERROR] 2020-03-05 00:57:58,776 [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator onFailure - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5] Caught fenced group.instance.id Optional[ksql-1-2] error in heartbeat thread}}
> {{[ERROR] 2020-03-05 00:57:58,776 [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] org.apache.kafka.streams.processor.internals.StreamThread run - stream-thread [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:}}
> {{ org.apache.kafka.common.errors.FencedInstanceIdException: The broker rejected this static consumer since another consumer with the same group.instance.id has registered with a different member.id.}}
> {{[INFO] 2020-03-05 00:57:58,776 [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] org.apache.kafka.streams.processor.internals.StreamThread setState - stream-thread [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] State transition from RUNNING to PENDING_SHUTDOWN}}
>
> I've attached the KSQL and Kafka Streams logs to this ticket.
> Here's a summary for one of the streams threads (instance id `ksql-1-2`):
>
> Around 00:56:36 the coordinator fails over from b11 to b2:
>
> {{[INFO] 2020-03-05 00:56:36,258 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Attempt to heartbeat failed since coordinator b11-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483636 rack: null) is either not started or not valid.}}
> {{ [INFO] 2020-03-05 00:56:36,258 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator markCoordinatorUnknown - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Group coordinator b11-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483636 rack: null) is unavailable or invalid, will attempt rediscovery}}
> {{ [INFO] 2020-03-05 00:56:36,270
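The fencing rule quoted in the `FencedInstanceIdException` message ("another consumer with the same group.instance.id has registered with a different member.id") can be modeled in a few lines. This is a toy model for intuition only, not the group coordinator's implementation: `StaticMembershipModel`, `join`, and `heartbeat` are hypothetical names, and member ids are generated locally rather than by a broker.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of static membership: each group.instance.id maps to the member.id of its
// most recent join. A request carrying an older member.id for that instance is fenced.
public final class StaticMembershipModel {
    private final Map<String, String> memberIdByInstanceId = new HashMap<>();
    private int nextId = 0;

    /** A (re)join under a static instance id replaces any previously registered member.id. */
    public String join(String groupInstanceId) {
        String memberId = groupInstanceId + "-member-" + (nextId++);
        memberIdByInstanceId.put(groupInstanceId, memberId);
        return memberId;
    }

    /** Throws if another member has since registered under the same instance id. */
    public void heartbeat(String groupInstanceId, String memberId) {
        String current = memberIdByInstanceId.get(groupInstanceId);
        if (current != null && !current.equals(memberId)) {
            throw new IllegalStateException("fenced: " + groupInstanceId
                    + " is now registered as " + current + ", not " + memberId);
        }
    }

    public static void main(String[] args) {
        StaticMembershipModel coordinator = new StaticMembershipModel();
        String first = coordinator.join("ksql-1-2");
        String second = coordinator.join("ksql-1-2"); // a second join under the same instance id
        coordinator.heartbeat("ksql-1-2", second);     // accepted
        try {
            coordinator.heartbeat("ksql-1-2", first);  // older member.id: fenced
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model, whichever client still holds the older member.id is the one that gets fenced, which matches the symptom in the logs above (a fatal fencing error reported by the heartbeat thread of `ksql-1-2`).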
[GitHub] [kafka] jeqo commented on pull request #9137: KAFKA-9929: Support reverse iterator on KeyValueStore
jeqo commented on pull request #9137: URL: https://github.com/apache/kafka/pull/9137#issuecomment-670850075 cc @ableegoldman This PR is ready for review, covering related feedback from #8976 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-10352) Error while reading checkpoint file /tmp/kafka-logs/cleaner-offset-checkpoint (kafka.server.LogDirFailureChannel)
[ https://issues.apache.org/jira/browse/KAFKA-10352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Seongbae Chang resolved KAFKA-10352. Resolution: Resolved > Error while reading checkpoint file /tmp/kafka-logs/cleaner-offset-checkpoint > (kafka.server.LogDirFailureChannel) > - > > Key: KAFKA-10352 > URL: https://issues.apache.org/jira/browse/KAFKA-10352 > Project: Kafka > Issue Type: Bug > Components: log cleaner >Reporter: Seongbae Chang >Priority: Critical > > One of my Kafka brokers (3 in total, version 2.5.0) shut down suddenly. > Then the other brokers also shut down for similar reasons. > > The main cause of this problem is '*Error while reading checkpoint file > /tmp/kafka-logs/cleaner-offset-checkpoint (kafka.server.LogDirFailureChannel)* > *java.nio.file.NoSuchFileException: > /tmp/kafka-logs/cleaner-offset-checkpoint*' > > I don't know why this error occurs or how to solve it. Please give me > some answers or comments about it. Thank you. > I have attached the contents of log files such as kafkaServer.out and > log-cleaner.log > > kafkaServer.out > {code:java} > [2020-07-30 19:49:05,992] INFO [GroupMetadataManager brokerId=3] Removed 0 > expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)[2020-07-30 19:49:05,992] INFO > [GroupMetadataManager brokerId=3] Removed 0 expired offsets in 0 > milliseconds. 
(kafka.coordinator.group.GroupMetadataManager)[2020-07-30 > 19:56:48,080] ERROR Error while reading checkpoint file > /tmp/kafka-logs/cleaner-offset-checkpoint > (kafka.server.LogDirFailureChannel)java.nio.file.NoSuchFileException: > /tmp/kafka-logs/cleaner-offset-checkpoint at > sun.nio.fs.UnixException.translateToIOException(UnixException.java:86) at > sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) at > sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) at > sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214) > at java.nio.file.Files.newByteChannel(Files.java:361) at > java.nio.file.Files.newByteChannel(Files.java:407) at > java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384) > at java.nio.file.Files.newInputStream(Files.java:152) at > java.nio.file.Files.newBufferedReader(Files.java:2784) at > java.nio.file.Files.newBufferedReader(Files.java:2816) at > kafka.server.checkpoints.CheckpointFile.liftedTree2$1(CheckpointFile.scala:87) > at kafka.server.checkpoints.CheckpointFile.read(CheckpointFile.scala:86) at > kafka.server.checkpoints.OffsetCheckpointFile.read(OffsetCheckpointFile.scala:61) > at > kafka.log.LogCleanerManager.$anonfun$allCleanerCheckpoints$2(LogCleanerManager.scala:134) > at scala.collection.Iterator$$anon$10.nextCur(Iterator.scala:583) at > scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:597) at > scala.collection.mutable.ListBuffer.addAll(ListBuffer.scala:118) at > scala.collection.mutable.ListBuffer$.from(ListBuffer.scala:38) at > scala.collection.immutable.List$.from(List.scala:617) at > scala.collection.immutable.List$.from(List.scala:611) at > scala.collection.IterableFactory$Delegate.from(Factory.scala:288) at > scala.collection.immutable.Iterable$.from(Iterable.scala:35) at > scala.collection.immutable.Iterable$.from(Iterable.scala:32) at > scala.collection.IterableFactory$Delegate.from(Factory.scala:288) at > 
scala.collection.IterableOps.flatMap(Iterable.scala:674) at > scala.collection.IterableOps.flatMap$(Iterable.scala:674) at > scala.collection.AbstractIterable.flatMap(Iterable.scala:921) at > kafka.log.LogCleanerManager.$anonfun$allCleanerCheckpoints$1(LogCleanerManager.scala:132) > at > kafka.log.LogCleanerManager.allCleanerCheckpoints(LogCleanerManager.scala:140) > at > kafka.log.LogCleanerManager.$anonfun$grabFilthiestCompactedLog$1(LogCleanerManager.scala:171) > at > kafka.log.LogCleanerManager.grabFilthiestCompactedLog(LogCleanerManager.scala:168) > at > kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:327) at > kafka.log.LogCleaner$CleanerThread.tryCleanFilthiestLog(LogCleaner.scala:314) > at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:303) at > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)[2020-07-30 > 19:56:48,083] WARN [ReplicaManager broker=3] Stopping serving replicas in dir > /tmp/kafka-logs (kafka.server.ReplicaManager)[2020-07-30 19:56:48,086] INFO > [ReplicaFetcherManager on broker 3] Removed fetcher for partitions > HashSet(__consumer_offsets-8, sbchang.test.partition-0, > __consumer_offsets-47, sbchang.test.partition-2, sbchang.test.header-2, > configtest-0, __ispossible-0, __consumer_offsets-32, __consumer_offsets-35,
[jira] [Commented] (KAFKA-10352) Error while reading checkpoint file /tmp/kafka-logs/cleaner-offset-checkpoint (kafka.server.LogDirFailureChannel)
[ https://issues.apache.org/jira/browse/KAFKA-10352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17173595#comment-17173595 ] Seongbae Chang commented on KAFKA-10352: Thank you for your answer, [~dongjin]! Following your advice, I moved the data (log) directory out of '/tmp' to another location. I also changed the path of ZooKeeper's data directory. Everything has been working well since. Thank you :)
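The fix reported for KAFKA-10352 was to move both the Kafka log directory and the ZooKeeper data directory out of /tmp. In the sample configs shipped with Kafka, `log.dirs` in config/server.properties is /tmp/kafka-logs and `dataDir` in config/zookeeper.properties is /tmp/zookeeper; if `log.dirs` is unset, the broker falls back to `log.dir`, whose default is also /tmp/kafka-logs. Many systems clean /tmp periodically, which is a plausible (not confirmed in this thread) way for a file such as cleaner-offset-checkpoint to disappear. The sketch below is a hypothetical preflight check: `TmpDirCheck`, `effectiveLogDirs` and `underTmp` are illustrative names, not part of Kafka.

```java
import java.io.IOException;
import java.io.StringReader;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical preflight check (not part of Kafka) that flags data directories under /tmp. */
public class TmpDirCheck {

    private static final Path TMP = Paths.get("/tmp").toAbsolutePath().normalize();

    /** Directory list the broker would use: log.dirs, else log.dir, else its /tmp default. */
    static String effectiveLogDirs(Properties serverProps) {
        String dirs = serverProps.getProperty("log.dirs");
        if (dirs != null && !dirs.trim().isEmpty()) {
            return dirs;
        }
        return serverProps.getProperty("log.dir", "/tmp/kafka-logs");
    }

    /** Returns the entries of a comma-separated directory list that live under /tmp. */
    static List<String> underTmp(String commaSeparatedDirs) {
        List<String> flagged = new ArrayList<>();
        for (String raw : commaSeparatedDirs.split(",")) {
            String dir = raw.trim();
            if (dir.isEmpty()) {
                continue;
            }
            // Path.startsWith compares whole name elements, so "/tmpfoo" is not flagged.
            Path path = Paths.get(dir).toAbsolutePath().normalize();
            if (path.startsWith(TMP)) {
                flagged.add(dir);
            }
        }
        return flagged;
    }

    public static void main(String[] args) throws IOException {
        // In practice these would be loaded from config/server.properties and config/zookeeper.properties.
        Properties server = new Properties();
        server.load(new StringReader("log.dirs=/tmp/kafka-logs\n"));
        Properties zookeeper = new Properties();
        zookeeper.load(new StringReader("dataDir=/tmp/zookeeper\n"));

        System.out.println("broker dirs under /tmp:    " + underTmp(effectiveLogDirs(server)));
        System.out.println("zookeeper dirs under /tmp: " + underTmp(zookeeper.getProperty("dataDir", "")));
    }
}
```

An empty result for both lists means neither directory is under /tmp; symlinks (for example /tmp on macOS) are not resolved by this sketch.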
[GitHub] [kafka] ning2008wisc removed a comment on pull request #9145: KAFKA-10370: rewind consumer to SinkTaskContext's offsets when init
ning2008wisc removed a comment on pull request #9145: URL: https://github.com/apache/kafka/pull/9145#issuecomment-670820575 As mentioned in https://stackoverflow.com/questions/54480715/no-current-assignment-for-partition-occurs-even-after-poll-in-kafka, another potential resolution could be to call "consumer.seek(tp, offsets)" in the **onPartitionsAssigned() callback** after subscribing.
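The suggestion above defers the seek until the consumer actually owns the partition. With the real client, that means passing a `ConsumerRebalanceListener` to `KafkaConsumer.subscribe(...)` and calling `consumer.seek(tp, offset)` from its `onPartitionsAssigned(Collection<TopicPartition>)` method, because `seek` on a partition that is not currently assigned throws `IllegalStateException` ("No current assignment for partition ..."). To stay runnable without a broker, the sketch below uses hypothetical stand-ins (`SeekOnAssignSketch`, `ToyConsumer`, `AssignmentListener`) with partitions as plain strings. It is not WorkerSinkTask's code and not necessarily how KAFKA-10370 was resolved.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Toy model (hypothetical names, not Kafka code): seeking before a partition is assigned
 * fails, while seeking from an assignment callback succeeds.
 */
public class SeekOnAssignSketch {

    /** Stand-in for ConsumerRebalanceListener#onPartitionsAssigned. */
    interface AssignmentListener {
        void onPartitionsAssigned(Collection<String> partitions);
    }

    /** Stand-in for a consumer that uses subscribe() rather than assign(). */
    static final class ToyConsumer {
        private final Set<String> assigned = new HashSet<>();
        private final Map<String, Long> positions = new HashMap<>();
        private AssignmentListener listener;

        /** Registers the listener; no partitions are owned until a rebalance completes. */
        void subscribe(AssignmentListener listener) {
            this.listener = listener;
        }

        /** Like the real consumer, refuses to seek a partition it does not currently own. */
        void seek(String partition, long offset) {
            if (!assigned.contains(partition)) {
                throw new IllegalStateException("No current assignment for partition " + partition);
            }
            positions.put(partition, offset);
        }

        /** Simulates the rebalance that, with the real client, completes inside poll(). */
        void completeRebalance(Collection<String> newAssignment) {
            assigned.clear();
            assigned.addAll(newAssignment);
            if (listener != null) {
                listener.onPartitionsAssigned(newAssignment);
            }
        }

        Long position(String partition) {
            return positions.get(partition);
        }
    }

    public static void main(String[] args) {
        // Offsets a task asked to rewind to, e.g. supplied through a context object.
        Map<String, Long> requested = Collections.singletonMap("topic-0", 42L);

        // 1) Seeking right after subscribe(): nothing is assigned yet, so it throws.
        ToyConsumer eager = new ToyConsumer();
        eager.subscribe(partitions -> { });
        try {
            eager.seek("topic-0", requested.get("topic-0"));
        } catch (IllegalStateException e) {
            System.out.println("eager seek failed: " + e.getMessage());
        }

        // 2) Seeking from the assignment callback: only partitions just assigned are touched.
        ToyConsumer deferred = new ToyConsumer();
        deferred.subscribe(partitions -> {
            for (String partition : partitions) {
                Long offset = requested.get(partition);
                if (offset != null) {
                    deferred.seek(partition, offset);
                }
            }
        });
        deferred.completeRebalance(Collections.singletonList("topic-0"));
        System.out.println("deferred position: " + deferred.position("topic-0"));
    }
}
```

Looking up only the partitions passed to the callback means requested offsets for partitions this consumer was not given are simply ignored rather than triggering the exception.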