[jira] [Updated] (KAFKA-1407) Broker can not return to ISR because of BadVersionException

2020-10-18 Thread M. Manna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-1407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna updated KAFKA-1407:

Priority: Critical  (was: Major)

> Broker can not return to ISR because of BadVersionException
> ---
>
> Key: KAFKA-1407
> URL: https://issues.apache.org/jira/browse/KAFKA-1407
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.1
>Reporter: Dmitry Bugaychenko
>Assignee: Neha Narkhede
>Priority: Critical
>
> Each morning we found a broker out of the ISR and stuck, with a log full of messages:
> {code}
> INFO   | jvm 1| 2014/04/21 08:36:21 | [2014-04-21 09:36:21,907] ERROR 
> Conditional update of path /brokers/topics/topic2/partitions/1/state with 
> data 
> {"controller_epoch":46,"leader":2,"version":1,"leader_epoch":38,"isr":[2]} 
> and expected version 53 failed due to 
> org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = 
> BadVersion for /brokers/topics/topic2/partitions/1/state 
> (kafka.utils.ZkUtils$)
> INFO   | jvm 1| 2014/04/21 08:36:21 | [2014-04-21 09:36:21,907] INFO 
> Partition [topic2,1] on broker 2: Cached zkVersion [53] not equal to that in 
> zookeeper, skip updating ISR (kafka.cluster.Partition)
> {code}
> It seems that it cannot recover after a short network breakdown, and the only 
> way to bring it back is to restart it using kill -9.
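
For context, the failing operation above is a version-guarded ZooKeeper write: 
setData() only succeeds if the znode's version still matches the broker's 
cached zkVersion. A minimal sketch of that pattern with the plain ZooKeeper 
client API (illustrative only, not Kafka's actual code):

{code}
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ConditionalUpdateSketch {
    // Returns the new znode version on success, or -1 when the cached
    // version is stale (the BadVersionException case from the log above).
    static int conditionalUpdate(ZooKeeper zk, String path, byte[] newState,
                                 int cachedZkVersion) throws InterruptedException {
        try {
            Stat stat = zk.setData(path, newState, cachedZkVersion);
            return stat.getVersion();
        } catch (KeeperException.BadVersionException e) {
            // Someone else (e.g. the controller) updated the znode first, so the
            // broker's cached zkVersion no longer matches and must be refreshed.
            return -1;
        } catch (KeeperException e) {
            throw new RuntimeException("ZooKeeper error for " + path, e);
        }
    }
}
{code}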



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-1407) Broker can not return to ISR because of BadVersionException

2020-10-18 Thread M. Manna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-1407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna updated KAFKA-1407:

Affects Version/s: 2.4.1

> Broker can not return to ISR because of BadVersionException
> ---
>
> Key: KAFKA-1407
> URL: https://issues.apache.org/jira/browse/KAFKA-1407
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.1, 2.4.1
>Reporter: Dmitry Bugaychenko
>Assignee: Neha Narkhede
>Priority: Critical
>
> Each morning we found a broker out of the ISR and stuck, with a log full of messages:
> {code}
> INFO   | jvm 1| 2014/04/21 08:36:21 | [2014-04-21 09:36:21,907] ERROR 
> Conditional update of path /brokers/topics/topic2/partitions/1/state with 
> data 
> {"controller_epoch":46,"leader":2,"version":1,"leader_epoch":38,"isr":[2]} 
> and expected version 53 failed due to 
> org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = 
> BadVersion for /brokers/topics/topic2/partitions/1/state 
> (kafka.utils.ZkUtils$)
> INFO   | jvm 1| 2014/04/21 08:36:21 | [2014-04-21 09:36:21,907] INFO 
> Partition [topic2,1] on broker 2: Cached zkVersion [53] not equal to that in 
> zookeeper, skip updating ISR (kafka.cluster.Partition)
> {code}
> It seems that it cannot recover after a short network breakdown, and the only 
> way to bring it back is to restart it using kill -9.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-2729) Cached zkVersion not equal to that in zookeeper, broker not recovering.

2020-10-18 Thread M. Manna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna updated KAFKA-2729:

Affects Version/s: 2.4.1

> Cached zkVersion not equal to that in zookeeper, broker not recovering.
> ---
>
> Key: KAFKA-2729
> URL: https://issues.apache.org/jira/browse/KAFKA-2729
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1, 0.9.0.0, 0.10.0.0, 0.10.1.0, 0.11.0.0, 2.4.1
>Reporter: Danil Serdyuchenko
>Assignee: Onur Karaman
>Priority: Critical
> Fix For: 1.1.0
>
>
> After a small network wobble where the zookeeper nodes couldn't reach each 
> other, we started seeing a large number of under-replicated partitions. The 
> zookeeper cluster recovered; however, we continued to see a large number of 
> under-replicated partitions. Two brokers in the kafka cluster were showing 
> this in the logs:
> {code}
> [2015-10-27 11:36:00,888] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Shrinking ISR for 
> partition [__samza_checkpoint_event-creation_1,3] from 6,5 to 5 
> (kafka.cluster.Partition)
> [2015-10-27 11:36:00,891] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Cached zkVersion [66] 
> not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
> {code}
> This happened for all of the topics on the affected brokers. Both brokers only 
> recovered after a restart. Our own investigation yielded nothing; I was hoping 
> you could shed some light on this issue - possibly whether it's related to 
> https://issues.apache.org/jira/browse/KAFKA-1382 , although we're using 
> 0.8.2.1.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue

2020-10-18 Thread M. Manna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna updated KAFKA-3042:

Affects Version/s: 2.4.1

> updateIsr should stop after failed several times due to zkVersion issue
> ---
>
> Key: KAFKA-3042
> URL: https://issues.apache.org/jira/browse/KAFKA-3042
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.10.0.0, 2.4.1
> Environment: jdk 1.7
> centos 6.4
>Reporter: Jiahongchao
>Assignee: Dong Lin
>Priority: Critical
>  Labels: reliability
> Attachments: controller.log, server.log.2016-03-23-01, 
> state-change.log
>
>
> Sometimes one broker may repeatedly log
> "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR".
> I think this is because the broker considers itself the leader when in fact 
> it's a follower.
> So after several failed tries, it needs to find out who the leader is.
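
Roughly, the suggestion is to bound the retries and re-resolve leadership 
rather than looping forever. A minimal sketch of that idea (both helpers are 
hypothetical placeholders, not Kafka's actual methods):

{code}
public class BoundedIsrUpdateSketch {
    static boolean updateIsrWithRetry(int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (tryConditionalIsrUpdate()) {  // hypothetical: version-guarded ZK write
                return true;
            }
            // On BadVersion, re-read partition state instead of blindly retrying:
            // the broker may no longer be the leader at all.
            refreshLeaderAndIsrFromZooKeeper();  // hypothetical
        }
        return false;  // give up and wait for the controller to resync this replica
    }

    static boolean tryConditionalIsrUpdate() { return false; }  // stub
    static void refreshLeaderAndIsrFromZooKeeper() { }          // stub
}
{code}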



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-2729) Cached zkVersion not equal to that in zookeeper, broker not recovering.

2020-10-18 Thread M. Manna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17216323#comment-17216323
 ] 

M. Manna edited comment on KAFKA-2729 at 10/18/20, 5:20 PM:


This has resurfaced for us in our production environment yesterday and caused an 
outage. Has anyone else seen this issue recently? We are using Kafka 2.4.1 
(through Confluent), but without any customisation.

It'd be good to know if there are any steps to reproduce this reliably. The 
above-mentioned test (network stretch or switching) is quite difficult for us 
to run at the moment.


was (Author: manme...@gmail.com):
This has resurfaced for us in our production environment yesterday and caused an 
outage. Has anyone else seen this issue recently? We are using Confluent 2.4.1, 
but without any customisation.

It'd be good to know if there are any steps to reproduce this reliably. The 
above-mentioned test (network stretch or switching) is quite difficult for us 
to run at the moment.

> Cached zkVersion not equal to that in zookeeper, broker not recovering.
> ---
>
> Key: KAFKA-2729
> URL: https://issues.apache.org/jira/browse/KAFKA-2729
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1, 0.9.0.0, 0.10.0.0, 0.10.1.0, 0.11.0.0
>Reporter: Danil Serdyuchenko
>Assignee: Onur Karaman
>Priority: Critical
> Fix For: 1.1.0
>
>
> After a small network wobble where the zookeeper nodes couldn't reach each 
> other, we started seeing a large number of under-replicated partitions. The 
> zookeeper cluster recovered; however, we continued to see a large number of 
> under-replicated partitions. Two brokers in the kafka cluster were showing 
> this in the logs:
> {code}
> [2015-10-27 11:36:00,888] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Shrinking ISR for 
> partition [__samza_checkpoint_event-creation_1,3] from 6,5 to 5 
> (kafka.cluster.Partition)
> [2015-10-27 11:36:00,891] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Cached zkVersion [66] 
> not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
> {code}
> This happened for all of the topics on the affected brokers. Both brokers only 
> recovered after a restart. Our own investigation yielded nothing; I was hoping 
> you could shed some light on this issue - possibly whether it's related to 
> https://issues.apache.org/jira/browse/KAFKA-1382 , although we're using 
> 0.8.2.1.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue

2020-10-18 Thread M. Manna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17216327#comment-17216327
 ] 

M. Manna commented on KAFKA-3042:
-

We were hit by this yesterday in production and had an outage. I have added the 
same comments to the original ticket. We are running Kafka 2.4.1 with Confluent. 
This could've happened even with the most recent release versions too. I will 
change the priority to critical.

> updateIsr should stop after failed several times due to zkVersion issue
> ---
>
> Key: KAFKA-3042
> URL: https://issues.apache.org/jira/browse/KAFKA-3042
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.10.0.0
> Environment: jdk 1.7
> centos 6.4
>Reporter: Jiahongchao
>Assignee: Dong Lin
>Priority: Critical
>  Labels: reliability
> Attachments: controller.log, server.log.2016-03-23-01, 
> state-change.log
>
>
> Sometimes one broker may repeatedly log
> "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR".
> I think this is because the broker considers itself the leader when in fact 
> it's a follower.
> So after several failed tries, it needs to find out who the leader is.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue

2020-10-18 Thread M. Manna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna updated KAFKA-3042:

Priority: Critical  (was: Major)

> updateIsr should stop after failed several times due to zkVersion issue
> ---
>
> Key: KAFKA-3042
> URL: https://issues.apache.org/jira/browse/KAFKA-3042
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.10.0.0
> Environment: jdk 1.7
> centos 6.4
>Reporter: Jiahongchao
>Assignee: Dong Lin
>Priority: Critical
>  Labels: reliability
> Attachments: controller.log, server.log.2016-03-23-01, 
> state-change.log
>
>
> Sometimes one broker may repeatedly log
> "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR".
> I think this is because the broker considers itself the leader when in fact 
> it's a follower.
> So after several failed tries, it needs to find out who the leader is.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-2729) Cached zkVersion not equal to that in zookeeper, broker not recovering.

2020-10-18 Thread M. Manna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17216323#comment-17216323
 ] 

M. Manna commented on KAFKA-2729:
-

This has resurfaced for us in our production environment yesterday and caused an 
outage. Has anyone else seen this issue recently? We are using Confluent 2.4.1, 
but without any customisation.

It'd be good to know if there are any steps to reproduce this reliably. The 
above-mentioned test (network stretch or switching) is quite difficult for us 
to run at the moment.

> Cached zkVersion not equal to that in zookeeper, broker not recovering.
> ---
>
> Key: KAFKA-2729
> URL: https://issues.apache.org/jira/browse/KAFKA-2729
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1, 0.9.0.0, 0.10.0.0, 0.10.1.0, 0.11.0.0
>Reporter: Danil Serdyuchenko
>Assignee: Onur Karaman
>Priority: Critical
> Fix For: 1.1.0
>
>
> After a small network wobble where the zookeeper nodes couldn't reach each 
> other, we started seeing a large number of under-replicated partitions. The 
> zookeeper cluster recovered; however, we continued to see a large number of 
> under-replicated partitions. Two brokers in the kafka cluster were showing 
> this in the logs:
> {code}
> [2015-10-27 11:36:00,888] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Shrinking ISR for 
> partition [__samza_checkpoint_event-creation_1,3] from 6,5 to 5 
> (kafka.cluster.Partition)
> [2015-10-27 11:36:00,891] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Cached zkVersion [66] 
> not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
> {code}
> This happened for all of the topics on the affected brokers. Both brokers only 
> recovered after a restart. Our own investigation yielded nothing; I was hoping 
> you could shed some light on this issue - possibly whether it's related to 
> https://issues.apache.org/jira/browse/KAFKA-1382 , although we're using 
> 0.8.2.1.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-2729) Cached zkVersion not equal to that in zookeeper, broker not recovering.

2020-10-18 Thread M. Manna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna updated KAFKA-2729:

Priority: Critical  (was: Major)

> Cached zkVersion not equal to that in zookeeper, broker not recovering.
> ---
>
> Key: KAFKA-2729
> URL: https://issues.apache.org/jira/browse/KAFKA-2729
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1, 0.9.0.0, 0.10.0.0, 0.10.1.0, 0.11.0.0
>Reporter: Danil Serdyuchenko
>Assignee: Onur Karaman
>Priority: Critical
> Fix For: 1.1.0
>
>
> After a small network wobble where the zookeeper nodes couldn't reach each 
> other, we started seeing a large number of under-replicated partitions. The 
> zookeeper cluster recovered; however, we continued to see a large number of 
> under-replicated partitions. Two brokers in the kafka cluster were showing 
> this in the logs:
> {code}
> [2015-10-27 11:36:00,888] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Shrinking ISR for 
> partition [__samza_checkpoint_event-creation_1,3] from 6,5 to 5 
> (kafka.cluster.Partition)
> [2015-10-27 11:36:00,891] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Cached zkVersion [66] 
> not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
> {code}
> This happened for all of the topics on the affected brokers. Both brokers only 
> recovered after a restart. Our own investigation yielded nothing; I was hoping 
> you could shed some light on this issue - possibly whether it's related to 
> https://issues.apache.org/jira/browse/KAFKA-1382 , although we're using 
> 0.8.2.1.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10465) Potential Bug/Doc update in Transactional Producer and Isolation Level

2020-09-11 Thread M. Manna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17194093#comment-17194093
 ] 

M. Manna commented on KAFKA-10465:
--

[~guozhang] - Thanks for your comments. I am not sure whether you have let the 
producer run continuously, but it doesn't really make a difference either way. 
In our test environments, where things run perpetually, we see the same issue, 
and we don't pause anything. The code I have shown you here mimics what we have.

The issue is very specific to assigning (vs. subscribing to) a TopicPartition. 
If you check endOffsets(), pausing or not, you never seem to get the last 
committed offset at all. Also, if there is a transaction marker associated with 
it, this shouldn't be sent to the consumer as per the documentation.

Could you kindly comment on the above?
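
For reference, a minimal sketch of the assign-plus-read_committed setup being 
described, where endOffsets() should reflect the LSO rather than the high 
watermark (broker address, group id, and topic name are assumptions):

{code}
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadCommittedEndOffsetsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "lso-check");               // assumption
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        TopicPartition tp = new TopicPartition("dummy", 0); // assumption: topic under test
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singleton(tp)); // assign, not subscribe, as in the report
            // Under read_committed this should be the LSO, which can lag the
            // high watermark while a transaction is still open.
            Map<TopicPartition, Long> end = consumer.endOffsets(Collections.singleton(tp));
            System.out.println("endOffsets for " + tp + ": " + end.get(tp));
        }
    }
}
{code}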

> Potential Bug/Doc update in Transactional Producer and Isolation Level
> --
>
> Key: KAFKA-10465
> URL: https://issues.apache.org/jira/browse/KAFKA-10465
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.4.1
>Reporter: M. Manna
>Priority: Critical
> Attachments: ConsumerTestApp.java, Consumer_endOffsets_return_84.jpg, 
> ProducerTestApp.java, Producer_Committed_Successfully_to_82.jpg
>
>
> *Issue*
> The difference between the LSO and high-watermark offsets, as seen by a 
> consumer with "read_committed", probably isn't explained in the correct place.
> *Expected Behaviour*
> According to the documentation, the offset returned should be the one 
> committed last (and successfully).
> *Observed (with steps)*
> 1. Start a local or test kafka cluster (2.4.1 or above)
> 2. Create a topic (I used replication factor 3 with 1 partition, but 1 and 1 
> is fine)
> 3. Use the attached producer app file and set a breakpoint to be able to 
> pause on print
> 4. Use the attached consumer app file to start a consumer and step through 
> with the debugger.
> It can be seen that the consumer is actually able to fetch an offset that's 
> not yet committed by the producer.
> Just trying to raise this ticket to confirm whether:
> 1) this is well documented anywhere (and I have missed it) - please point to 
> that documentation as a resolution
> 2) this is a bug - please confirm and provide a timeline for when it can be 
> fixed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10465) Potential Bug/Doc update in Transactional Producer and Isolation Level

2020-09-07 Thread M. Manna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna updated KAFKA-10465:
-
Attachment: ConsumerTestApp.java

> Potential Bug/Doc update in Transactional Producer and Isolation Level
> --
>
> Key: KAFKA-10465
> URL: https://issues.apache.org/jira/browse/KAFKA-10465
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.4.1
>Reporter: M. Manna
>Priority: Critical
> Attachments: ConsumerTestApp.java, Consumer_endOffsets_return_84.jpg, 
> ProducerTestApp.java, Producer_Committed_Successfully_to_82.jpg
>
>
> *Issue*
> The difference between the LSO and high-watermark offsets, as seen by a 
> consumer with "read_committed", probably isn't explained in the correct place.
> *Expected Behaviour*
> According to the documentation, the offset returned should be the one 
> committed last (and successfully).
> *Observed (with steps)*
> 1. Start a local or test kafka cluster (2.4.1 or above)
> 2. Create a topic (I used replication factor 3 with 1 partition, but 1 and 1 
> is fine)
> 3. Use the attached producer app file and set a breakpoint to be able to 
> pause on print
> 4. Use the attached consumer app file to start a consumer and step through 
> with the debugger.
> It can be seen that the consumer is actually able to fetch an offset that's 
> not yet committed by the producer.
> Just trying to raise this ticket to confirm whether:
> 1) this is well documented anywhere (and I have missed it) - please point to 
> that documentation as a resolution
> 2) this is a bug - please confirm and provide a timeline for when it can be 
> fixed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10465) Potential Bug/Doc update in Transactional Producer and Isolation Level

2020-09-07 Thread M. Manna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna updated KAFKA-10465:
-
Attachment: (was: ConsumerTestApp.java)

> Potential Bug/Doc update in Transactional Producer and Isolation Level
> --
>
> Key: KAFKA-10465
> URL: https://issues.apache.org/jira/browse/KAFKA-10465
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.4.1
>Reporter: M. Manna
>Priority: Critical
> Attachments: ConsumerTestApp.java, Consumer_endOffsets_return_84.jpg, 
> ProducerTestApp.java, Producer_Committed_Successfully_to_82.jpg
>
>
> *Issue*
> The difference between the LSO and high-watermark offsets, as seen by a 
> consumer with "read_committed", probably isn't explained in the correct place.
> *Expected Behaviour*
> According to the documentation, the offset returned should be the one 
> committed last (and successfully).
> *Observed (with steps)*
> 1. Start a local or test kafka cluster (2.4.1 or above)
> 2. Create a topic (I used replication factor 3 with 1 partition, but 1 and 1 
> is fine)
> 3. Use the attached producer app file and set a breakpoint to be able to 
> pause on print
> 4. Use the attached consumer app file to start a consumer and step through 
> with the debugger.
> It can be seen that the consumer is actually able to fetch an offset that's 
> not yet committed by the producer.
> Just trying to raise this ticket to confirm whether:
> 1) this is well documented anywhere (and I have missed it) - please point to 
> that documentation as a resolution
> 2) this is a bug - please confirm and provide a timeline for when it can be 
> fixed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10465) Potential Bug/Doc update in Transactional Producer and Isolation Level

2020-09-07 Thread M. Manna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna updated KAFKA-10465:
-
Priority: Critical  (was: Major)

> Potential Bug/Doc update in Transactional Producer and Isolation Level
> --
>
> Key: KAFKA-10465
> URL: https://issues.apache.org/jira/browse/KAFKA-10465
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.4.1
>Reporter: M. Manna
>Priority: Critical
> Attachments: ConsumerTestApp.java, Consumer_endOffsets_return_84.jpg, 
> ProducerTestApp.java, Producer_Committed_Successfully_to_82.jpg
>
>
> *Issue*
> The difference between the LSO and high-watermark offsets, as seen by a 
> consumer with "read_committed", probably isn't explained in the correct place.
> *Expected Behaviour*
> According to the documentation, the offset returned should be the one 
> committed last (and successfully).
> *Observed (with steps)*
> 1. Start a local or test kafka cluster (2.4.1 or above)
> 2. Create a topic (I used replication factor 3 with 1 partition, but 1 and 1 
> is fine)
> 3. Use the attached producer app file and set a breakpoint to be able to 
> pause on print
> 4. Use the attached consumer app file to start a consumer and step through 
> with the debugger.
> It can be seen that the consumer is actually able to fetch an offset that's 
> not yet committed by the producer.
> Just trying to raise this ticket to confirm whether:
> 1) this is well documented anywhere (and I have missed it) - please point to 
> that documentation as a resolution
> 2) this is a bug - please confirm and provide a timeline for when it can be 
> fixed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10465) Potential Bug/Doc update in Transactional Producer and Isolation Level

2020-09-07 Thread M. Manna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17191682#comment-17191682
 ] 

M. Manna commented on KAFKA-10465:
--

I have sent an email to the users DL - it would be appreciated if someone could 
respond to this. We didn't see this issue with non-transactional producers, but 
since we intend to use transactional producers in production soon, it'd be good 
to get a view on this. We are hoping this is a misunderstanding/misconception 
somewhere on our side.

 

> Potential Bug/Doc update in Transactional Producer and Isolation Level
> --
>
> Key: KAFKA-10465
> URL: https://issues.apache.org/jira/browse/KAFKA-10465
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.4.1
>Reporter: M. Manna
>Priority: Major
> Attachments: ConsumerTestApp.java, Consumer_endOffsets_return_84.jpg, 
> ProducerTestApp.java, Producer_Committed_Successfully_to_82.jpg
>
>
> *Issue*
> The difference between the LSO and high-watermark offsets, as seen by a 
> consumer with "read_committed", probably isn't explained in the correct place.
> *Expected Behaviour*
> According to the documentation, the offset returned should be the one 
> committed last (and successfully).
> *Observed (with steps)*
> 1. Start a local or test kafka cluster (2.4.1 or above)
> 2. Create a topic (I used replication factor 3 with 1 partition, but 1 and 1 
> is fine)
> 3. Use the attached producer app file and set a breakpoint to be able to 
> pause on print
> 4. Use the attached consumer app file to start a consumer and step through 
> with the debugger.
> It can be seen that the consumer is actually able to fetch an offset that's 
> not yet committed by the producer.
> Just trying to raise this ticket to confirm whether:
> 1) this is well documented anywhere (and I have missed it) - please point to 
> that documentation as a resolution
> 2) this is a bug - please confirm and provide a timeline for when it can be 
> fixed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10465) Potential Bug/Doc update in Transactional Producer and Isolation Level

2020-09-07 Thread M. Manna (Jira)
M. Manna created KAFKA-10465:


 Summary: Potential Bug/Doc update in Transactional Producer and 
Isolation Level
 Key: KAFKA-10465
 URL: https://issues.apache.org/jira/browse/KAFKA-10465
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.4.1
Reporter: M. Manna
 Attachments: ConsumerTestApp.java, Consumer_endOffsets_return_84.jpg, 
ProducerTestApp.java, Producer_Committed_Successfully_to_82.jpg

*Issue*

The difference between the LSO and high-watermark offsets, as seen by a consumer 
with "read_committed", probably isn't explained in the correct place.

*Expected Behaviour*
According to the documentation, the offset returned should be the one committed 
last (and successfully).

*Observed (with steps)*

1. Start a local or test kafka cluster (2.4.1 or above)
2. Create a topic (I used replication factor 3 with 1 partition, but 1 and 1 is 
fine)
3. Use the attached producer app file and set a breakpoint to be able to pause 
on print
4. Use the attached consumer app file to start a consumer and step through with 
the debugger.

It can be seen that the consumer is actually able to fetch an offset that's not 
yet committed by the producer.

Just trying to raise this ticket to confirm whether:

1) this is well documented anywhere (and I have missed it) - please point to 
that documentation as a resolution

2) this is a bug - please confirm and provide a timeline for when it can be 
fixed.
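
For reference, a minimal sketch of the producer side of these steps; pausing 
between send() and commitTransaction() is what puts the log in the state under 
discussion (broker address, transactional id, and topic name are assumptions):

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumption
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-txn-producer"); // assumption
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("dummy", "key", "value")); // assumption: topic name
            // Pausing here (before commit) is the interesting moment: a
            // read_committed consumer should not see this record yet.
            producer.commitTransaction();
        }
    }
}
{code}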



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9458) Kafka crashed in windows environment

2020-01-21 Thread M. Manna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17020218#comment-17020218
 ] 

M. Manna commented on KAFKA-9458:
-

[~hirik] I am not sure if that was the right PR. I also see that someone 
provided [https://github.com/apache/kafka/pull/6403] in the comments (around 
September 2019). Bottom line: it's probably better to run Kafka in Docker or 
some Linux environment. With Windows, nothing can be tested for certain. I must 
admit that I haven't gone past the verbosity of Kafka's log cleaner logic, as 
it's mixed Scala and Java. You are welcome to investigate and let us know what 
you find.

> Kafka crashed in windows environment
> 
>
> Key: KAFKA-9458
> URL: https://issues.apache.org/jira/browse/KAFKA-9458
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
> Environment: Windows Server 2019
>Reporter: hirik
>Priority: Major
> Attachments: logs.zip
>
>
> Hi,
> while I was trying to validate Kafka retention policy, Kafka Server crashed 
> with below exception trace. 
> [2020-01-21 17:10:40,475] INFO [Log partition=test1-3, 
> dir=C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka] 
> Rolled new log segment at offset 1 in 52 ms. (kafka.log.Log)
> [2020-01-21 17:10:40,484] ERROR Error while deleting segments for test1-3 in 
> dir C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka 
> (kafka.server.LogDirFailureChannel)
> java.nio.file.FileSystemException: 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex
>  -> 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex.deleted:
>  The process cannot access the file because it is being used by another 
> process.
> at 
> java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
>  at 
> java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
>  at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:395)
>  at 
> java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)
>  at java.base/java.nio.file.Files.move(Files.java:1425)
>  at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:795)
>  at kafka.log.AbstractIndex.renameTo(AbstractIndex.scala:209)
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:497)
>  at kafka.log.Log.$anonfun$deleteSegmentFiles$1(Log.scala:2206)
>  at kafka.log.Log.$anonfun$deleteSegmentFiles$1$adapted(Log.scala:2206)
>  at scala.collection.immutable.List.foreach(List.scala:305)
>  at kafka.log.Log.deleteSegmentFiles(Log.scala:2206)
>  at kafka.log.Log.removeAndDeleteSegments(Log.scala:2191)
>  at kafka.log.Log.$anonfun$deleteSegments$2(Log.scala:1700)
>  at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.scala:17)
>  at kafka.log.Log.maybeHandleIOException(Log.scala:2316)
>  at kafka.log.Log.deleteSegments(Log.scala:1691)
>  at kafka.log.Log.deleteOldSegments(Log.scala:1686)
>  at kafka.log.Log.deleteRetentionMsBreachedSegments(Log.scala:1763)
>  at kafka.log.Log.deleteOldSegments(Log.scala:1753)
>  at kafka.log.LogManager.$anonfun$cleanupLogs$3(LogManager.scala:982)
>  at kafka.log.LogManager.$anonfun$cleanupLogs$3$adapted(LogManager.scala:979)
>  at scala.collection.immutable.List.foreach(List.scala:305)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:979)
>  at kafka.log.LogManager.$anonfun$startup$2(LogManager.scala:403)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:116)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:65)
>  at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>  at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
>  at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  at java.base/java.lang.Thread.run(Thread.java:830)
>  Suppressed: java.nio.file.FileSystemException: 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex
>  -> 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex.deleted:
>  The process cannot access the file because it is being used by another 
> process.
> at 
> java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
>  at 
> java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
>  at java.base/sun.nio.fs.

[jira] [Commented] (KAFKA-9458) Kafka crashed in windows environment

2020-01-21 Thread M. Manna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17020168#comment-17020168
 ] 

M. Manna commented on KAFKA-9458:
-

[~hirik] - This is only an issue on Windows. Kafka isn't fully compatible with 
Windows, and the issue has been there for a while. If you'd like to fix this 
issue without impacting any platform support, please feel free to do so. 
Otherwise, see the comments on the relevant tickets (1194, 6188).

> Kafka crashed in windows environment
> 
>
> Key: KAFKA-9458
> URL: https://issues.apache.org/jira/browse/KAFKA-9458
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
> Environment: Windows Server 2019
>Reporter: hirik
>Priority: Major
> Attachments: logs.zip
>
>
> Hi,
> while I was trying to validate Kafka retention policy, Kafka Server crashed 
> with below exception trace. 
> [2020-01-21 17:10:40,475] INFO [Log partition=test1-3, 
> dir=C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka] 
> Rolled new log segment at offset 1 in 52 ms. (kafka.log.Log)
> [2020-01-21 17:10:40,484] ERROR Error while deleting segments for test1-3 in 
> dir C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka 
> (kafka.server.LogDirFailureChannel)
> java.nio.file.FileSystemException: 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex
>  -> 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex.deleted:
>  The process cannot access the file because it is being used by another 
> process.
> at 
> java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
>  at 
> java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
>  at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:395)
>  at 
> java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)
>  at java.base/java.nio.file.Files.move(Files.java:1425)
>  at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:795)
>  at kafka.log.AbstractIndex.renameTo(AbstractIndex.scala:209)
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:497)
>  at kafka.log.Log.$anonfun$deleteSegmentFiles$1(Log.scala:2206)
>  at kafka.log.Log.$anonfun$deleteSegmentFiles$1$adapted(Log.scala:2206)
>  at scala.collection.immutable.List.foreach(List.scala:305)
>  at kafka.log.Log.deleteSegmentFiles(Log.scala:2206)
>  at kafka.log.Log.removeAndDeleteSegments(Log.scala:2191)
>  at kafka.log.Log.$anonfun$deleteSegments$2(Log.scala:1700)
>  at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.scala:17)
>  at kafka.log.Log.maybeHandleIOException(Log.scala:2316)
>  at kafka.log.Log.deleteSegments(Log.scala:1691)
>  at kafka.log.Log.deleteOldSegments(Log.scala:1686)
>  at kafka.log.Log.deleteRetentionMsBreachedSegments(Log.scala:1763)
>  at kafka.log.Log.deleteOldSegments(Log.scala:1753)
>  at kafka.log.LogManager.$anonfun$cleanupLogs$3(LogManager.scala:982)
>  at kafka.log.LogManager.$anonfun$cleanupLogs$3$adapted(LogManager.scala:979)
>  at scala.collection.immutable.List.foreach(List.scala:305)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:979)
>  at kafka.log.LogManager.$anonfun$startup$2(LogManager.scala:403)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:116)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:65)
>  at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>  at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
>  at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  at java.base/java.lang.Thread.run(Thread.java:830)
>  Suppressed: java.nio.file.FileSystemException: 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex
>  -> 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex.deleted:
>  The process cannot access the file because it is being used by another 
> process.
> at 
> java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
>  at 
> java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
>  at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:309)
>  at 
> java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)
>  at java.base/java.nio.file.Files.move(File

[jira] [Updated] (KAFKA-9458) Kafka crashed in windows environment

2020-01-21 Thread M. Manna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna updated KAFKA-9458:

Priority: Major  (was: Blocker)

> Kafka crashed in windows environment
> 
>
> Key: KAFKA-9458
> URL: https://issues.apache.org/jira/browse/KAFKA-9458
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
> Environment: Windows Server 2019
>Reporter: hirik
>Priority: Major
> Attachments: logs.zip
>
>
> Hi,
> while I was trying to validate Kafka retention policy, Kafka Server crashed 
> with below exception trace. 
> [2020-01-21 17:10:40,475] INFO [Log partition=test1-3, 
> dir=C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka] 
> Rolled new log segment at offset 1 in 52 ms. (kafka.log.Log)
> [2020-01-21 17:10:40,484] ERROR Error while deleting segments for test1-3 in 
> dir C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka 
> (kafka.server.LogDirFailureChannel)
> java.nio.file.FileSystemException: 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex
>  -> 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex.deleted:
>  The process cannot access the file because it is being used by another 
> process.
> at 
> java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
>  at 
> java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
>  at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:395)
>  at 
> java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)
>  at java.base/java.nio.file.Files.move(Files.java:1425)
>  at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:795)
>  at kafka.log.AbstractIndex.renameTo(AbstractIndex.scala:209)
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:497)
>  at kafka.log.Log.$anonfun$deleteSegmentFiles$1(Log.scala:2206)
>  at kafka.log.Log.$anonfun$deleteSegmentFiles$1$adapted(Log.scala:2206)
>  at scala.collection.immutable.List.foreach(List.scala:305)
>  at kafka.log.Log.deleteSegmentFiles(Log.scala:2206)
>  at kafka.log.Log.removeAndDeleteSegments(Log.scala:2191)
>  at kafka.log.Log.$anonfun$deleteSegments$2(Log.scala:1700)
>  at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.scala:17)
>  at kafka.log.Log.maybeHandleIOException(Log.scala:2316)
>  at kafka.log.Log.deleteSegments(Log.scala:1691)
>  at kafka.log.Log.deleteOldSegments(Log.scala:1686)
>  at kafka.log.Log.deleteRetentionMsBreachedSegments(Log.scala:1763)
>  at kafka.log.Log.deleteOldSegments(Log.scala:1753)
>  at kafka.log.LogManager.$anonfun$cleanupLogs$3(LogManager.scala:982)
>  at kafka.log.LogManager.$anonfun$cleanupLogs$3$adapted(LogManager.scala:979)
>  at scala.collection.immutable.List.foreach(List.scala:305)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:979)
>  at kafka.log.LogManager.$anonfun$startup$2(LogManager.scala:403)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:116)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:65)
>  at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>  at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
>  at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  at java.base/java.lang.Thread.run(Thread.java:830)
>  Suppressed: java.nio.file.FileSystemException: 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex
>  -> 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex.deleted:
>  The process cannot access the file because it is being used by another 
> process.
> at 
> java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
>  at 
> java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
>  at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:309)
>  at 
> java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)
>  at java.base/java.nio.file.Files.move(Files.java:1425)
>  at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:792)
>  ... 27 more
> [2020-01-21 17:10:40,495] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka 
> (kafka.server.ReplicaManager)
> [20

[jira] [Commented] (KAFKA-9458) Kafka crashed in windows environment

2020-01-21 Thread M. Manna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17020166#comment-17020166
 ] 

M. Manna commented on KAFKA-9458:
-

This is related to Windows blocking concurrent change/deletion of memory-mapped 
files.
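
A minimal sketch of that behaviour, with illustrative file names: map a file, 
then attempt the same rename that segment deletion performs.

{code}
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public class MappedFileRenameSketch {
    public static void main(String[] args) throws Exception {
        Path src = Paths.get("00000000.timeindex");         // illustrative names
        Path dst = Paths.get("00000000.timeindex.deleted");
        try (RandomAccessFile raf = new RandomAccessFile(src.toFile(), "rw")) {
            raf.setLength(1024);
            MappedByteBuffer map = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 1024);
            map.put(0, (byte) 1);
            // On Windows the live mapping keeps the file "in use", so this move
            // typically fails with a FileSystemException like the one in the log;
            // on Linux the rename succeeds even while the file is mapped.
            Files.move(src, dst, StandardCopyOption.ATOMIC_MOVE);
        }
    }
}
{code}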

> Kafka crashed in windows environment
> 
>
> Key: KAFKA-9458
> URL: https://issues.apache.org/jira/browse/KAFKA-9458
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
> Environment: Windows Server 2019
>Reporter: hirik
>Priority: Blocker
> Attachments: logs.zip
>
>
> Hi,
> while I was trying to validate Kafka retention policy, Kafka Server crashed 
> with below exception trace. 
> [2020-01-21 17:10:40,475] INFO [Log partition=test1-3, 
> dir=C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka] 
> Rolled new log segment at offset 1 in 52 ms. (kafka.log.Log)
> [2020-01-21 17:10:40,484] ERROR Error while deleting segments for test1-3 in 
> dir C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka 
> (kafka.server.LogDirFailureChannel)
> java.nio.file.FileSystemException: 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex
>  -> 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex.deleted:
>  The process cannot access the file because it is being used by another 
> process.
> at 
> java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
>  at 
> java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
>  at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:395)
>  at 
> java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)
>  at java.base/java.nio.file.Files.move(Files.java:1425)
>  at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:795)
>  at kafka.log.AbstractIndex.renameTo(AbstractIndex.scala:209)
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:497)
>  at kafka.log.Log.$anonfun$deleteSegmentFiles$1(Log.scala:2206)
>  at kafka.log.Log.$anonfun$deleteSegmentFiles$1$adapted(Log.scala:2206)
>  at scala.collection.immutable.List.foreach(List.scala:305)
>  at kafka.log.Log.deleteSegmentFiles(Log.scala:2206)
>  at kafka.log.Log.removeAndDeleteSegments(Log.scala:2191)
>  at kafka.log.Log.$anonfun$deleteSegments$2(Log.scala:1700)
>  at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.scala:17)
>  at kafka.log.Log.maybeHandleIOException(Log.scala:2316)
>  at kafka.log.Log.deleteSegments(Log.scala:1691)
>  at kafka.log.Log.deleteOldSegments(Log.scala:1686)
>  at kafka.log.Log.deleteRetentionMsBreachedSegments(Log.scala:1763)
>  at kafka.log.Log.deleteOldSegments(Log.scala:1753)
>  at kafka.log.LogManager.$anonfun$cleanupLogs$3(LogManager.scala:982)
>  at kafka.log.LogManager.$anonfun$cleanupLogs$3$adapted(LogManager.scala:979)
>  at scala.collection.immutable.List.foreach(List.scala:305)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:979)
>  at kafka.log.LogManager.$anonfun$startup$2(LogManager.scala:403)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:116)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:65)
>  at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>  at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
>  at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  at java.base/java.lang.Thread.run(Thread.java:830)
>  Suppressed: java.nio.file.FileSystemException: 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex
>  -> 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex.deleted:
>  The process cannot access the file because it is being used by another 
> process.
> at 
> java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
>  at 
> java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
>  at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:309)
>  at 
> java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)
>  at java.base/java.nio.file.Files.move(Files.java:1425)
>  at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:792)
>  ... 27 more
> [2020-01-21 17:10:40,495] INFO [ReplicaManager broker=0] Stopping serving 
> rep

[jira] [Commented] (KAFKA-9340) Potential race condition in AbstractConfig

2020-01-03 Thread M. Manna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007646#comment-17007646
 ] 

M. Manna commented on KAFKA-9340:
-

Perhaps someone else can confirm this too?

I am not sure why synchronization is required here. Config provider usage is 
independent, or at least it should be. If it's independent, shouldn't we simply 
remove all synchronization from here?

Also, the used/unused vars are mostly for checking what's been used, and for 
log items. Perhaps the intention is to ensure that the correct information is 
logged? In that case, would a concurrent variant, e.g. ConcurrentSkipListSet, 
be better?

> Potential race condition in AbstractConfig
> --
>
> Key: KAFKA-9340
> URL: https://issues.apache.org/jira/browse/KAFKA-9340
> Project: Kafka
>  Issue Type: Bug
>Reporter: Roman Leventov
>Priority: Minor
>
> It's not clear why the {{used}} field in {{AbstractConfig}} should be a 
> synchronized set, but if it does need to be synchronized, there is a race 
> condition in this line: 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java#L214]
> {{keys.removeAll(used);}}
>  
> Possible fixes:
>  1. Document (e.g. in a comment) why {{used}} should be synchronized, and 
> replace line 214 with synchronized (used) { keys.removeAll(used); }
>  2. Remove unnecessary synchronization of {{used}}
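
To make the race concrete, a minimal sketch (not the AbstractConfig code 
itself): HashSet.removeAll may iterate over its argument, and iterating a 
synchronized set without holding its monitor is unsafe if another thread 
mutates it concurrently.

{code}
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class SynchronizedSetRaceSketch {
    // Like AbstractConfig's field: a synchronized wrapper around a HashSet.
    static final Set<String> used = Collections.synchronizedSet(new HashSet<>());

    public static void main(String[] args) {
        Set<String> keys = new HashSet<>(Set.of("a", "b", "c"));

        // Racy: when "keys" is larger than "used", removeAll iterates over
        // "used" without holding its monitor, so a concurrent add() on "used"
        // can throw ConcurrentModificationException or miss elements.
        keys.removeAll(used);

        // Fix 1 from the ticket: hold the set's own lock for the bulk operation.
        synchronized (used) {
            keys.removeAll(used);
        }
    }
}
{code}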



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9363) Admin Script "kafka-topics" doesn't confirm on successful creation

2020-01-03 Thread M. Manna (Jira)
M. Manna created KAFKA-9363:
---

 Summary: Admin Script "kafka-topics" doesn't confirm on successful 
creation
 Key: KAFKA-9363
 URL: https://issues.apache.org/jira/browse/KAFKA-9363
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 2.4.0
Reporter: M. Manna
Assignee: M. Manna


When a topic is created from the admin console, no confirmation is provided if 
--bootstrap-server is chosen.

How to reproduce:

1) Get the 2.4.0 distro

2) Download and extract the code

3) Run "kafka-topics --create --topic dummy --partitions 1 
--replication-factor 1 --bootstrap-server localhost:9092"

4) Observe that no confirmation, e.g. "Successfully created dummy", is provided.

We should, at least, provide a confirmation, or restore the confirmation message 
that was shown when the --zookeeper argument was used. Currently we must use the 
--describe flag to follow up, but a confirmation message would be a nice 
addition.
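
For comparison, a minimal sketch of obtaining an explicit confirmation 
programmatically via AdminClient, which fails loudly on error (broker address 
is an assumption):

{code}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicWithConfirmationSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("dummy", 1, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get(); // blocks; throws on failure
            System.out.println("Successfully created dummy"); // the confirmation the ticket asks for
        }
    }
}
{code}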

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (KAFKA-5712) Expose ProducerConfig in the KafkaProducer clients

2019-10-03 Thread M. Manna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna closed KAFKA-5712.
---

> Expose ProducerConfig in the KafkaProducer clients
> --
>
> Key: KAFKA-5712
> URL: https://issues.apache.org/jira/browse/KAFKA-5712
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Leo Xuzhang Lin
>Priority: Trivial
>
> There is no easy way to introspect into a Producer's configuration. 
> For important configurations such as `acks` and `retries`, it is useful to be 
> able to verify programmatically the value for those configurations. 
> Since the ProducerConfig object is read only, there seems to be no harm to 
> expose that in a getter.
> ProducerConfig value in KafkaProducer:
> https://github.com/apache/kafka/blob/0.11.0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L246



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (KAFKA-4941) Better definition/introduction to the term brokers

2019-10-03 Thread M. Manna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna closed KAFKA-4941.
---

> Better definition/introduction to the term brokers
> --
>
> Key: KAFKA-4941
> URL: https://issues.apache.org/jira/browse/KAFKA-4941
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Reporter: Yih Feng Low
>Priority: Trivial
>
> Hi,
> I just wanted to point out that in the documentation at 
> https://kafka.apache.org/documentation/, there are over 500 references to the 
> word "broker". However, the idea of what a broker is not clear. 
> The first mention of a Kafka broker comes from the sentence:
> ??Alternatively, instead of manually creating topics you can also configure 
> your brokers to auto-create topics when a non-existent topic is published 
> to.??
> Perhaps there could be a better discussion of what a broker is, similar to 
> the discussions on what a consumer/producer/partition etc. is?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-4941) Better definition/introduction to the term brokers

2019-10-03 Thread M. Manna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna resolved KAFKA-4941.
-
Resolution: Invalid

> Better definition/introduction to the term brokers
> --
>
> Key: KAFKA-4941
> URL: https://issues.apache.org/jira/browse/KAFKA-4941
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Reporter: Yih Feng Low
>Priority: Trivial
>
> Hi,
> I just wanted to point out that in the documentation at 
> https://kafka.apache.org/documentation/, there are over 500 references to the 
> word "broker". However, the idea of what a broker is not clear. 
> The first mention of a Kafka broker comes from the sentence:
> ??Alternatively, instead of manually creating topics you can also configure 
> your brokers to auto-create topics when a non-existent topic is published 
> to.??
> Perhaps there could be a better discussion of what a broker is, similar to 
> the discussions on what a consumer/producer/partition etc. is?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4941) Better definition/introduction to the term brokers

2019-10-03 Thread M. Manna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16943751#comment-16943751
 ] 

M. Manna commented on KAFKA-4941:
-

Closing this because the term is generic and ubiquitous across distributed 
database/messaging systems. Too broad a topic for Kafka.

> Better definition/introduction to the term brokers
> --
>
> Key: KAFKA-4941
> URL: https://issues.apache.org/jira/browse/KAFKA-4941
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Reporter: Yih Feng Low
>Priority: Trivial
>
> Hi,
> I just wanted to point out that in the documentation at 
> https://kafka.apache.org/documentation/, there are over 500 references to the 
> word "broker". However, the idea of what a broker is not clear. 
> The first mention of a Kafka broker comes from the sentence:
> ??Alternatively, instead of manually creating topics you can also configure 
> your brokers to auto-create topics when a non-existent topic is published 
> to.??
> Perhaps there could be a better discussion of what a broker is, similar to 
> the discussions on what a consumer/producer/partition etc. is?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-5513) Contradicting scalaDoc for AdminUtils.assignReplicasToBrokers

2019-10-03 Thread M. Manna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna reassigned KAFKA-5513:
---

Assignee: Tom Bentley

Apologies - could this be peer reviewed and closed with a second opinion?

> Contradicting scalaDoc for AdminUtils.assignReplicasToBrokers
> -
>
> Key: KAFKA-5513
> URL: https://issues.apache.org/jira/browse/KAFKA-5513
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Charly Molter
>Assignee: Tom Bentley
>Priority: Trivial
>
> The documentation for AdminUtils.assignReplicasToBrokers seems to contradict 
> itself.
> It says in the description: "As the result, if the number of replicas is equal 
> to or greater than the number of racks, it will ensure that each rack will 
> get at least one replica."
> This means that it is possible to get an assignment where there are multiple 
> replicas in a rack (if there are fewer racks than the replication factor).
> However, the @throws clause says: "@throws AdminOperationException If rack 
> information is supplied but it is incomplete, or if it is not possible to 
> assign each replica to a unique rack."
> That seems to contradict the first claim.
> In practice it doesn't throw when RF > #racks, so the point in the @throws 
> clause should probably be removed.
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/AdminUtils.scala#L121-L130



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-5513) Contradicting scalaDoc for AdminUtils.assignReplicasToBrokers

2019-10-03 Thread M. Manna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16943748#comment-16943748
 ] 

M. Manna commented on KAFKA-5513:
-

Simply putting my thought here. The same doc says:


{noformat}
Otherwise, each rack will get at most one replica. In a perfect situation where 
the number of replicas is the same as the number of racks and each rack has the 
same number of brokers, it guarantees that the replica distribution is even 
across brokers and racks.{noformat}

For the exception, it says

{noformat}
AdminOperationException If rack information is supplied but it is incomplete, 
or if it is not possible to assign each replica to a unique rack.{noformat}
And the code says:
{noformat}
if (brokerMetadatas.exists(_.rack.isEmpty)){noformat}

The document is clarifying that it will attempt a uniform distribution. The 
exception will be thrown if that's not achievable for any of the racks. Maybe 
it's my mistake, but I don't feel there is a contradiction here.

> Contradicting scalaDoc for AdminUtils.assignReplicasToBrokers
> -
>
> Key: KAFKA-5513
> URL: https://issues.apache.org/jira/browse/KAFKA-5513
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Charly Molter
>Priority: Trivial
>
> The documentation for AdminUtils.assignReplicasToBrokers seems to contradict 
> itself.
> It says in the description: "As the result, if the number of replicas is equal 
> to or greater than the number of racks, it will ensure that each rack will 
> get at least one replica."
> This means that it is possible to get an assignment where there are multiple 
> replicas in a rack (if there are fewer racks than the replication factor).
> However, the @throws clause says: "@throws AdminOperationException If rack 
> information is supplied but it is incomplete, or if it is not possible to 
> assign each replica to a unique rack."
> This seems to contradict the first claim.
> In practice it doesn't throw when RF > #racks, so the point in the @throws 
> clause should probably be removed.
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/AdminUtils.scala#L121-L130



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-5712) Expose ProducerConfig in the KafkaProducer clients

2019-10-03 Thread M. Manna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna resolved KAFKA-5712.
-
  Reviewer: Bill Bejeck
Resolution: Information Provided

> Expose ProducerConfig in the KafkaProducer clients
> --
>
> Key: KAFKA-5712
> URL: https://issues.apache.org/jira/browse/KAFKA-5712
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Leo Xuzhang Lin
>Priority: Trivial
>
> There is no easy way to introspect into a Producer's configuration. 
> For important configurations such as `acks` and `retries`, it is useful to be 
> able to verify programmatically the value for those configurations. 
> Since the ProducerConfig object is read-only, there seems to be no harm in 
> exposing it via a getter.
> ProducerConfig value in KafkaProducer:
> https://github.com/apache/kafka/blob/0.11.0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L246



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-5712) Expose ProducerConfig in the KafkaProducer clients

2019-10-03 Thread M. Manna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16943726#comment-16943726
 ] 

M. Manna commented on KAFKA-5712:
-

[~leoxlin] As of Kafka Clients 2.3.0, a lot of flexibility has been 
added to support key-value pairs and named property additions. ProducerConfig is 
not a read-only class; it has its own constructor and is perfectly usable for 
named values. Having a reflection/introspection API isn't a significant 
improvement in any area. 

Closing this for now.
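
For reference, a minimal sketch of the approach described above: keep the same 
Properties (or build a ProducerConfig from them) and read the parsed values 
back through the AbstractConfig getters. Broker address and values below are 
placeholders.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerConfigIntrospection {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, "5");

        // ProducerConfig has a public constructor, so the parsed values can be
        // verified programmatically without any new getter on KafkaProducer.
        ProducerConfig config = new ProducerConfig(props);
        System.out.println("acks = " + config.getString(ProducerConfig.ACKS_CONFIG));
        System.out.println("retries = " + config.getInt(ProducerConfig.RETRIES_CONFIG));
    }
}
{code}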

> Expose ProducerConfig in the KafkaProducer clients
> --
>
> Key: KAFKA-5712
> URL: https://issues.apache.org/jira/browse/KAFKA-5712
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Leo Xuzhang Lin
>Priority: Trivial
>
> There is no easy way to introspect into a Producer's configuration. 
> For important configurations such as `acks` and `retries`, it is useful to be 
> able to verify programmatically the value for those configurations. 
> Since the ProducerConfig object is read-only, there seems to be no harm in 
> exposing it via a getter.
> ProducerConfig value in KafkaProducer:
> https://github.com/apache/kafka/blob/0.11.0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L246



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-8633) Extra </td> in generated documents

2019-10-03 Thread M. Manna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna reassigned KAFKA-8633:
---

Assignee: M. Manna

> Extra </td> in generated documents
> --
>
> Key: KAFKA-8633
> URL: https://issues.apache.org/jira/browse/KAFKA-8633
> Project: Kafka
>  Issue Type: Task
>  Components: documentation
>Reporter: Weichu Liu
>Assignee: M. Manna
>Priority: Trivial
>
> The auto-generated tables for all configurations (e.g. 
> [https://kafka.apache.org/documentation/#brokerconfigs]) come with 2 </td> for 
> each cell.
> e.g. the first row for broker configuration.
> {noformat}
> 
> zookeeper.connectSpecifies the ZooKeeper connection string 
> in the form hostname:port where host and port are the host and 
> port of a ZooKeeper server. To allow connecting through other ZooKeeper nodes 
> when that ZooKeeper machine is down you can also specify multiple hosts in 
> the form hostname1:port1,hostname2:port2,hostname3:port3.
> The server can also have a ZooKeeper chroot path as part of its ZooKeeper 
> connection string which puts its data under some path in the global ZooKeeper 
> namespace. For example to give a chroot path of /chroot/path you 
> would give the connection string as 
> hostname1:port1,hostname2:port2,hostname3:port3/chroot/path.stringhighread-only
> {noformat}
> This is due to the {{toHtmlTable}} function in 
> {{./clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java}} 
> appending an extra "</td>" in the code.
> {code:java}
> for (String headerName : headers()) {
> addColumnValue(b, getConfigValue(key, headerName));
> b.append("</td>");
> }
> {code}
> (The addColumnValue already wraps the value with <td> and </td>)
> This is a very minor issue, but it will prevent an HTML parser from properly 
> fetching table data (like what I was trying to do).
> --
> Update: I also found another glitch in the doc:
> Some configurations use '<>' in the string, but they are recognized as 
> HTML tags, so the description is not displayed properly.
> For example, the {{client.id}} of [Kafka Streams 
> Configs|https://kafka.apache.org/documentation/#streamsconfigs] displays
> {noformat}
> An ID prefix string used for the client IDs of internal consumer, producer 
> and restore-consumer, with pattern '-StreamThread--'.
> {noformat}
> However it should be
> {noformat}
> with pattern 
> '<client.id>-StreamThread-<threadSequenceNumber>-<consumer|producer|restore-consumer>'.
> {noformat}
> I feel the fastest way is to avoid angle brackets altogether.
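
As a sketch of the two fixes implied above: the table fix is simply dropping 
the duplicate b.append("</td>") from the loop, and literal angle brackets can 
be escaped before they reach the HTML. escapeHtml below is a hypothetical 
helper, not existing ConfigDef code.

{code:java}
public class HtmlEscapeSketch {
    // Hypothetical helper: escape '&', '<' and '>' so config descriptions
    // such as '<client.id>-StreamThread-...' render verbatim in the table.
    static String escapeHtml(String s) {
        return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
    }

    public static void main(String[] args) {
        // Prints &lt;client.id&gt;-StreamThread-&lt;threadId&gt;
        System.out.println(escapeHtml("<client.id>-StreamThread-<threadId>"));
    }
}
{code}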



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-7059) Offer new constructor on ProducerRecord

2019-10-03 Thread M. Manna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna reassigned KAFKA-7059:
---

Assignee: Matthias Wessendorf

> Offer new constructor on ProducerRecord 
> 
>
> Key: KAFKA-7059
> URL: https://issues.apache.org/jira/browse/KAFKA-7059
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Matthias Wessendorf
>Assignee: Matthias Wessendorf
>Priority: Trivial
>
> Creating a ProducerRecord with custom headers requires using a 
> constructor with a slightly longer argument list.
>  
> It would be handy and more convenient if there were a ctor like:
> {code}
> public ProducerRecord(String topic, K key, V value, Iterable<Header> headers)
> {code}
>  
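
For what it's worth, a sketch of how such a convenience ctor could delegate to 
the existing six-argument constructor (its placement inside 
ProducerRecord<K, V> is an assumption, not a committed change):

{code:java}
// Inside org.apache.kafka.clients.producer.ProducerRecord<K, V>
// (Header is org.apache.kafka.common.header.Header): delegate to the existing
// (topic, partition, timestamp, key, value, headers) constructor, leaving
// partition and timestamp null like the other convenience constructors do.
public ProducerRecord(String topic, K key, V value, Iterable<Header> headers) {
    this(topic, null, null, key, value, headers);
}
{code}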



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-8759) Message Order is reversed when client run behind a VPN

2019-08-06 Thread M. Manna (JIRA)
M. Manna created KAFKA-8759:
---

 Summary: Message Order is reversed when client run behind a VPN
 Key: KAFKA-8759
 URL: https://issues.apache.org/jira/browse/KAFKA-8759
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.2.0
Reporter: M. Manna


We noticed this behaviour whilst testing the console producer against a Kafka 
service installed on GCP. We have been using a fork of the Confluent Helm chart.

[https://github.com/helm/charts/tree/master/incubator/kafka]

FYI - we've used cp 5.3.0 with Apache Kafka 2.3.0

Our VPN connection throughput was 1 Mbps. Upon connecting to VPN, we opened a 
console producer client (2.2.0) with the following command:


{code:java}
kafka-console-producer.bat --topic some_topic --broker-list 
gcp_broker1:19092,gcp_broker2:19092,gcp_broker3:19092{code}
 

Similarly, we ran a consumer with the following command before publishing 
messages
{code:java}
kafka-console-consumer.bat --topic some_topic --bootstrap-server 
gcp_broker1:19092,gcp_broker2:19092,gcp_broker3:19092{code}

For the producer console, we did receive a caret (>) prompt for publishing, so we 
entered messages:
{code:java}
>one
>two
>three
>{code}
After a while, it responded with NETWORK_EXCEPTION


{code:java}
[2019-08-02 11:17:19,690] WARN [Producer clientId=console-producer] Got error 
produce response with correlation id 8 on topic-partition some_topic-0, 
retrying (2 attempts left). Error: NETWORK_EXCEPTION 
(org.apache.kafka.clients.producer.internals.Sender){code}

We then hit "Enter" and received a caret (>) back


{code:java}
[2019-08-02 11:17:19,690] WARN [Producer clientId=console-producer] Got error 
produce response with correlation id 8 on topic-partition some_topic-0, 
retrying (2 attempts left). Error: NETWORK_EXCEPTION 
(org.apache.kafka.clients.producer.internals.Sender)

>{code}
 

Immediately after that, on consumer window, we received the following:
{code:java}
three
two
one{code}
 

We ran the same exercise from a regular network (wifi/lan) and didn't see this 
issue (i.e. it works as described in the Quickstart). 

This is slightly concerning for us, since tunneling into a VPN shouldn't have 
any impact (or should it?) on how the Kafka message protocol works over TCP. It 
seems that Kafka couldn't guarantee the order of messages when network latency 
is involved. 

FYI:
1) We tried on VPN with --request_timeout_ms 12 and still got the same results.
2) Our setup was 3 nodes (3 brokers, 3 ZooKeepers) with every topic having 1 
partition only (RF 3).
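
Editor's note, offered as an assumption rather than a confirmed diagnosis: with 
retries enabled and the default max.in.flight.requests.per.connection=5, a 
batch that fails with NETWORK_EXCEPTION can be retried behind batches sent 
after it, which reorders messages exactly as observed. A minimal sketch of 
ordering-safe producer settings:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class OrderingSafeProducerProps {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker addresses copied from the report above (placeholders).
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "gcp_broker1:19092,gcp_broker2:19092,gcp_broker3:19092");
        // One in-flight request per connection: a retried batch cannot be
        // overtaken by a later batch, so send order is preserved.
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
        // Alternatively, idempotence (brokers >= 0.11) also preserves ordering.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        System.out.println(props);
    }
}
{code}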



 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-3333) Alternative Partitioner to Support "Always Round-Robin" partitioning

2019-05-20 Thread M. Manna (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna updated KAFKA-3333:

Description: 
** Please Look into KAFKA-7358 for the official description **

 

The 
[DefaultPartitioner|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java]
 typically distributes using the hash of the keybytes, and falls back to round 
robin if there is no key. But there is currently no way to do Round Robin 
partitioning if you have keys on your messages without writing your own 
partitioning implementation.

I think it'd be helpful to have an implementation of straight Round Robin 
partitioning included with the library.

  was:
The 
[DefaultPartitioner|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java]
 typically distributes using the hash of the keybytes, and falls back to round 
robin if there is no key.  But there is currently no way to do Round Robin 
partitioning if you have keys on your messages without writing your own 
partitioning implementation.

I think it'd be helpful to have an implementation of straight Round Robin 
partitioning included with the library.  


> Alternative Partitioner to Support "Always Round-Robin" partitioning
> 
>
> Key: KAFKA-3333
> URL: https://issues.apache.org/jira/browse/KAFKA-3333
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Reporter: Stephen Powis
>Assignee: M. Manna
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.3.0
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> ** Please Look into KAFKA-7358 for the official description **
>  
> The 
> [DefaultPartitioner|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java]
>  typically distributes using the hash of the keybytes, and falls back to 
> round robin if there is no key. But there is currently no way to do Round 
> Robin partitioning if you have keys on your messages without writing your own 
> partitioning implementation.
> I think it'd be helpful to have an implementation of straight Round Robin 
> partitioning included with the library.
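
As an illustration of "writing your own partitioning implementation", a minimal 
always-round-robin Partitioner sketch (independent of whatever the KIP-369 
implementation looks like):

{code:java}
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

// Cycles through partitions regardless of whether the record has a key.
public class AlwaysRoundRobinPartitioner implements Partitioner {
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int next = counter.getAndIncrement() & Integer.MAX_VALUE; // stay non-negative on overflow
        return partitions.get(next % partitions.size()).partition();
    }

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public void close() {}
}
{code}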



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-3333) Alternative Partitioner to Support "Always Round-Robin" partitioning

2019-05-20 Thread M. Manna (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna reassigned KAFKA-3333:
---

 Assignee: M. Manna
Fix Version/s: 2.3.0
Affects Version/s: (was: 0.10.0.0)

I am taking over since a KIP has been voted on and accepted (KIP-369).

> Alternative Partitioner to Support "Always Round-Robin" partitioning
> 
>
> Key: KAFKA-3333
> URL: https://issues.apache.org/jira/browse/KAFKA-3333
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Reporter: Stephen Powis
>Assignee: M. Manna
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.3.0
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> The 
> [DefaultPartitioner|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java]
>  typically distributes using the hash of the keybytes, and falls back to 
> round robin if there is no key.  But there is currently no way to do Round 
> Robin partitioning if you have keys on your messages without writing your own 
> partitioning implementation.
> I think it'd be helpful to have an implementation of straight Round Robin 
> partitioning included with the library.  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3333) Alternative Partitioner to Support "Always Round-Robin" assignment

2019-05-20 Thread M. Manna (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna updated KAFKA-3333:

Summary: Alternative Partitioner to Support "Always Round-Robin" assignment 
 (was: Client Partitioner - Round Robin)

> Alternative Partitioner to Support "Always Round-Robin" assignment
> --
>
> Key: KAFKA-3333
> URL: https://issues.apache.org/jira/browse/KAFKA-3333
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Affects Versions: 0.10.0.0
>Reporter: Stephen Powis
>Priority: Major
>  Labels: needs-kip
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> The 
> [DefaultPartitioner|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java]
>  typically distributes using the hash of the keybytes, and falls back to 
> round robin if there is no key.  But there is currently no way to do Round 
> Robin partitioning if you have keys on your messages without writing your own 
> partitioning implementation.
> I think it'd be helpful to have an implementation of straight Round Robin 
> partitioning included with the library.  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3333) Alternative Partitioner to Support "Always Round-Robin" partitioning

2019-05-20 Thread M. Manna (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna updated KAFKA-3333:

Summary: Alternative Partitioner to Support "Always Round-Robin" 
partitioning  (was: Alternative Partitioner to Support "Always Round-Robin" 
assignment)

> Alternative Partitioner to Support "Always Round-Robin" partitioning
> 
>
> Key: KAFKA-3333
> URL: https://issues.apache.org/jira/browse/KAFKA-3333
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Affects Versions: 0.10.0.0
>Reporter: Stephen Powis
>Priority: Major
>  Labels: needs-kip
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> The 
> [DefaultPartitioner|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java]
>  typically distributes using the hash of the keybytes, and falls back to 
> round robin if there is no key.  But there is currently no way to do Round 
> Robin partitioning if you have keys on your messages without writing your own 
> partitioning implementation.
> I think it'd be helpful to have an implementation of straight Round Robin 
> partitioning included with the library.  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7358) Alternative Partitioner to Support "Always Round-Robin" Selection

2019-05-20 Thread M. Manna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16843951#comment-16843951
 ] 

M. Manna edited comment on KAFKA-7358 at 5/20/19 2:24 PM:
--

[~mjsax] Yes - looks like my pull request effort was not put in the correct 
place. 

I will resubmit my PR with KAFKA-3333 as the main ticket and mark 7358 as the 
duplicate.


was (Author: manme...@gmail.com):
[~mjsax] Yes - looks like my pull request effort was not put in the correct 
place. Could we please consider 7358 for this since a KIP and dev work have 
been put into this?

> Alternative Partitioner to Support "Always Round-Robin" Selection
> -
>
> Key: KAFKA-7358
> URL: https://issues.apache.org/jira/browse/KAFKA-7358
> Project: Kafka
>  Issue Type: Wish
>  Components: clients
>Reporter: M. Manna
>Assignee: M. Manna
>Priority: Minor
>
> In my organisation, we have been using kafka as the basic publish-subscribe 
> messaging system provider. Our goal is to send event-based (secure, 
> encrypted) SQL messages reliably, and process them accordingly. For us, the 
> message keys represent some metadata which we use to either ignore messages 
> (if a loopback to the sender), or log some information. We have the following 
> use case for messaging:
> 1) A Database transaction event takes place
> 2) The event is captured and messaged across 10 data centres all around the 
> world.
> 3) A group of consumers (one for each data centre, with a unique consumer-group 
> ID) will process messages from their respective partitions. 1 consumer 
> per partition.
> Under the circumstances, we only need a guarantee that the same message won't be 
> sent to multiple partitions. In other words, 1 partition will +never+ be 
> sought by multiple consumers.
> Using DefaultPartitioner, we can achieve this only with NULL keys. But since 
> we need keys for metadata, we cannot maintain "Round-robin" selection of 
> partitions because a key hash will determine which partition to choose. We 
> need to have round-robin style selection regardless of key type (NULL or 
> not-NULL).
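
For completeness, plugging any such partitioner in is a single producer config 
entry. The class name below assumes the KIP-369 implementation lands as 
RoundRobinPartitioner in the clients package; any custom Partitioner can be 
wired the same way.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class RoundRobinProducerProps {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumed class name; swap in your own Partitioner implementation if needed.
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
                "org.apache.kafka.clients.producer.RoundRobinPartitioner");
        System.out.println(props);
    }
}
{code}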



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7358) Alternative Partitioner to Support "Always Round-Robin" Selection

2019-05-20 Thread M. Manna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16843951#comment-16843951
 ] 

M. Manna commented on KAFKA-7358:
-

[~mjsax] Yes - looks like my pull request effort was not put in the correct 
place. Could we please consider 7358 for this since a KIP and dev work have 
been put into this?

> Alternative Partitioner to Support "Always Round-Robin" Selection
> -
>
> Key: KAFKA-7358
> URL: https://issues.apache.org/jira/browse/KAFKA-7358
> Project: Kafka
>  Issue Type: Wish
>  Components: clients
>Reporter: M. Manna
>Assignee: M. Manna
>Priority: Minor
>
> In my organisation, we have been using kafka as the basic publish-subscribe 
> messaging system provider. Our goal is to send event-based (secure, 
> encrypted) SQL messages reliably, and process them accordingly. For us, the 
> message keys represent some metadata which we use to either ignore messages 
> (if a loopback to the sender), or log some information. We have the following 
> use case for messaging:
> 1) A Database transaction event takes place
> 2) The event is captured and messaged across 10 data centres all around the 
> world.
> 3) A group of consumers (one for each data centre, with a unique consumer-group 
> ID) will process messages from their respective partitions. 1 consumer 
> per partition.
> Under the circumstances, we only need a guarantee that the same message won't be 
> sent to multiple partitions. In other words, 1 partition will +never+ be 
> sought by multiple consumers.
> Using DefaultPartitioner, we can achieve this only with NULL keys. But since 
> we need keys for metadata, we cannot maintain "Round-robin" selection of 
> partitions because a key hash will determine which partition to choose. We 
> need to have round-robin style selection regardless of key type (NULL or 
> not-NULL).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8394) Cannot Start a build with New Gradle Version

2019-05-20 Thread M. Manna (JIRA)
M. Manna created KAFKA-8394:
---

 Summary: Cannot Start a build with New Gradle Version
 Key: KAFKA-8394
 URL: https://issues.apache.org/jira/browse/KAFKA-8394
 Project: Kafka
  Issue Type: Bug
  Components: build
Affects Versions: 2.2.0
Reporter: M. Manna


When I downloaded Gradle 5.4.1 and ran `gradle wrapper`, the build failed 
because the scoverage plugin dependency encountered some build errors. The 
following is the output:

 

org.gradle.api.GradleScriptException: A problem occurred evaluating root project 'kafka'.
        at org.gradle.groovy.scripts.internal.DefaultScriptRunnerFactory$ScriptRunnerImpl.run(DefaultScriptRunnerFactory.java:92)
        at org.gradle.configuration.DefaultScriptPluginFactory$ScriptPluginImpl$2.run(DefaultScriptPluginFactory.java:221)
        at org.gradle.configuration.ProjectScriptTarget.addConfiguration(ProjectScriptTarget.java:77)
        at org.gradle.configuration.DefaultScriptPluginFactory$ScriptPluginImpl.apply(DefaultScriptPluginFactory.java:226)
        at org.gradle.configuration.BuildOperationScriptPlugin$1$1.run(BuildOperationScriptPlugin.java:69)
        at org.gradle.internal.operations.DefaultBuildOperationExecutor$RunnableBuildOperationWorker.execute(DefaultBuildOperationExecutor.java:402)
        at org.gradle.internal.operations.DefaultBuildOperationExecutor$RunnableBuildOperationWorker.execute(DefaultBuildOperationExecutor.java:394)
        at org.gradle.internal.operations.DefaultBuildOperationExecutor$1.execute(DefaultBuildOperationExecutor.java:165)
        at org.gradle.internal.operations.DefaultBuildOperationExecutor.execute(DefaultBuildOperationExecutor.java:250)
        at org.gradle.internal.operations.DefaultBuildOperationExecutor.execute(DefaultBuildOperationExecutor.java:158)
        at org.gradle.internal.operations.DefaultBuildOperationExecutor.run(DefaultBuildOperationExecutor.java:92)
        at org.gradle.internal.operations.DelegatingBuildOperationExecutor.run(DelegatingBuildOperationExecutor.java:31)
        at org.gradle.configuration.BuildOperationScriptPlugin$1.execute(BuildOperationScriptPlugin.java:66)
        at org.gradle.configuration.BuildOperationScriptPlugin$1.execute(BuildOperationScriptPlugin.java:63)
        at org.gradle.configuration.internal.DefaultUserCodeApplicationContext.apply(DefaultUserCodeApplicationContext.java:48)
        at org.gradle.configuration.BuildOperationScriptPlugin.apply(BuildOperationScriptPlugin.java:63)
        at org.gradle.configuration.project.BuildScriptProcessor$1.run(BuildScriptProcessor.java:44)
        at org.gradle.internal.Factories$1.create(Factories.java:25)
        at org.gradle.api.internal.project.DefaultProjectStateRegistry$ProjectStateImpl.withMutableState(DefaultProjectStateRegistry.java:200)
        at org.gradle.api.internal.project.DefaultProjectStateRegistry$ProjectStateImpl.withMutableState(DefaultProjectStateRegistry.java:186)
        at org.gradle.configuration.project.BuildScriptProcessor.execute(BuildScriptProcessor.java:41)
        at org.gradle.configuration.project.BuildScriptProcessor.execute(BuildScriptProcessor.java:26)
        at org.gradle.configuration.project.ConfigureActionsProjectEvaluator.evaluate(ConfigureActionsProjectEvaluator.java:34)
        at org.gradle.configuration.project.LifecycleProjectEvaluator$EvaluateProject$1.run(LifecycleProjectEvaluator.java:106)
        at org.gradle.internal.Factories$1.create(Factories.java:25)
        at org.gradle.internal.work.DefaultWorkerLeaseService.withLocks(DefaultWorkerLeaseService.java:183)
        at org.gradle.internal.work.StopShieldingWorkerLeaseService.withLocks(StopShieldingWorkerLeaseService.java:40)
        at org.gradle.api.internal.project.DefaultProjectStateRegistry$ProjectStateImpl.withProjectLock(DefaultProjectStateRegistry.java:226)
        at org.gradle.api.internal.project.DefaultProjectStateRegistry$ProjectStateImpl.withMutableState(DefaultProjectStateRegistry.java:220)
        at org.gradle.api.internal.project.DefaultProjectStateRegistry$ProjectStateImpl.withMutableState(DefaultProjectStateRegistry.java:186)
        at org.gradle.configuration.project.LifecycleProjectEvaluator$EvaluateProject.run(LifecycleProjectEvaluator.java:95)
        at org.gradle.internal.operations.DefaultBuildOperationExecutor$RunnableBuildOperationWorker.execute(DefaultBuildOperationExecutor.java:402)
        at org.gradle.internal.operations.DefaultBuildOperationExecutor$RunnableBuildOperationWorker.execute(DefaultBuildOperationExecutor.java:394)
        at org.gradle.internal.operations.DefaultBuildOperationExecutor$1.execute(DefaultBuildOperationExecutor.java:165)
        at org.gradle.internal.operations.DefaultBuildOperatio

[jira] [Resolved] (KAFKA-7881) Inter.broker.procol.version is incorrect for Rolling Upgrade

2019-01-29 Thread M. Manna (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna resolved KAFKA-7881.
-
Resolution: Cannot Reproduce
  Assignee: M. Manna

Duplicate jars were present; deleting them and copying in the new jars resolved 
the issue. Closing this.

> Inter.broker.procol.version is incorrect for Rolling Upgrade
> 
>
> Key: KAFKA-7881
> URL: https://issues.apache.org/jira/browse/KAFKA-7881
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.1.0
>Reporter: M. Manna
>Assignee: M. Manna
>Priority: Critical
>   Original Estimate: 2m
>  Remaining Estimate: 2m
>
> We are getting the following error when upgrading from 1.1.0 to 2.1.0. We 
> have not changed the log message format version.
>  
> [https://kafka.apache.org/21/documentation.html]
>  
> Jan 29 06:06:14 one-drive-loc-01 systemd[1]: Started Zookeeper unit for this 
> machine.
>  Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: [2019-01-29 
> 06:06:15,272] INFO Registered kafka:type=kafka.Log4jController MBean 
> (kafka.utils.Log4jControl
>  Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: [2019-01-29 
> 06:06:15,599] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
>  Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: 
> java.lang.IllegalArgumentException: Version `2.1` is not a valid version
>  Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
> kafka.api.ApiVersion$$anonfun$apply$1.apply(ApiVersion.scala:88)
>  Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
> kafka.api.ApiVersion$$anonfun$apply$1.apply(ApiVersion.scala:88)
>  Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
> scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
>  Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
> scala.collection.AbstractMap.getOrElse(Map.scala:59)
>  Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
> kafka.api.ApiVersion$.apply(ApiVersion.scala:88)
>  Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
> kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1127)
>  Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
> kafka.server.KafkaConfig.<init>(KafkaConfig.scala:989)
>  Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
> kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:969)
>  Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
> kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
>  Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
> kafka.Kafka$.main(Kafka.scala:82)
>  Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
> kafka.Kafka.main(Kafka.scala)
>  
> We have also tried setting it to 2.1, but no change. If the version map is 
> being keyed using shortVersion, shouldn't it match? This is the first time we 
> are upgrading (from 1.1.0) and we have never had to change the log message format.
> Please advise.
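
For reference alongside the resolution, a minimal server.properties sketch of 
the two-phase rolling upgrade this ticket was exercising (values follow the 
upgrade notes; everything else is assumed to stay at its default):

{noformat}
# Phase 1: upgrade the binaries broker by broker while pinning the protocol
inter.broker.protocol.version=1.1
log.message.format.version=1.1

# Phase 2: once every broker runs 2.1.x, bump the protocol and do one more
# rolling restart ("2.1" is a valid short version once the correct jars are in place)
inter.broker.protocol.version=2.1
{noformat}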



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7881) Inter.broker.procol.version is incorrect for Rolling Upgrade

2019-01-29 Thread M. Manna (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna updated KAFKA-7881:

Description: 
We are getting the following error when upgrading from 1.1.0 to 2.1.0. We have 
not changed the log message format version.

 

[https://kafka.apache.org/21/documentation.html]

 

Jan 29 06:06:14 one-drive-loc-01 systemd[1]: Started Zookeeper unit for this 
machine.
 Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: [2019-01-29 
06:06:15,272] INFO Registered kafka:type=kafka.Log4jController MBean 
(kafka.utils.Log4jControl
 Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: [2019-01-29 
06:06:15,599] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
 Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: 
java.lang.IllegalArgumentException: Version `2.1` is not a valid version
 Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
kafka.api.ApiVersion$$anonfun$apply$1.apply(ApiVersion.scala:88)
 Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
kafka.api.ApiVersion$$anonfun$apply$1.apply(ApiVersion.scala:88)
 Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
 Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
scala.collection.AbstractMap.getOrElse(Map.scala:59)
 Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
kafka.api.ApiVersion$.apply(ApiVersion.scala:88)
 Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1127)
 Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
kafka.server.KafkaConfig.<init>(KafkaConfig.scala:989)
 Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:969)
 Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
 Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
kafka.Kafka$.main(Kafka.scala:82)
 Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
kafka.Kafka.main(Kafka.scala)

 

We have also tried setting it to 2.1, but no change. If the version map is being 
keyed using shortVersion, shouldn't it match? This is the first time we are 
upgrading (from 1.1.0) and we have never had to change the log message format.

Please advise.

  was:
We are getting the following error when upgrading from 1.1.0 to 2.1.0. We have 
not changed the log message format version.

 

Jan 29 06:06:14 one-drive-loc-01 systemd[1]: Started Zookeeper unit for this 
machine.
Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: [2019-01-29 
06:06:15,272] INFO Registered kafka:type=kafka.Log4jController MBean 
(kafka.utils.Log4jControl
Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: [2019-01-29 
06:06:15,599] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: 
java.lang.IllegalArgumentException: Version `2.1` is not a valid version
Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
kafka.api.ApiVersion$$anonfun$apply$1.apply(ApiVersion.scala:88)
Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
kafka.api.ApiVersion$$anonfun$apply$1.apply(ApiVersion.scala:88)
Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
scala.collection.AbstractMap.getOrElse(Map.scala:59)
Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
kafka.api.ApiVersion$.apply(ApiVersion.scala:88)
Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1127)
Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
kafka.server.KafkaConfig.<init>(KafkaConfig.scala:989)
Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:969)
Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
kafka.Kafka$.main(Kafka.scala:82)
Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
kafka.Kafka.main(Kafka.scala)

 

We have also tried setting it to 2.1, but no change. If the version map is being 
keyed using shortVersion, shouldn't it match? This is the first time we are 
upgrading (from 1.1.0) and we have never had to change the log message format. 

Please advise.


> Inter.broker.procol.version is inc

[jira] [Created] (KAFKA-7881) Inter.broker.procol.version is incorrect for Rolling Upgrade

2019-01-29 Thread M. Manna (JIRA)
M. Manna created KAFKA-7881:
---

 Summary: Inter.broker.procol.version is incorrect for Rolling 
Upgrade
 Key: KAFKA-7881
 URL: https://issues.apache.org/jira/browse/KAFKA-7881
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 2.1.0
Reporter: M. Manna


We are getting the following error when upgrading from 1.1.0 to 2.1.0. We have 
not changed the log message format version.

 

Jan 29 06:06:14 one-drive-loc-01 systemd[1]: Started Zookeeper unit for this 
machine.
Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: [2019-01-29 
06:06:15,272] INFO Registered kafka:type=kafka.Log4jController MBean 
(kafka.utils.Log4jControl
Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: [2019-01-29 
06:06:15,599] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: 
java.lang.IllegalArgumentException: Version `2.1` is not a valid version
Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
kafka.api.ApiVersion$$anonfun$apply$1.apply(ApiVersion.scala:88)
Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
kafka.api.ApiVersion$$anonfun$apply$1.apply(ApiVersion.scala:88)
Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
scala.collection.AbstractMap.getOrElse(Map.scala:59)
Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
kafka.api.ApiVersion$.apply(ApiVersion.scala:88)
Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1127)
Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
kafka.server.KafkaConfig.<init>(KafkaConfig.scala:989)
Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:969)
Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
kafka.Kafka$.main(Kafka.scala:82)
Jan 29 06:06:15 one-drive-loc-01 kafka-server-start.sh[15881]: at 
kafka.Kafka.main(Kafka.scala)

 

We have also tried setting it to 2.1, but no change. If the version map is being 
keyed using shortVersion, shouldn't it match? This is the first time we are 
upgrading (from 1.1.0) and we have never had to change the log message format. 

Please advise.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-11-15 Thread M. Manna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688008#comment-16688008
 ] 

M. Manna commented on KAFKA-1194:
-

Just out of curiosity, have you considered dockerising your Kafka service,
or does it not work for your business? Or perhaps moving to a Linux server?



> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
>Reporter: Tao Qin
>Priority: Critical
>  Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, RetentionExpiredWindows.txt, 
> Untitled.jpg, image-2018-09-12-14-25-52-632.png, kafka-1194-v1.patch, 
> kafka-1194-v2.patch, kafka-bombarder.7z, screenshot-1.png
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in windows environment, and set the log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file. 
> And we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:724)
> I think this error happens because kafka tries to rename the log file when it 
> is still open. So we should close the file first before renaming.
> The index file uses a special data structure, the MappedByteBuffer. Javadoc 
> describes it as:
> A mapped byte buffer and the file mapping that it represents remain valid 
> until the buffer itself is garbage-collected.
> Fortunately, I found a forceUnmap function in the kafka code, and perhaps it can 
> be used to free the MappedByteBuffer.
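
A minimal Java sketch of the close-before-rename idea from the description 
above (hypothetical illustration, not the actual Log.scala code; file name is 
a placeholder):

{code:java}
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.io.RandomAccessFile;

public class CloseBeforeRenameSketch {
    public static void main(String[] args) throws Exception {
        Path log = Paths.get("00000000000001516723.log");
        if (Files.notExists(log)) Files.createFile(log); // demo setup only

        // On Windows a rename fails while a handle (or mmap) is still open,
        // so release the handle first, then append the .deleted suffix.
        try (RandomAccessFile handle = new RandomAccessFile(log.toFile(), "rw")) {
            // ... reads/writes would happen here ...
        } // handle closed here

        Files.move(log, log.resolveSibling(log.getFileName() + ".deleted"),
                StandardCopyOption.ATOMIC_MOVE);
    }
}
{code}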



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-09-24 Thread M. Manna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625600#comment-16625600
 ] 

M. Manna edited comment on KAFKA-1194 at 9/24/18 10:11 AM:
---

The best thing to do is to check the following:

1) Set the default value of log.segment.bytes to something slightly smaller, 
e.g. 20 MB.
 2) Disable log.cleaner.enable (i.e. set it to false) - this should ensure that 
you don't have Log.scala segments opened accidentally by the brokers. In fact, 
once the cleaner is disabled and the segments are rolled over, we should be 
able to delete them manually.
 3) Allow rollover of new log files.
 4) Now manually delete the files using some scripts.

This is still not something I would recommend, but if Windows operation is a 
MUST (without using containers), you may want to try this out.

Regards,


was (Author: manme...@gmail.com):
The best thing to do is to check the following:

1) Set the default value of log.segment.bytes to something slightly smaller, 
e.g. 20 MB.
2) Disable log.cleaner.enable (i.e. set it to false)
3) Allow rollover of new log files.
4) Now manually delete the files using some scripts.

This is still not something I would recommend, but if Windows operation is a 
MUST (without using containers), you may want to try this out.

Regards,

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
>Reporter: Tao Qin
>Priority: Critical
>  Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, RetentionExpiredWindows.txt, 
> Untitled.jpg, image-2018-09-12-14-25-52-632.png, kafka-1194-v1.patch, 
> kafka-1194-v2.patch, kafka-bombarder.7z, screenshot-1.png
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in windows environment, and set the log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file. 
> And we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecut

[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-09-24 Thread M. Manna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625600#comment-16625600
 ] 

M. Manna commented on KAFKA-1194:
-

The best thing to do is to check the following:

1) Set the default value of log.segment.bytes to something slightly smaller, 
e.g. 20 MB.
2) Disable log.cleaner.enable (i.e. set it to false)
3) Allow rollover of new log files.
4) Now manually delete the files using some scripts.

This is still not something I would recommend, but if Windows operation is a 
MUST (without using containers), you may want to try this out.

Regards,
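
Expressed as broker configuration, the workaround above might look like this 
(the values are the examples from the steps, not recommendations):

{noformat}
# 1) Smaller segments so files roll over (and get closed) sooner, e.g. 20 MB
log.segment.bytes=20971520
# 2) Keep the log cleaner from holding rolled segment files open
log.cleaner.enable=false
# 3) Retention stays as configured; 4) expired, rolled segments can then be
#    deleted manually by an external script
log.retention.hours=24
{noformat}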

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
>Reporter: Tao Qin
>Priority: Critical
>  Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, RetentionExpiredWindows.txt, 
> Untitled.jpg, image-2018-09-12-14-25-52-632.png, kafka-1194-v1.patch, 
> kafka-1194-v2.patch, kafka-bombarder.7z, screenshot-1.png
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in windows environment, and set the log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file. 
> And we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:724)
> I think this error happens because kafka tries to rename the log file when it 
> is still open. So we should close the file first before renaming.
> The index file uses a special data structure, the MappedByteBuffer. Javadoc 
> describes it as:
> A mapped byte buffer and the file mapping that it represents remain valid 
> until the buffer itself is garbage-collected.
> Fortunately, I found a forceUnmap function in the kafka code, and perhaps it can 
> be used to free the MappedByteBuffer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7365) max.poll.records setting in Kafka Consumer is not working

2018-08-31 Thread M. Manna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598591#comment-16598591
 ] 

M. Manna edited comment on KAFKA-7365 at 8/31/18 12:06 PM:
---

[~kivaturi] I am not sure how your heartbeat interval (and session timeout) has 
been harmonised with the long processing delay. Also, max.poll.records is a 
tested config in the PlainTextConsumerTest.scala file and works as expected.

 

{{Do you know roughly how much delay (max) you need to process the message? 
e.g. 20s? If so, do you want to set that in your heartbeat.interval.ms=20000 
(and adjust session.timeout.ms to be ~3x more than that) and see if you are 
still encountering the issue? I might have overlooked something, but from the 
code I don't see any reason why this will be overridden (will check this 
anyway).}}

{{Also, please check the description of CommitFailedException, which might help 
you understand this issue.}}

_{{Commit cannot be completed since the group has already rebalanced and 
assigned the partitions to another member. This means that the time between 
subsequent calls to poll() was longer than the configured max.poll.interval.ms, 
which typically implies that the poll loop is spending too much time message 
processing. You can address this either by increasing the session timeout or by 
reducing the maximum size of batches returned in poll() with 
max.poll.records.}}_

 


was (Author: manme...@gmail.com):
[~kivaturi] I am not sure how your heartbeat interval (and session timeout) has 
been harmonised with the long processing delay. Also, max.poll.records is a 
tested config in the PlainTextConsumerTest.scala file and works as expected.

{{Do you know roughly how much delay (max) you need to process the message? 
e.g. 20s? If so, do you want to set that in your heartbeat.interval.ms=20000 
(and adjust session.timeout.ms to be ~3x more than that) and see if you are 
still encountering the issue? I might have overlooked something, but from the 
code I don't see any reason why this will be overridden (will check this 
anyway).}}

 

> max.poll.records setting in Kafka Consumer is not working
> -
>
> Key: KAFKA-7365
> URL: https://issues.apache.org/jira/browse/KAFKA-7365
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Kashyap Ivaturi
>Priority: Major
>
> Hi,
> I have a requirement where I consume messages one by one, each message has 
> additional processing that I should do and then manually commit the offset.
> Things work well most of the time until I get a big batch of records which 
> takes longer to process, and I encounter a CommitFailed exception for the 
> last set of records even though they were processed. While I'm able to 
> reconnect, it's picking up some messages that I had already processed. I 
> don't want this to happen, as it's creating duplicates in target systems that I 
> integrate with while processing the message.
>  
> I decided that even though there are more messages in the queue, I would 
> like to have control over how many records I can process when polled.
> I tried to replicate a scenario where I have started the consumer by setting 
> 'max.poll.records' to '1' and then pushed 4 messages into the Topic the 
> consumer is listening.
> I expected that the consumer would only process 1 message because of my 
> 'max.poll.records' setting, but the consumer processed all 4 messages 
> in a single poll. Any idea why it did not honour the 'max.poll.records' setting, 
> or is some other setting overriding it? Appreciate your help or 
> guidance in troubleshooting this issue.
> Here is the log of my Consumer config when it starts:
>  
> 2018-08-28 08:29:47.873  INFO 91121 --- [           main] 
> o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
> auto.commit.interval.ms = 5000
> auto.offset.reset = earliest
> bootstrap.servers = [messaging-rtp3.cisco.com:9093]
> check.crcs = true
> client.id = 
> connections.max.idle.ms = 54
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = empestor
> heartbeat.interval.ms = 3000
> interceptor.classes = null
> internal.leave.group.on.close = true
> isolation.level = read_uncommitted
> key.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 30
> max.poll.records = 1
> metadata.max.age.ms = 

[jira] [Comment Edited] (KAFKA-7365) max.poll.records setting in Kafka Consumer is not working

2018-08-31 Thread M. Manna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598591#comment-16598591
 ] 

M. Manna edited comment on KAFKA-7365 at 8/31/18 12:06 PM:
---

[~kivaturi] I am not sure how your heartbeat interval (and session timeout) has 
been harmonised with the long processing delay. Also, max.poll.records is a 
tested config in the PlainTextConsumerTest.scala file and works as expected.

 

{{Do you know roughly how much delay (max) you need to process the message? 
e.g. 20s? If so, do you want to set that in your heartbeat.interval.ms=20000 
(and adjust session.timeout.ms to be ~3x more than that) and see if you are 
still encountering the issue? I might have overlooked something, but from the 
code I don't see any reason why this will be overridden (will check this 
anyway).}}

{{Also, please check the description of CommitFailedException, which might help 
you understand this issue.}}

_{{Commit cannot be completed since the group has already rebalanced and 
assigned the partitions to another member. This means that the time between 
subsequent calls to poll() was longer than the configured max.poll.interval.ms, 
which typically implies that the poll loop is spending too much time message 
processing. You can address this either by increasing the session timeout or by 
reducing the maximum size of batches returned in poll() with 
max.poll.records.}}_
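
For reference, here is a minimal sketch of the poll loop being described (the 
topic name and bootstrap address are placeholders, and it assumes the 
manual-commit, max.poll.records=1 settings shown in the log above; Kafka 2.0+ 
consumer API):

{code}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class OneByOneConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("group.id", "empestor");
        props.put("enable.auto.commit", "false");   // manual commits, as in the log
        props.put("max.poll.records", "1");         // cap per poll() call
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic")); // placeholder
            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records)
                    process(record); // must finish within max.poll.interval.ms
                try {
                    consumer.commitSync(); // commit only after processing succeeded
                } catch (CommitFailedException e) {
                    // the group rebalanced because poll() was not called in time;
                    // these records will be redelivered, so processing must be
                    // idempotent to avoid duplicates in the target systems
                }
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // placeholder for the long-running per-message work
    }
}
{code}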

 


was (Author: manme...@gmail.com):
[~kivaturi] I am not sure how your heartbeat interval (and session timeout) have 
been tuned to accommodate the long processing delay. Also, max.poll.records is a 
tested config in PlaintextConsumerTest.scala, which works as expected.

 

{{Do you know roughly how much delay (max) you need to process the message? 
e.g. 20s? If so, do you want to set that in your heartbeat.interval.ms=20000 
(and adjust session.timeout.ms to be ~3x more than that) and see if you are 
still encountering the issue? I might have overlooked something, but from the 
code I don't see any reason why this would be overridden (I will check this 
anyway).}}

{{Also, please check the description of CommitFailedException which might help 
you understand this issue.}}

_{{Commit cannot be completed since the group has already rebalanced and 
assigned the partitions to another member. This means that the time between 
subsequent calls to poll() was longer than the configured max.poll.interval.ms, 
which typically implies that the poll loop is spending too much time message 
processing. You can address this either by increasing the session timeout or by 
reducing the maximum size of batches returned in poll() with 
max.poll.records.}}_

 

> max.poll.records setting in Kafka Consumer is not working
> -
>
> Key: KAFKA-7365
> URL: https://issues.apache.org/jira/browse/KAFKA-7365
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Kashyap Ivaturi
>Priority: Major
>
> Hi,
> I have a requirement where I consume messages one by one; each message needs 
> additional processing, after which I manually commit the offset.
> Things work well most of the time, until I get a big batch of records that 
> takes longer to process and I encounter a CommitFailedException for the 
> last set of records even though they were processed. While I am able to 
> reconnect, the consumer picks up some messages that I had already processed. I 
> don't want this to happen, as it creates duplicates in the target systems I 
> integrate with while processing the messages.
>  
> I decided that even though there are more messages in the queue, I would 
> like to have control over how many records I process per poll.
> I tried to replicate a scenario where I started the consumer with 
> 'max.poll.records' set to '1' and then pushed 4 messages into the topic the 
> consumer is listening to.
> I expected that the consumer would only process 1 message because of my 
> 'max.poll.records' setting, but the consumer processed all 4 messages 
> in a single poll. Any idea why it did not honour the 'max.poll.records' 
> setting, or is some other setting overriding it? I would appreciate your help 
> or guidance in troubleshooting this issue.
> Here is the log of my Consumer config when it starts:
>  
> 2018-08-28 08:29:47.873  INFO 91121 --- [           main] 
> o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
> auto.commit.interval.ms = 5000
> auto.offset.reset = earliest
> bootstrap.servers = [messaging-rtp3.cisco.com:9093]
> check.crcs = true
> client.id = 
> connections.max.idle.ms = 540000
> enable.auto.commit = false
> exclude.internal.topics = true

[jira] [Comment Edited] (KAFKA-7365) max.poll.records setting in Kafka Consumer is not working

2018-08-31 Thread M. Manna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598591#comment-16598591
 ] 

M. Manna edited comment on KAFKA-7365 at 8/31/18 12:04 PM:
---

[~kivaturi] I am not sure how your heartbeat interval (and session timeout) have 
been tuned to accommodate the long processing delay. Also, max.poll.records is a 
tested config in PlaintextConsumerTest.scala, which works as expected.

{{Do you know roughly how much delay (max) you need to process the message? 
e.g. 20s? If so, do you want to set that in your heartbeat.interval.ms=20000 
(and adjust session.timeout.ms to be ~3x more than that) and see if you are 
still encountering the issue? I might have overlooked something, but from the 
code I don't see any reason why this would be overridden (I will check this 
anyway).}}

 


was (Author: manme...@gmail.com):
[~kivaturi] I am not sure how your heartbeat interval (and session timeout) have 
been tuned to accommodate the long processing delay. Also, max.poll.records is a 
tested config in PlaintextConsumerTest.scala, which works as expected.

{{Do you know roughly how much delay (max) you need to process the message? 
e.g. 20s? If so, do you want to set that in your heartbeat.interval.ms=20000 
(and adjust session.timeout.ms to be ~3x more than that) and see if you are 
still encountering the issue? I might have overlooked something, but from the 
code I don't see any reason why this would be overridden (I will check this 
anyway).}}

{{If you can take a look at this part of the code (Fetcher.java) - it might be 
what you are experiencing:}}

{{// this case shouldn't usually happen because we only send one fetch at a 
time per partition,}}
{{// but it might conceivably happen in some rare cases (such as partition 
leader changes).}}
{{// we have to copy to a new list because the old one may be immutable}}
{{List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + 
currentRecords.size());}}
{{newRecords.addAll(currentRecords);}}
{{newRecords.addAll(records);}}
{{fetched.put(partition, newRecords);}}
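
One quick way to separate the two possibilities (poll() returning more than 
max.poll.records, vs. the application loop draining several polls before 
committing) is to log the per-poll count; it should never exceed 
max.poll.records, even when a larger network fetch is buffered client-side. A 
fragment, assuming a consumer configured as in the log below:

{code}
// records.count() is the number of records returned by this single poll();
// with max.poll.records=1 it should never exceed 1.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
System.out.println("records in this poll: " + records.count());
for (ConsumerRecord<String, String> record : records)
    process(record); // hypothetical handler
{code}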

> max.poll.records setting in Kafka Consumer is not working
> -
>
> Key: KAFKA-7365
> URL: https://issues.apache.org/jira/browse/KAFKA-7365
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Kashyap Ivaturi
>Priority: Major
>
> Hi,
> I have a requirement where I consume messages one by one; each message needs 
> additional processing, after which I manually commit the offset.
> Things work well most of the time, until I get a big batch of records that 
> takes longer to process and I encounter a CommitFailedException for the 
> last set of records even though they were processed. While I am able to 
> reconnect, the consumer picks up some messages that I had already processed. I 
> don't want this to happen, as it creates duplicates in the target systems I 
> integrate with while processing the messages.
>  
> I decided that even though there are more messages in the queue, I would 
> like to have control over how many records I process per poll.
> I tried to replicate a scenario where I started the consumer with 
> 'max.poll.records' set to '1' and then pushed 4 messages into the topic the 
> consumer is listening to.
> I expected that the consumer would only process 1 message because of my 
> 'max.poll.records' setting, but the consumer processed all 4 messages 
> in a single poll. Any idea why it did not honour the 'max.poll.records' 
> setting, or is some other setting overriding it? I would appreciate your help 
> or guidance in troubleshooting this issue.
> Here is the log of my Consumer config when it starts:
>  
> 2018-08-28 08:29:47.873  INFO 91121 --- [           main] 
> o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
> auto.commit.interval.ms = 5000
> auto.offset.reset = earliest
> bootstrap.servers = [messaging-rtp3.cisco.com:9093]
> check.crcs = true
> client.id = 
> connections.max.idle.ms = 540000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = empestor
> heartbeat.interval.ms = 3000
> interceptor.classes = null
> internal.leave.group.on.close = true
> isolation.level = read_uncommitted
> key.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 300000
> max.poll.records = 1
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2

[jira] [Comment Edited] (KAFKA-7365) max.poll.records setting in Kafka Consumer is not working

2018-08-31 Thread M. Manna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598591#comment-16598591
 ] 

M. Manna edited comment on KAFKA-7365 at 8/31/18 11:58 AM:
---

[~kivaturi] I am not sure how your heartbeat interval (and session timeout) have 
been tuned to accommodate the long processing delay. Also, max.poll.records is a 
tested config in PlaintextConsumerTest.scala, which works as expected.

{{Do you know roughly how much delay (max) you need to process the message? 
e.g. 20s? If so, do you want to set that in your heartbeat.interval.ms=20000 
(and adjust session.timeout.ms to be ~3x more than that) and see if you are 
still encountering the issue? I might have overlooked something, but from the 
code I don't see any reason why this would be overridden (I will check this 
anyway).}}

{{If you can take a look at this part of the code (Fetcher.java) - it might be 
what you are experiencing:}}

{{// this case shouldn't usually happen because we only send one fetch at a 
time per partition,}}
{{// but it might conceivably happen in some rare cases (such as partition 
leader changes).}}
{{// we have to copy to a new list because the old one may be immutable}}
{{List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + 
currentRecords.size());}}
{{newRecords.addAll(currentRecords);}}
{{newRecords.addAll(records);}}
{{fetched.put(partition, newRecords);}}


was (Author: manme...@gmail.com):
[~kivaturi] I am not sure how your heartbeat interval (and session timeout) have 
been tuned to accommodate the long processing delay. Also, max.poll.records is a 
tested config in PlaintextConsumerTest.scala, which works as expected.

{{Do you know roughly how much delay (max) you need to process the message? 
e.g. 20s? If so, do you want to set that in your heartbeat.interval.ms=20000 
(and adjust session.timeout.ms to be ~3x more than that) and see if you are 
still encountering the issue? I might have overlooked something, but from the 
code I don't see any reason why this would be overridden (I will check this 
anyway).}}

{{If you can take a look at this part of the code (Fetcher.java) - it might be 
what you are experiencing:}}

 

> max.poll.records setting in Kafka Consumer is not working
> -
>
> Key: KAFKA-7365
> URL: https://issues.apache.org/jira/browse/KAFKA-7365
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Kashyap Ivaturi
>Priority: Major
>
> Hi,
> I have a requirement where I consume messages one by one; each message needs 
> additional processing, after which I manually commit the offset.
> Things work well most of the time, until I get a big batch of records that 
> takes longer to process and I encounter a CommitFailedException for the 
> last set of records even though they were processed. While I am able to 
> reconnect, the consumer picks up some messages that I had already processed. I 
> don't want this to happen, as it creates duplicates in the target systems I 
> integrate with while processing the messages.
>  
> I decided that even though there are more messages in the queue, I would 
> like to have control over how many records I process per poll.
> I tried to replicate a scenario where I started the consumer with 
> 'max.poll.records' set to '1' and then pushed 4 messages into the topic the 
> consumer is listening to.
> I expected that the consumer would only process 1 message because of my 
> 'max.poll.records' setting, but the consumer processed all 4 messages 
> in a single poll. Any idea why it did not honour the 'max.poll.records' 
> setting, or is some other setting overriding it? I would appreciate your help 
> or guidance in troubleshooting this issue.
> Here is the log of my Consumer config when it starts:
>  
> 2018-08-28 08:29:47.873  INFO 91121 --- [           main] 
> o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
> auto.commit.interval.ms = 5000
> auto.offset.reset = earliest
> bootstrap.servers = [messaging-rtp3.cisco.com:9093]
> check.crcs = true
> client.id = 
> connections.max.idle.ms = 540000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = empestor
> heartbeat.interval.ms = 3000
> interceptor.classes = null
> internal.leave.group.on.close = true
> isolation.level = read_uncommitted
> key.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 300000
> max.poll.records = 1

[jira] [Comment Edited] (KAFKA-7365) max.poll.records setting in Kafka Consumer is not working

2018-08-31 Thread M. Manna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598591#comment-16598591
 ] 

M. Manna edited comment on KAFKA-7365 at 8/31/18 11:57 AM:
---

[~kivaturi] I am not sure how your heartbeat interval (and session timeout) have 
been tuned to accommodate the long processing delay. Also, max.poll.records is a 
tested config in PlaintextConsumerTest.scala, which works as expected.

{{Do you know roughly how much delay (max) you need to process the message? 
e.g. 20s? If so, do you want to set that in your heartbeat.interval.ms=20000 
(and adjust session.timeout.ms to be ~3x more than that) and see if you are 
still encountering the issue? I might have overlooked something, but from the 
code I don't see any reason why this would be overridden (I will check this 
anyway).}}

{{If you can take a look at this part of the code (Fetcher.java) - it might be 
what you are experiencing:}}

 


was (Author: manme...@gmail.com):
[~kivaturi] I am not sure how your heartbeat interval (and session timeout) have 
been tuned to accommodate the long processing delay. Also, max.poll.records is a 
tested config in PlaintextConsumerTest.scala, which works as expected.

{{Do you know roughly how much delay (max) you need to process the message? 
e.g. 20s? If so, do you want to set that in your heartbeat.interval.ms=20000 
(and adjust session.timeout.ms to be ~3x more than that) and see if you are 
still encountering the issue? I might have overlooked something, but from the 
code I don't see any reason why this would be overridden (I will check this 
anyway).}}

{{If you can take a look at this part of the code (Fetcher.java) - it might be 
what you are experiencing:}}

{{// this case shouldn't usually happen because we only send one fetch at a 
time per partition,}}
{{// but it might conceivably happen in some rare cases (such as partition 
leader changes).}}
{{// we have to copy to a new list because the old one may be immutable}}
{{List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + 
currentRecords.size());}}
{{newRecords.addAll(currentRecords);}}
{{newRecords.addAll(records);}}
{{fetched.put(partition, newRecords);}}

> max.poll.records setting in Kafka Consumer is not working
> -
>
> Key: KAFKA-7365
> URL: https://issues.apache.org/jira/browse/KAFKA-7365
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Kashyap Ivaturi
>Priority: Major
>
> Hi,
> I have a requirement where I consume messages one by one; each message needs 
> additional processing, after which I manually commit the offset.
> Things work well most of the time, until I get a big batch of records that 
> takes longer to process and I encounter a CommitFailedException for the 
> last set of records even though they were processed. While I am able to 
> reconnect, the consumer picks up some messages that I had already processed. I 
> don't want this to happen, as it creates duplicates in the target systems I 
> integrate with while processing the messages.
>  
> I decided that even though there are more messages in the queue, I would 
> like to have control over how many records I process per poll.
> I tried to replicate a scenario where I started the consumer with 
> 'max.poll.records' set to '1' and then pushed 4 messages into the topic the 
> consumer is listening to.
> I expected that the consumer would only process 1 message because of my 
> 'max.poll.records' setting, but the consumer processed all 4 messages 
> in a single poll. Any idea why it did not honour the 'max.poll.records' 
> setting, or is some other setting overriding it? I would appreciate your help 
> or guidance in troubleshooting this issue.
> Here is the log of my Consumer config when it starts:
>  
> 2018-08-28 08:29:47.873  INFO 91121 --- [           main] 
> o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
> auto.commit.interval.ms = 5000
> auto.offset.reset = earliest
> bootstrap.servers = [messaging-rtp3.cisco.com:9093]
> check.crcs = true
> client.id = 
> connections.max.idle.ms = 540000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = empestor
> heartbeat.interval.ms = 3000
> interceptor.classes = null
> internal.leave.group.on.close = true
> isolation.level = read_uncommitted
> key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

[jira] [Comment Edited] (KAFKA-7365) max.poll.records setting in Kafka Consumer is not working

2018-08-31 Thread M. Manna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598591#comment-16598591
 ] 

M. Manna edited comment on KAFKA-7365 at 8/31/18 11:55 AM:
---

[~kivaturi] I am not sure how your heartbeat interval (and session timeout) have 
been tuned to accommodate the long processing delay. Also, max.poll.records is a 
tested config in PlaintextConsumerTest.scala, which works as expected.

{{Do you know roughly how much delay (max) you need to process the message? 
e.g. 20s? If so, do you want to set that in your heartbeat.interval.ms=20000 
(and adjust session.timeout.ms to be ~3x more than that) and see if you are 
still encountering the issue? I might have overlooked something, but from the 
code I don't see any reason why this would be overridden (I will check this 
anyway).}}

{{If you can take a look at this part of the code (Fetcher.java) - it might be 
what you are experiencing:}}

{{// this case shouldn't usually happen because we only send one fetch at a 
time per partition,}}
{{// but it might conceivably happen in some rare cases (such as partition 
leader changes).}}
{{// we have to copy to a new list because the old one may be immutable}}
{{List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + 
currentRecords.size());}}
{{newRecords.addAll(currentRecords);}}
{{newRecords.addAll(records);}}
{{fetched.put(partition, newRecords);}}


was (Author: manme...@gmail.com):
[~kivaturi] I am not sure how your heartbeat interval (and session timeout) have 
been tuned to accommodate the long processing delay. Also, max.poll.records is a 
tested config in PlaintextConsumerTest.scala, which works as expected.

Do you know roughly how much delay (max) you need to process the message? e.g. 
20s? If so, do you want to set that in your heartbeat.interval.ms=20000 (and 
adjust session.timeout.ms to be ~3x more than that) and see if you are still 
encountering the issue? I might have overlooked something, but from the code I 
don't see any reason why this would be overridden (I will check this anyway).

> max.poll.records setting in Kafka Consumer is not working
> -
>
> Key: KAFKA-7365
> URL: https://issues.apache.org/jira/browse/KAFKA-7365
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Kashyap Ivaturi
>Priority: Major
>
> Hi,
> I have a requirement where I consume messages one by one; each message needs 
> additional processing, after which I manually commit the offset.
> Things work well most of the time, until I get a big batch of records that 
> takes longer to process and I encounter a CommitFailedException for the 
> last set of records even though they were processed. While I am able to 
> reconnect, the consumer picks up some messages that I had already processed. I 
> don't want this to happen, as it creates duplicates in the target systems I 
> integrate with while processing the messages.
>  
> I decided that even though there are more messages in the queue, I would 
> like to have control over how many records I process per poll.
> I tried to replicate a scenario where I started the consumer with 
> 'max.poll.records' set to '1' and then pushed 4 messages into the topic the 
> consumer is listening to.
> I expected that the consumer would only process 1 message because of my 
> 'max.poll.records' setting, but the consumer processed all 4 messages 
> in a single poll. Any idea why it did not honour the 'max.poll.records' 
> setting, or is some other setting overriding it? I would appreciate your help 
> or guidance in troubleshooting this issue.
> Here is the log of my Consumer config when it starts:
>  
> 2018-08-28 08:29:47.873  INFO 91121 --- [           main] 
> o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
> auto.commit.interval.ms = 5000
> auto.offset.reset = earliest
> bootstrap.servers = [messaging-rtp3.cisco.com:9093]
> check.crcs = true
> client.id = 
> connections.max.idle.ms = 540000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = empestor
> heartbeat.interval.ms = 3000
> interceptor.classes = null
> internal.leave.group.on.close = true
> isolation.level = read_uncommitted
> key.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
> max.partition.fetch.bytes = 1048576

[jira] [Commented] (KAFKA-7365) max.poll.records setting in Kafka Consumer is not working

2018-08-31 Thread M. Manna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598591#comment-16598591
 ] 

M. Manna commented on KAFKA-7365:
-

[~kivaturi] I am not sure how your heartbeat interval (and session timeout) have 
been tuned to accommodate the long processing delay. Also, max.poll.records is a 
tested config in PlaintextConsumerTest.scala, which works as expected.

Do you know roughly how much delay (max) you need to process the message? e.g. 
20s? If so, do you want to set that in your heartbeat.interval.ms=20000 (and 
adjust session.timeout.ms to be ~3x more than that) and see if you are still 
encountering the issue? I might have overlooked something, but from the code I 
don't see any reason why this would be overridden (I will check this anyway).

> max.poll.records setting in Kafka Consumer is not working
> -
>
> Key: KAFKA-7365
> URL: https://issues.apache.org/jira/browse/KAFKA-7365
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Kashyap Ivaturi
>Priority: Major
>
> Hi,
> I have a requirement where I consume messages one by one; each message needs 
> additional processing, after which I manually commit the offset.
> Things work well most of the time, until I get a big batch of records that 
> takes longer to process and I encounter a CommitFailedException for the 
> last set of records even though they were processed. While I am able to 
> reconnect, the consumer picks up some messages that I had already processed. I 
> don't want this to happen, as it creates duplicates in the target systems I 
> integrate with while processing the messages.
>  
> I decided that even though there are more messages in the queue, I would 
> like to have control over how many records I process per poll.
> I tried to replicate a scenario where I started the consumer with 
> 'max.poll.records' set to '1' and then pushed 4 messages into the topic the 
> consumer is listening to.
> I expected that the consumer would only process 1 message because of my 
> 'max.poll.records' setting, but the consumer processed all 4 messages 
> in a single poll. Any idea why it did not honour the 'max.poll.records' 
> setting, or is some other setting overriding it? I would appreciate your help 
> or guidance in troubleshooting this issue.
> Here is the log of my Consumer config when it starts:
>  
> 2018-08-28 08:29:47.873  INFO 91121 --- [           main] 
> o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
> auto.commit.interval.ms = 5000
> auto.offset.reset = earliest
> bootstrap.servers = [messaging-rtp3.cisco.com:9093]
> check.crcs = true
> client.id = 
> connections.max.idle.ms = 540000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = empestor
> heartbeat.interval.ms = 3000
> interceptor.classes = null
> internal.leave.group.on.close = true
> isolation.level = read_uncommitted
> key.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 300000
> max.poll.records = 1
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RangeAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 40000
> retry.backoff.ms = 100
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.mechanism = GSSAPI
> security.protocol = SSL
> send.buffer.bytes = 131072
> session.timeout.ms = 10000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = null
> ssl.key.password = [hidden]
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = 
> /kafka/certs/empestor/certificates/kafka.client.empestor.keystore.jks
> ssl.keystore.password = [hidden]
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = null

[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-08-30 Thread M. Manna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16597234#comment-16597234
 ] 

M. Manna commented on KAFKA-1194:
-

[~Kobi Hikri] Please check what I mentioned in the relevant ticket. Yes, you 
will always see this problem, as the fix hasn't gone in yet.

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
>Reporter: Tao Qin
>Priority: Critical
>  Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, Untitled.jpg, kafka-1194-v1.patch, 
> kafka-1194-v2.patch, screenshot-1.png
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in a Windows environment, and set log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the Kafka broker still cannot delete the old log files, 
> and we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:724)
> I think this error happens because Kafka tries to rename the log file while it 
> is still open, so we should close the file before renaming it.
> The index file uses a special data structure, the MappedByteBuffer. The 
> Javadoc describes it as:
> A mapped byte buffer and the file mapping that it represents remain valid 
> until the buffer itself is garbage-collected.
> Fortunately, I found a forceUnmap function in the Kafka code, and perhaps it 
> can be used to free the MappedByteBuffer.
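
The failure mode can be reproduced with a minimal sketch (the file name is 
hypothetical; the behaviour is Windows-specific, which is why the bug does not 
show up on Linux):

{code}
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class RenameWhileMapped {
    public static void main(String[] args) throws Exception {
        File index = new File("00000000000001516723.index"); // hypothetical name
        try (RandomAccessFile raf = new RandomAccessFile(index, "rw")) {
            raf.setLength(1024);
            MappedByteBuffer mmap =
                    raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 1024);
            mmap.put((byte) 1);
            // On Windows this rename typically fails while the mapping is alive;
            // on Linux it succeeds.
            System.out.println("rename while mapped: "
                    + index.renameTo(new File(index.getPath() + ".deleted")));
        }
        // Even after the channel is closed, the mapping can keep the file locked
        // until the MappedByteBuffer is garbage-collected -- hence the need for
        // an explicit unmap (forceUnmap) before renaming.
        System.out.println("rename after close: "
                + index.renameTo(new File(index.getPath() + ".deleted")));
    }
}
{code}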



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-08-30 Thread M. Manna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16597218#comment-16597218
 ] 

M. Manna commented on KAFKA-1194:
-

[~Kobi Hikri] I believe [KAFKA-7278|https://issues.apache.org/jira/browse/KAFKA-7278] 
has already (potentially) fixed the issue.

[~lindong], would you like to elaborate on whether that's the case?

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
>Reporter: Tao Qin
>Priority: Critical
>  Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, Untitled.jpg, kafka-1194-v1.patch, 
> kafka-1194-v2.patch, screenshot-1.png
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in a Windows environment, and set log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the Kafka broker still cannot delete the old log files, 
> and we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:724)
> I think this error happens because Kafka tries to rename the log file while it 
> is still open, so we should close the file before renaming it.
> The index file uses a special data structure, the MappedByteBuffer. The 
> Javadoc describes it as:
> A mapped byte buffer and the file mapping that it represents remain valid 
> until the buffer itself is garbage-collected.
> Fortunately, I found a forceUnmap function in the Kafka code, and perhaps it 
> can be used to free the MappedByteBuffer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7358) Alternative Partitioner to Support "Always Round-Robin" Selection

2018-08-30 Thread M. Manna (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna updated KAFKA-7358:

Summary: Alternative Partitioner to Support "Always Round-Robin" Selection  
(was: Concrete Partitioner to Support "Always Round-Robin" Selection)

> Alternative Partitioner to Support "Always Round-Robin" Selection
> -
>
> Key: KAFKA-7358
> URL: https://issues.apache.org/jira/browse/KAFKA-7358
> Project: Kafka
>  Issue Type: Wish
>  Components: clients
>Reporter: M. Manna
>Assignee: M. Manna
>Priority: Minor
>
> In my organisation, we have been using kafka as the basic publish-subscribe 
> messaging system provider. Our goal is to send event-based (secure, 
> encrypted) SQL messages reliably, and process them accordingly. For us, the 
> message keys represent some metadata which we use to either ignore messages 
> (if a loopback to the sender), or log some information. We have the following 
> use case for messaging:
> 1) A Database transaction event takes place
> 2) The event is captured and messaged across 10 data centres all around the 
> world.
> 3) A group of consumers (for each data centre with a unique consumer-group 
> ID) will process messages from their respective partitions, one consumer 
> per partition. A sketch of such a partitioner follows this description.
> Under the circumstances, we only need a guarantee that the same message won't be 
> sent to multiple partitions. In other words, 1 partition will +never+ be 
> sought by multiple consumers.
> Using DefaultPartitioner, we can achieve this only with NULL keys. But since 
> we need keys for metadata, we cannot maintain "Round-robin" selection of 
> partitions because a key hash will determine which partition to choose. We 
> need to have round-robin style selection regardless of key type (NULL or 
> not-NULL)
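
For illustration, a partitioner of the kind requested here might look like the 
following sketch (the class name is hypothetical; it ignores the key entirely, 
cycles through all partitions of the topic, and does not account for partition 
availability):

{code}
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class AlwaysRoundRobinPartitioner implements Partitioner {

    private final Map<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int next = counters.computeIfAbsent(topic, t -> new AtomicInteger(0))
                           .getAndIncrement();
        // mask the sign bit so the index stays non-negative after int overflow
        return partitions.get((next & Integer.MAX_VALUE) % partitions.size())
                         .partition();
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
{code}

It would be enabled via the producer's partitioner.class configuration; a 
built-in RoundRobinPartitioner along these lines eventually shipped in later 
Kafka releases.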



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7358) Concrete Partitioner to Support "Always Round-Robin" Selection

2018-08-30 Thread M. Manna (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna updated KAFKA-7358:

Summary: Concrete Partitioner to Support "Always Round-Robin" Selection  
(was: Extended Partitioner to Support "Always Round-Robin" Selection)

> Concrete Partitioner to Support "Always Round-Robin" Selection
> --
>
> Key: KAFKA-7358
> URL: https://issues.apache.org/jira/browse/KAFKA-7358
> Project: Kafka
>  Issue Type: Wish
>  Components: clients
>Reporter: M. Manna
>Assignee: M. Manna
>Priority: Minor
>
> In my organisation, we have been using kafka as the basic publish-subscribe 
> messaging system provider. Our goal is to send event-based (secure, 
> encrypted) SQL messages reliably, and process them accordingly. For us, the 
> message keys represent some metadata which we use to either ignore messages 
> (if a loopback to the sender), or log some information. We have the following 
> use case for messaging:
> 1) A Database transaction event takes place
> 2) The event is captured and messaged across 10 data centres all around the 
> world.
> 3) A group of consumers (for each data centre with a unique consumer-group 
> ID) will process messages from their respective partitions, one consumer 
> per partition.
> Under the circumstances, we only need a guarantee that the same message won't be 
> sent to multiple partitions. In other words, 1 partition will +never+ be 
> sought by multiple consumers.
> Using DefaultPartitioner, we can achieve this only with NULL keys. But since 
> we need keys for metadata, we cannot maintain "Round-robin" selection of 
> partitions because a key hash will determine which partition to choose. We 
> need to have round-robin style selection regardless of key type (NULL or 
> not-NULL)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7358) Extended Partitioner to Support "Always Round-Robin" Selection

2018-08-30 Thread M. Manna (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna updated KAFKA-7358:

Summary: Extended Partitioner to Support "Always Round-Robin" Selection  
(was: Extended Partitioner to Support "Always Round Robin" Selection)

> Extended Partitioner to Support "Always Round-Robin" Selection
> --
>
> Key: KAFKA-7358
> URL: https://issues.apache.org/jira/browse/KAFKA-7358
> Project: Kafka
>  Issue Type: Wish
>  Components: clients
>Reporter: M. Manna
>Assignee: M. Manna
>Priority: Minor
>
> In my organisation, we have been using kafka as the basic publish-subscribe 
> messaging system provider. Our goal is to send event-based (secure, 
> encrypted) SQL messages reliably, and process them accordingly. For us, the 
> message keys represent some metadata which we use to either ignore messages 
> (if a loopback to the sender), or log some information. We have the following 
> use case for messaging:
> 1) A Database transaction event takes place
> 2) The event is captured and messaged across 10 data centres all around the 
> world.
> 3) A group of consumers (for each data centre with a unique consumer-group 
> ID) will process messages from their respective partitions, one consumer 
> per partition.
> Under the circumstances, we only need a guarantee that the same message won't be 
> sent to multiple partitions. In other words, 1 partition will +never+ be 
> sought by multiple consumers.
> Using DefaultPartitioner, we can achieve this only with NULL keys. But since 
> we need keys for metadata, we cannot maintain "Round-robin" selection of 
> partitions because a key hash will determine which partition to choose. We 
> need to have round-robin style selection regardless of key type (NULL or 
> not-NULL)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7358) Extended Partitioner to Support "Always Round Robin" Selection

2018-08-30 Thread M. Manna (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna updated KAFKA-7358:

Description: 
In my organisation, we have been using kafka as the basic publish-subscribe 
messaging system provider. Our goal is to send event-based (secure, encrypted) 
SQL messages reliably, and process them accordingly. For us, the message keys 
represent some metadata which we use to either ignore messages (if a loopback 
to the sender), or log some information. We have the following use case for 
messaging:

1) A Database transaction event takes place

2) The event is captured and messaged across 10 data centres all around the 
world.

3) A group of consumers (for each data centre with a unique consumer-group ID) 
will process messages from their respective partitions, one consumer per 
partition.

Under the circumstances, we only need a guarantee that the same message won't be 
sent to multiple partitions. In other words, 1 partition will +never+ be sought 
by multiple consumers.

Using DefaultPartitioner, we can achieve this only with NULL keys. But since we 
need keys for metadata, we cannot maintain "Round-robin" selection of 
partitions because a key hash will determine which partition to choose. We need 
to have round-robin style selection regardless of key type (NULL or not-NULL)

  was:
In my organisation, we have been using kafka as the basic publish-subscribe 
messaging system provider. Our goal is to send event-based messages reliably 
and securely, and to perform data synchronisation based on the messages. For us, 
the message keys represent some metadata which we use to either ignore messages 
(if a loopback) or log some information. We have the following use case for 
messaging:

1) A Database transaction event takes place

2) The event is captured and messaged across 10 data centres all around the 
world.

3) A group of consumers (for each data centre with a unique consumer-group ID) 
will process messages from their respective partitions, one consumer per 
partition.

Under the circumstances, we only need a guarantee that the same message won't be 
sent to multiple partitions. Using DefaultPartitioner, we can achieve this only 
with NULL keys. But since we need keys for metadata, we cannot maintain 
"Round-robin" selection of partitions because a key hash will determine which 
partition to choose. We need both non-null key with round-robin partition 
selection for KafkaProducer.

We believe this solution is achievable and stable, because multiple consumers 
will not be consuming from the same partition as per Kafka's fundamental 
architecture. Hence, the ask for an extended partitioner.


> Extended Partitioner to Support "Always Round Robin" Selection
> --
>
> Key: KAFKA-7358
> URL: https://issues.apache.org/jira/browse/KAFKA-7358
> Project: Kafka
>  Issue Type: Wish
>  Components: clients
>Reporter: M. Manna
>Assignee: M. Manna
>Priority: Minor
>
> In my organisation, we have been using kafka as the basic publish-subscribe 
> messaging system provider. Our goal is to send event-based (secure, 
> encrypted) SQL messages reliably, and process them accordingly. For us, the 
> message keys represent some metadata which we use to either ignore messages 
> (if a loopback to the sender), or log some information. We have the following 
> use case for messaging:
> 1) A Database transaction event takes place
> 2) The event is captured and messaged across 10 data centres all around the 
> world.
> 3) A group of consumers (for each data centre with a unique consumer-group 
> ID) will process messages from their respective partitions, one consumer 
> per partition.
> Under the circumstances, we only need a guarantee that the same message won't be 
> sent to multiple partitions. In other words, 1 partition will +never+ be 
> sought by multiple consumers.
> Using DefaultPartitioner, we can achieve this only with NULL keys. But since 
> we need keys for metadata, we cannot maintain "Round-robin" selection of 
> partitions because a key hash will determine which partition to choose. We 
> need to have round-robin style selection regardless of key type (NULL or 
> not-NULL)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7358) Extended Partitioner to Support "Always Round Robin" Selection

2018-08-30 Thread M. Manna (JIRA)
M. Manna created KAFKA-7358:
---

 Summary: Extended Partitioner to Support "Always Round Robin" 
Selection
 Key: KAFKA-7358
 URL: https://issues.apache.org/jira/browse/KAFKA-7358
 Project: Kafka
  Issue Type: Wish
  Components: clients
Reporter: M. Manna
Assignee: M. Manna


In my organisation, we have been using kafka as the basic publish-subscribe 
messaging system provider. Our goal is to send event-based messages reliably 
and securely, and to perform data synchronisation based on the messages. For us, 
the message keys represent some metadata which we use to either ignore messages 
(if a loopback) or log some information. We have the following use case for 
messaging:

1) A Database transaction event takes place

2) The event is captured and messaged across 10 data centres all around the 
world.

3) A group of consumers (for each data centre with a unique consumer-group ID) 
will process messages from their respective partitions, one consumer per 
partition.

Under the circumstances, we only need a guarantee that the same message won't be 
sent to multiple partitions. Using DefaultPartitioner, we can achieve this only 
with NULL keys. But since we need keys for metadata, we cannot maintain 
"Round-robin" selection of partitions because a key hash will determine which 
partition to choose. We need both non-null key with round-robin partition 
selection for KafkaProducer.

We believe this solution is achievable and stable, because multiple consumers 
will not be consuming from the same partition as per Kafka's fundamental 
architecture. Hence, the ask for an extended partitioner.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-08-15 Thread M. Manna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16580836#comment-16580836
 ] 

M. Manna commented on KAFKA-1194:
-

[~haraldk] Not sure about this JDK bug. The issue happens mainly on 
Windows; Linux works just fine, including in containers. Recently [~lindong] 
has identified a possible bug where async delete shouldn't be called on certain 
segments. I will look forward to that patch.

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
>Reporter: Tao Qin
>Priority: Critical
>  Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, Untitled.jpg, kafka-1194-v1.patch, 
> kafka-1194-v2.patch, screenshot-1.png
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in a Windows environment, and set log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the Kafka broker still cannot delete the old log files, 
> and we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:724)
> I think this error happens because Kafka tries to rename the log file while it 
> is still open, so we should close the file before renaming it.
> The index file uses a special data structure, the MappedByteBuffer. The 
> Javadoc describes it as:
> A mapped byte buffer and the file mapping that it represents remain valid 
> until the buffer itself is garbage-collected.
> Fortunately, I found a forceUnmap function in the Kafka code, and perhaps it 
> can be used to free the MappedByteBuffer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7160) Add check for group ID length

2018-08-11 Thread M. Manna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577256#comment-16577256
 ] 

M. Manna commented on KAFKA-7160:
-

[~lambdaliu] I would like to take this, if I may. Also, does anyone know 
whether 256 bytes would be a good limit?
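
For what it's worth, a guard of the kind being discussed could be as small as 
the following sketch (the 256-byte limit, placement, and exception type are 
assumptions, not a committed change):

{code}
import java.nio.charset.StandardCharsets;

final class GroupIdValidator {
    // Hypothetical validation; 256 bytes is the limit proposed above.
    static void validateGroupId(String groupId) {
        int lengthInBytes = groupId.getBytes(StandardCharsets.UTF_8).length;
        if (lengthInBytes > 256)
            throw new IllegalArgumentException(
                    "group.id is " + lengthInBytes + " bytes; maximum allowed is 256");
    }
}
{code}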

> Add check for group ID length
> -
>
> Key: KAFKA-7160
> URL: https://issues.apache.org/jira/browse/KAFKA-7160
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: lambdaliu
>Priority: Minor
>  Labels: newbie
>
> We should limit the length of the group ID, because other systems (such as 
> monitoring systems) use the group ID when we run Kafka in a production 
> environment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-08-11 Thread M. Manna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577250#comment-16577250
 ] 

M. Manna commented on KAFKA-6188:
-

[~lindong] thanks for getting back. I believe what we need to do is update the 
documentation. The fact that Kafka is not guaranteed to work on Windows as 
reliably as on Linux calls for clarification on the official page. I believe 
organisations are slowly leaning towards a containerised deployment model (i.e. 
Linux). So I don't see why we can't just update the official doc permanently, 
or at the least add a warning about this file-system exception issue. Would you 
not agree?

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Priority: Blocker
>  Labels: windows
> Attachments: Segments are opened before deletion, 
> kafka_2.10-0.10.2.1.zip, output.txt
>
>
> Just started with version 1.0.0 after 4-5 months of using 0.10.2.1. The 
> test environment is very simple, with only one producer and one consumer. 
> Initially, everything started fine, and stand-alone tests worked as expected. 
> However, running my code, Kafka clients fail after approximately 10 minutes. 
> Kafka won't start after that; it fails with the same error. 
> Deleting the logs lets it start again, but the same problem recurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 
> ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 
> 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor 
> threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting 
> (kafka.server.ReplicaManager$LogDirFailureHandler)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Partitions  are 
> offline due to failure on log directory C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions  (kafka.server.ReplicaFetcherManager)
> [2017-11-08 08:21:57,892] INFO [ReplicaManager broker=0] Broker 0 stopped 
> fetcher for partitions  because they are in the failed log dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,892] INFO Stopping serving logs in dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.log.LogManager)
> [2017-11-08 08:21:57,892] FATAL Shutdown broker because all log dirs in 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs have failed (kafka.log.LogManager)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6701) synchronize Log modification between delete cleanup and async delete

2018-08-10 Thread M. Manna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16575873#comment-16575873
 ] 

M. Manna edited comment on KAFKA-6701 at 8/10/18 7:30 AM:
--

[~lindong] I believe this might also be a fix for 
https://issues.apache.org/jira/browse/KAFKA-6188 . But the issue still occurs 
on Windows as of the 2.11-1.1.0 release.


was (Author: manme...@gmail.com):
[~lindong] I believe this might also be a fix for 
https://issues.apache.org/jira/browse/KAFKA-6188 .

> synchronize Log modification between delete cleanup and async delete
> 
>
> Key: KAFKA-6701
> URL: https://issues.apache.org/jira/browse/KAFKA-6701
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sumant Tambe
>Assignee: Sumant Tambe
>Priority: Major
>
> Kafka broker crashes without any evident disk failures. 
> From [~becket_qin]: This looks like a bug in Kafka: when topic deletion and log 
> retention cleanup happen at the same time, the log retention cleanup may see a 
> ClosedChannelException after the log has been renamed for async deletion.
> The root cause is that the topic deletion should have set the isClosed flag 
> of the partition log to true, and the retention cleanup should not attempt to 
> delete the old log segments when the log is closed.
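As a rough illustration of the fix direction described above, the retention path 
could check a closed flag before touching segments. This is a hedged sketch with 
hypothetical names, not the actual Log class API:

{code}
// Minimal sketch, assuming a hypothetical Log class guarded by a single lock.
class Log {
  @volatile private var closed = false
  private val lock = new Object

  // Called by topic deletion before segments are renamed for async delete.
  def close(): Unit = lock.synchronized { closed = true }

  // Retention cleanup skips a log that deletion has already closed, avoiding
  // ClosedChannelException on segment files renamed for async deletion.
  def deleteOldSegments(): Int = lock.synchronized {
    if (closed) 0
    else {
      // ... locate and delete expired segments here ...
      0
    }
  }
}
{code}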



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6701) synchronize Log modification between delete cleanup and async delete

2018-08-10 Thread M. Manna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16575873#comment-16575873
 ] 

M. Manna commented on KAFKA-6701:
-

[~lindong] I believe this might also be a fix for 
https://issues.apache.org/jira/browse/KAFKA-6188 .

> synchronize Log modification between delete cleanup and async delete
> 
>
> Key: KAFKA-6701
> URL: https://issues.apache.org/jira/browse/KAFKA-6701
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sumant Tambe
>Assignee: Sumant Tambe
>Priority: Major
>
> Kafka broker crashes without any evident disk failures. 
> From [~becket_qin]: This looks like a bug in Kafka: when topic deletion and log 
> retention cleanup happen at the same time, the log retention cleanup may see a 
> ClosedChannelException after the log has been renamed for async deletion.
> The root cause is that the topic deletion should have set the isClosed flag 
> of the partition log to true, and the retention cleanup should not attempt to 
> delete the old log segments when the log is closed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-05-22 Thread M. Manna (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna updated KAFKA-6188:

Attachment: Segments are opened before deletion

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Priority: Blocker
>  Labels: windows
> Attachments: Segments are opened before deletion, 
> kafka_2.10-0.10.2.1.zip, output.txt
>
>
> Just started with version 1.0.0 after 4-5 months of using 0.10.2.1. The 
> test environment is very simple, with only one producer and one consumer. 
> Initially, everything started fine, stand alone tests worked as expected. 
> However, running my code, Kafka clients fail after approximately 10 minutes. 
> Kafka won't start after that and it fails with the same error. 
> Deleting logs helps to start again, and the same problem occurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 
> ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 
> 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor 
> threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting 
> (kafka.server.ReplicaManager$LogDirFailureHandler)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Partitions  are 
> offline due to failure on log directory C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions  (kafka.server.ReplicaFetcherManager)
> [2017-11-08 08:21:57,892] INFO [ReplicaManager broker=0] Broker 0 stopped 
> fetcher for partitions  because they are in the failed log dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,892] INFO Stopping serving logs in dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.log.LogManager)
> [2017-11-08 08:21:57,892] FATAL Shutdown broker because all log dirs in 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs have failed (kafka.log.LogManager)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-05-21 Thread M. Manna (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16482760#comment-16482760
 ] 

M. Manna commented on KAFKA-6188:
-

[~lindong] Hey Dong. Thanks for your valuable comments on the issue. I don't 
believe we can say it's a disk issue. This has occurred in the past and still 
exists in 1.1.0, and I have tried it on a clean machine with a fresh hard disk. 
The problem persists without any disk issues.

The bug is not that the broker tries to access a file after the file has been 
deleted, but that the old segment is kept open whilst it is being renamed - 
which can cause problems on any OS platform.

Below is the javadoc for the Log#replaceSegments method:

{code}
 * The sequence of operations is:
 *
 *   1. Cleaner creates new segment with suffix .cleaned and invokes replaceSegments().
 *      If broker crashes at this point, the clean-and-swap operation is aborted and
 *      the .cleaned file is deleted on recovery in loadSegments().
 *   2. New segment is renamed .swap. If the broker crashes after this point before the whole
 *      operation is completed, the swap operation is resumed on recovery as described in the
 *      next step.
 *   3. Old segment files are renamed to .deleted and asynchronous delete is scheduled.
 *      If the broker crashes, any .deleted files left behind are deleted on recovery in
 *      loadSegments(). replaceSegments() is then invoked to complete the swap with newSegment
 *      recreated from the .swap file and oldSegments containing segments which were not
 *      renamed before the crash.
 *   4. Swap segment is renamed to replace the existing segment, completing this operation.
 *      If the broker crashes, any .deleted files which may be left behind are deleted
 *      on recovery in loadSegments().
{code}

Before calling asyncDeleteSegment(seg), the old segments are still open, and this 
will cause a FileSystemException. Also, before the following line is called, the 
new segment is also open, so it will cause problems:

{{  // okay we are safe now, remove the swap suffix}}
{{  newSegment.changeFileSuffixes(Log.SwapFileSuffix, "")}}

Essentially, we are attempting to delete/rename a file which is still open - 
I believe this will corrupt things regardless of the Windows/Linux/Mac platform.
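To make the ordering concrete, here is a minimal sketch of the close-before-rename 
sequence being argued for (illustrative, assumed names; not Kafka's actual segment 
code):

{code}
import java.nio.channels.FileChannel
import java.nio.file.{Files, Paths, StandardCopyOption}

// Hedged sketch: rename only after the handle is flushed and closed.
// On Windows, renaming a file while a handle is open fails, so close first.
def closeThenRename(channel: FileChannel, from: String, to: String): Unit = {
  channel.force(true) // flush any pending writes to disk
  channel.close()     // release the handle before the rename
  Files.move(Paths.get(from), Paths.get(to), StandardCopyOption.ATOMIC_MOVE)
}
{code}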

 

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Priority: Blocker
>  Labels: windows
> Attachments: kafka_2.10-0.10.2.1.zip, output.txt
>
>
> Just started with version 1.0.0 after 4-5 months of using 0.10.2.1. The 
> test environment is very simple, with only one producer and one consumer. 
> Initially, everything started fine, stand alone tests worked as expected. 
> However, running my code, Kafka clients fail after approximately 10 minutes. 
> Kafka won't start after that and it fails with the same error. 
> Deleting logs helps to start again, and the same problem occurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 
> ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 
> 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor 
> threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting 
> (kafka.server.ReplicaManager$LogDirFailureHandler)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Partitions  are 
> offline due to failure on log directory C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions  (kafka.server.ReplicaFetch

[jira] [Comment Edited] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-05-18 Thread M. Manna (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480686#comment-16480686
 ] 

M. Manna edited comment on KAFKA-6188 at 5/18/18 2:01 PM:
--

[~yuzhih...@gmail.com] I am constantly hitting a blocker at this location. There 
are two FATAL shutdowns, and this one is causing issues during startup 
(LogManager):

{code}
// dir should be an absolute path
def handleLogDirFailure(dir: String) {
  info(s"Stopping serving logs in dir $dir")
  logCreationOrDeletionLock synchronized {
    _liveLogDirs.remove(new File(dir))
    if (_liveLogDirs.isEmpty) {
      fatal(s"Shutdown broker because all log dirs in ${logDirs.mkString(", ")} have failed")
      Exit.halt(1)
    }
  }
}
{code}

Since the queue is empty (i.e. no valid/clean log directory exists) it wants to 
shut down fatally. If we stop the shutdown here, does it mean that it will keep 
failing gracefully while log segments/offsets keep growing and violating the 
scheduled retention/cleanup policy?

Additionally, do you believe that a KIP is needed to address a "blocking" log 
cleanup logic (see the sketch after this list) such that:

1) Old files are closed, renamed and copied so that new segments can be 
"opened" for writing.

2) Renaming occurs for all old segments - since they are closed, we can safely 
delete/remove them.

3) All of the above uses locking/unlocking where applicable.
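Here is a rough sketch of the cleanup ordering proposed in points 1-3 above 
(illustrative names only; this is not Kafka's actual LogManager code):

{code}
import java.io.File
import java.nio.file.{Files, StandardCopyOption}

// Hedged sketch: close, rename, then delete, all under a single lock.
object BlockingCleanup {
  private val lock = new Object

  def cleanupSegment(segmentFile: File, closeHandle: () => Unit): Unit = lock.synchronized {
    closeHandle() // 1) close the old segment's handle before touching it on disk
    val renamed = new File(segmentFile.getPath + ".deleted")
    // 2) rename the closed segment so a fresh segment can be opened for writing
    Files.move(segmentFile.toPath, renamed.toPath, StandardCopyOption.ATOMIC_MOVE)
    // 3) the renamed file can now be removed safely
    Files.deleteIfExists(renamed.toPath)
  }
}
{code}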

 

 

 


was (Author: manme...@gmail.com):
[~yuzhih...@gmail.com] I am constantly hitting a blocker at this location. There 
are two FATAL shutdowns, and this one is causing issues during startup 
(LogManager):

{code}
// dir should be an absolute path
def handleLogDirFailure(dir: String) {
  info(s"Stopping serving logs in dir $dir")
  logCreationOrDeletionLock synchronized {
    _liveLogDirs.remove(new File(dir))
    if (_liveLogDirs.isEmpty) {
      fatal(s"Shutdown broker because all log dirs in ${logDirs.mkString(", ")} have failed")
      Exit.halt(1)
    }
  }
}
{code}

Since the queue is empty (i.e. no valid/clean log directory exists) it wants to 
shut down fatally. If we stop the shutdown here, does it mean that it will keep 
failing gracefully and messages will keep growing?
Additionally, do you believe that a KIP is needed to address a "blocking" log 
cleanup logic such that:

1) Old files are closed, renamed and copied so that new segments can be 
"opened" for writing.

2) Renaming occurs for all old segments - since they are closed, we can safely 
delete/remove them.

3) All of the above uses locking/unlocking where applicable.

 

 

 

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Priority: Blocker
>  Labels: windows
> Attachments: kafka_2.10-0.10.2.1.zip, output.txt
>
>
> Just started with version 1.0.0 after 4-5 months of using 0.10.2.1. The 
> test environment is very simple, with only one producer and one consumer. 
> Initially, everything started fine, stand alone tests worked as expected. 
> However, running my code, Kafka clients fail after approximately 10 minutes. 
> Kafka won't start after that and it fails with the same error. 
> Deleting logs helps to start again, and the same problem occurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 
> ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 
> 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor 
> threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting 
> (kafka.server.ReplicaManager$LogDirFailureHandler)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broke

[jira] [Comment Edited] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-05-18 Thread M. Manna (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480686#comment-16480686
 ] 

M. Manna edited comment on KAFKA-6188 at 5/18/18 2:00 PM:
--

[~yuzhih...@gmail.com] I am constantly hitting a blocker at this location. There 
are two FATAL shutdowns, and this one is causing issues during startup 
(LogManager):

{code}
// dir should be an absolute path
def handleLogDirFailure(dir: String) {
  info(s"Stopping serving logs in dir $dir")
  logCreationOrDeletionLock synchronized {
    _liveLogDirs.remove(new File(dir))
    if (_liveLogDirs.isEmpty) {
      fatal(s"Shutdown broker because all log dirs in ${logDirs.mkString(", ")} have failed")
      Exit.halt(1)
    }
  }
}
{code}

Since the queue is empty (i.e. no valid/clean log directory exists) it wants to 
shut down fatally. If we stop the shutdown here, does it mean that it will keep 
failing gracefully and messages will keep growing?
Additionally, do you believe that a KIP is needed to address a "blocking" log 
cleanup logic such that:

1) Old files are closed, renamed and copied so that new segments can be 
"opened" for writing.

2) Renaming occurs for all old segments - since they are closed, we can safely 
delete/remove them.

3) All of the above uses locking/unlocking where applicable.

 

 

 


was (Author: manme...@gmail.com):
[~yuzhih...@gmail.com] I am constantly having issues here. There are 
potentially two shutdowns, and this one seems to be causing the FATAL 
shutdown (LogManager):

{code}
// dir should be an absolute path
def handleLogDirFailure(dir: String) {
  info(s"Stopping serving logs in dir $dir")
  logCreationOrDeletionLock synchronized {
    _liveLogDirs.remove(new File(dir))
    if (_liveLogDirs.isEmpty) {
      fatal(s"Shutdown broker because all log dirs in ${logDirs.mkString(", ")} have failed")
      Exit.halt(1)
    }
  }
}
{code}

Since the queue is empty (i.e. no valid/clean log directory exists) it wants to 
shut down fatally. If we stop the shutdown here, does it mean that it will keep 
failing gracefully and messages will keep growing?
Additionally, do you believe that a KIP is needed to address a "blocking" log 
cleanup logic such that:

1) Old files are closed, renamed and copied so that new segments can be 
"opened" for writing.

2) Renaming occurs for all old segments - since they are closed, we can safely 
delete/remove them.

3) All of the above uses locking/unlocking where applicable.

 

 

 

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Priority: Blocker
>  Labels: windows
> Attachments: kafka_2.10-0.10.2.1.zip, output.txt
>
>
> Just started with version 1.0.0 after 4-5 months of using 0.10.2.1. The 
> test environment is very simple, with only one producer and one consumer. 
> Initially, everything started fine, stand alone tests worked as expected. 
> However, running my code, Kafka clients fail after approximately 10 minutes. 
> Kafka won't start after that and it fails with the same error. 
> Deleting logs helps to start again, and the same problem occurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 
> ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 
> 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor 
> threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting 
> (kafka.server.ReplicaManager$LogDirFailureHandler)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Partitions  are 
> offline due to failure on log directory C:\Kafka\ka

[jira] [Commented] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-05-18 Thread M. Manna (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480686#comment-16480686
 ] 

M. Manna commented on KAFKA-6188:
-

[~yuzhih...@gmail.com] I am constantly having issues here. There are 
potentially two shutdowns, and this one seems to be causing the FATAL 
shutdown (LogManager):

{code}
// dir should be an absolute path
def handleLogDirFailure(dir: String) {
  info(s"Stopping serving logs in dir $dir")
  logCreationOrDeletionLock synchronized {
    _liveLogDirs.remove(new File(dir))
    if (_liveLogDirs.isEmpty) {
      fatal(s"Shutdown broker because all log dirs in ${logDirs.mkString(", ")} have failed")
      Exit.halt(1)
    }
  }
}
{code}

Since the queue is empty (i.e. no valid/clean log directory exists) it wants to 
shut down fatally. If we stop the shutdown here, does it mean that it will keep 
failing gracefully and messages will keep growing?
Additionally, do you believe that a KIP is needed to address a "blocking" log 
cleanup logic such that:

1) Old files are closed, renamed and copied so that new segments can be 
"opened" for writing.

2) Renaming occurs for all old segments - since they are closed, we can safely 
delete/remove them.

3) All of the above uses locking/unlocking where applicable.

 

 

 

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Priority: Blocker
>  Labels: windows
> Attachments: kafka_2.10-0.10.2.1.zip, output.txt
>
>
> Just started with version 1.0.0 after 4-5 months of using 0.10.2.1. The 
> test environment is very simple, with only one producer and one consumer. 
> Initially, everything started fine, stand alone tests worked as expected. 
> However, running my code, Kafka clients fail after approximately 10 minutes. 
> Kafka won't start after that and it fails with the same error. 
> Deleting logs helps to start again, and the same problem occurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 
> ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 
> 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor 
> threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting 
> (kafka.server.ReplicaManager$LogDirFailureHandler)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Partitions  are 
> offline due to failure on log directory C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions  (kafka.server.ReplicaFetcherManager)
> [2017-11-08 08:21:57,892] INFO [ReplicaManager broker=0] Broker 0 stopped 
> fetcher for partitions  because they are in the failed log dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,892] INFO Stopping serving logs in dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.log.LogManager)
> [2017-11-08 08:21:57,892] FATAL Shutdown broker because all log dirs in 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs have failed (kafka.log.LogManager)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-05-16 Thread M. Manna (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477235#comment-16477235
 ] 

M. Manna commented on KAFKA-6188:
-

In 0.10.2.1, to address the problem, I raised a pull request a while back: 
[https://github.com/apache/kafka/pull/3838]. See also [^kafka_2.10-0.10.2.1.zip].

Since everyone was too focused on Linux, I went ahead and deployed our 
production staging testbed with 0.10.2.1 with the modified core jar, and guess 
what? It ran perpetually without running into any issues and cleaned logs/index 
etc. as you would expect it to. For convenience, I have attached that Kafka 
distro with all configs (which, if the above PR were approved, would solve all 
issues). Please check this and see what I mean.

When moved to 1.1.0, everything broke with or without the same change. The 
hemorrhaging is caused by the LogManager fatal shutdown (code below):

{code}
logCreationOrDeletionLock synchronized {
  _liveLogDirs.remove(new File(dir))
  if (_liveLogDirs.isEmpty) {
    fatal(s"Shutdown broker because all log dirs in ${logDirs.mkString(", ")} have failed")
    Exit.halt(1)
  }
}
{code}

I am yet to diff the 0.10.2.1 code and find out more; long way to go. If anyone 
has any valuable insight, it would be good to have it. [~yuzhih...@gmail.com], 
[~TeilaRei] et al., thanks for your ideas.

 

 

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Priority: Blocker
>  Labels: windows
> Attachments: kafka_2.10-0.10.2.1.zip, output.txt
>
>
> Just started with version 1.0.0 after 4-5 months of using 0.10.2.1. The 
> test environment is very simple, with only one producer and one consumer. 
> Initially, everything started fine, stand alone tests worked as expected. 
> However, running my code, Kafka clients fail after approximately 10 minutes. 
> Kafka won't start after that and it fails with the same error. 
> Deleting logs helps to start again, and the same problem occurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 
> ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 
> 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor 
> threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting 
> (kafka.server.ReplicaManager$LogDirFailureHandler)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Partitions  are 
> offline due to failure on log directory C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions  (kafka.server.ReplicaFetcherManager)
> [2017-11-08 08:21:57,892] INFO [ReplicaManager broker=0] Broker 0 stopped 
> fetcher for partitions  because they are in the failed log dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,892] INFO Stopping serving logs in dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.log.LogManager)
> [2017-11-08 08:21:57,892] FATAL Shutdown broker because all log dirs in 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs have failed (kafka.log.LogManager)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-05-16 Thread M. Manna (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna updated KAFKA-6188:

Attachment: kafka_2.10-0.10.2.1.zip

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Priority: Blocker
>  Labels: windows
> Attachments: kafka_2.10-0.10.2.1.zip, output.txt
>
>
> Just started with version 1.0.0 after 4-5 months of using 0.10.2.1. The 
> test environment is very simple, with only one producer and one consumer. 
> Initially, everything started fine, stand alone tests worked as expected. 
> However, running my code, Kafka clients fail after approximately 10 minutes. 
> Kafka won't start after that and it fails with the same error. 
> Deleting logs helps to start again, and the same problem occurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 
> ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 
> 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor 
> threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting 
> (kafka.server.ReplicaManager$LogDirFailureHandler)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Partitions  are 
> offline due to failure on log directory C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions  (kafka.server.ReplicaFetcherManager)
> [2017-11-08 08:21:57,892] INFO [ReplicaManager broker=0] Broker 0 stopped 
> fetcher for partitions  because they are in the failed log dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,892] INFO Stopping serving logs in dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.log.LogManager)
> [2017-11-08 08:21:57,892] FATAL Shutdown broker because all log dirs in 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs have failed (kafka.log.LogManager)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-05-15 Thread M. Manna (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna updated KAFKA-6188:

Attachment: output.txt

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Priority: Blocker
>  Labels: windows
> Attachments: output.txt
>
>
> Just started with version 1.0.0 after 4-5 months of using 0.10.2.1. The 
> test environment is very simple, with only one producer and one consumer. 
> Initially, everything started fine, stand alone tests worked as expected. 
> However, running my code, Kafka clients fail after approximately 10 minutes. 
> Kafka won't start after that and it fails with the same error. 
> Deleting logs helps to start again, and the same problem occurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 
> ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 
> 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor 
> threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting 
> (kafka.server.ReplicaManager$LogDirFailureHandler)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Partitions  are 
> offline due to failure on log directory C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions  (kafka.server.ReplicaFetcherManager)
> [2017-11-08 08:21:57,892] INFO [ReplicaManager broker=0] Broker 0 stopped 
> fetcher for partitions  because they are in the failed log dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,892] INFO Stopping serving logs in dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.log.LogManager)
> [2017-11-08 08:21:57,892] FATAL Shutdown broker because all log dirs in 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs have failed (kafka.log.LogManager)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-05-15 Thread M. Manna (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476061#comment-16476061
 ] 

M. Manna edited comment on KAFKA-6188 at 5/15/18 4:01 PM:
--

[~TeilaRei] and [~darion] I am not sure this version makes any difference. 
'inter.broker.protocol.version' defaults to the correct version 1.1-IV0, 
which means for 1.1.0 it shouldn't halt. I have tried it with fresh 3-node and 
1-node clusters using an out-of-the-box setup. My log and offset cleanup sizes 
were small, and the retention period was also small, to trigger a quick test. 
It still breaks on Windows.


 The stack trace which [~yuzhih...@gmail.com] and I have investigated 
shows that this will almost certainly happen if the segment file channels are 
opened/closed at the same time the cleaner thread is trying to clean/read them.
 Closing the log and index files didn't help - when you start the broker it 
cleans the files nicely, but the problem arises when expired offsets are being 
cleaned by LogCleaner$CleanerThread. I hope this helps.


was (Author: manme...@gmail.com):
[~TeilaRei] and [~darion] I am not sure this version makes any difference. 
'inter.broker.protocol.version' defaults to the correct version 1.1-IV0, 
which means for 1.1.0 it shouldn't halt. 
The stack trace which [~yuzhih...@gmail.com] and I have investigated shows 
that this will almost certainly happen if the segment file channels are 
opened/closed at the same time the cleaner thread is trying to clean/read them.
Closing the log and index files didn't help - when you start the broker it 
cleans the files nicely, but the problem arises when expired offsets are being 
cleaned by LogCleaner$CleanerThread. I hope this helps.

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Priority: Blocker
>  Labels: windows
>
> Just started with version 1.0.0 after 4-5 months of using 0.10.2.1. The 
> test environment is very simple, with only one producer and one consumer. 
> Initially, everything started fine, stand alone tests worked as expected. 
> However, running my code, Kafka clients fail after approximately 10 minutes. 
> Kafka won't start after that and it fails with the same error. 
> Deleting logs helps to start again, and the same problem occurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 
> ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 
> 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor 
> threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting 
> (kafka.server.ReplicaManager$LogDirFailureHandler)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Partitions  are 
> offline due to failure on log directory C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions  (kafka.server.ReplicaFetcherManager)
> [2017-11-08 08:21:57,892] INFO [ReplicaManager broker=0] Broker 0 stopped 
> fetcher for partitions  because they are in the failed log dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,892] INFO Stopping serving logs in dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.log.LogManager)
> [2017-11-08 08:21:57,892] FATAL Shutdown broker because all log dirs in 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs have failed (kafka.log.LogManager)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-05-15 Thread M. Manna (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476061#comment-16476061
 ] 

M. Manna commented on KAFKA-6188:
-

[~TeilaRei] and [~darion] I am not sure this version makes any difference. 
'inter.broker.protocol.version' defaults to the correct version 1.1-IV0, 
which means for 1.1.0 it shouldn't halt. 
The stack trace which [~yuzhih...@gmail.com] and I have investigated shows 
that this will almost certainly happen if the segment file channels are 
opened/closed at the same time the cleaner thread is trying to clean/read them.
Closing the log and index files didn't help - when you start the broker it 
cleans the files nicely, but the problem arises when expired offsets are being 
cleaned by LogCleaner$CleanerThread. I hope this helps.

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Priority: Blocker
>  Labels: windows
>
> Just started with version 1.0.0 after 4-5 months of using 0.10.2.1. The 
> test environment is very simple, with only one producer and one consumer. 
> Initially, everything started fine, stand alone tests worked as expected. 
> However, running my code, Kafka clients fail after approximately 10 minutes. 
> Kafka won't start after that and it fails with the same error. 
> Deleting logs helps to start again, and the same problem occurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 
> ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 
> 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor 
> threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting 
> (kafka.server.ReplicaManager$LogDirFailureHandler)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Partitions  are 
> offline due to failure on log directory C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions  (kafka.server.ReplicaFetcherManager)
> [2017-11-08 08:21:57,892] INFO [ReplicaManager broker=0] Broker 0 stopped 
> fetcher for partitions  because they are in the failed log dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,892] INFO Stopping serving logs in dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.log.LogManager)
> [2017-11-08 08:21:57,892] FATAL Shutdown broker because all log dirs in 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs have failed (kafka.log.LogManager)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-05-14 Thread M. Manna (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16473963#comment-16473963
 ] 

M. Manna commented on KAFKA-6188:
-

As per an email exchange with [~yuzhih...@gmail.com], I am adding the link here 
since it's all related to Windows.

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Priority: Blocker
>  Labels: windows
>
> Just started with version 1.0.0 after 4-5 months of using 0.10.2.1. The 
> test environment is very simple, with only one producer and one consumer. 
> Initially, everything started fine, stand alone tests worked as expected. 
> However, running my code, Kafka clients fail after approximately 10 minutes. 
> Kafka won't start after that and it fails with the same error. 
> Deleting logs helps to start again, and the same problem occurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 
> ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 
> 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor 
> threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting 
> (kafka.server.ReplicaManager$LogDirFailureHandler)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Partitions  are 
> offline due to failure on log directory C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions  (kafka.server.ReplicaFetcherManager)
> [2017-11-08 08:21:57,892] INFO [ReplicaManager broker=0] Broker 0 stopped 
> fetcher for partitions  because they are in the failed log dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,892] INFO Stopping serving logs in dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.log.LogManager)
> [2017-11-08 08:21:57,892] FATAL Shutdown broker because all log dirs in 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs have failed (kafka.log.LogManager)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-04-30 Thread M. Manna (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16458526#comment-16458526
 ] 

M. Manna commented on KAFKA-1194:
-

I will be adding a patch to trunk and raising a PR. In the past I have raised a 
PR, and all existing tests simply passed. There are some bizarre JDK 
compatibility tests which fail for unrelated reasons. Maybe one of the 
committers should review the PR.

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
>Reporter: Tao Qin
>Priority: Critical
>  Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, Untitled.jpg, kafka-1194-v1.patch, 
> kafka-1194-v2.patch, screenshot-1.png
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in windows environment, and set the log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file. 
> And we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:724)
> I think this error happens because Kafka tries to rename the log file while it 
> is still open. So we should close the file before renaming it.
> The index file uses a special data structure, the MappedByteBuffer. The Javadoc 
> describes it as:
> A mapped byte buffer and the file mapping that it represents remain valid 
> until the buffer itself is garbage-collected.
> Fortunately, I found a forceUnmap function in the Kafka code, and perhaps it 
> can be used to free the MappedByteBuffer.
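For context, here is a hedged sketch of how such a force-unmap is commonly done 
on JDK 8 via reflection. It relies on internal JDK classes, which is exactly why 
it is fragile; this illustrates the idea and is not necessarily Kafka's actual 
forceUnmap implementation:

{code}
import java.nio.MappedByteBuffer

// Hedged sketch (JDK 8 era): reflectively invoke the internal cleaner to
// release the file mapping before the buffer is garbage-collected.
def forceUnmap(buffer: MappedByteBuffer): Unit = {
  val cleanerMethod = buffer.getClass.getMethod("cleaner")
  cleanerMethod.setAccessible(true) // the implementing class is not public
  val cleaner = cleanerMethod.invoke(buffer)
  val cleanMethod = cleaner.getClass.getMethod("clean")
  cleanMethod.setAccessible(true)
  cleanMethod.invoke(cleaner)
}
{code}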



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5585) Failover in a replicated Cluster does not work

2017-07-13 Thread M. Manna (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085330#comment-16085330
 ] 

M. Manna commented on KAFKA-5585:
-

@huxih it seems there is only one partition.

> Failover in a replicated Cluster does not work
> --
>
> Key: KAFKA-5585
> URL: https://issues.apache.org/jira/browse/KAFKA-5585
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.0
> Environment: Linux, Mac OSX
>Reporter: Thomas Bayer
> Attachments: broker_zookeeper_configs.zip, SimpleConsumer.java, 
> SimpleProducer.java, Stress Test Windows.xlsx, test_project_files.zip
>
>
> Failover does not work in a cluster with 3 nodes and a replicated topic with 
> factor 3.
> You can reproduce it as follows: set up 3 Kafka nodes and 1 Zookeeper. Then 
> create a topic with factor 3. Start a consumer. Stop a node. Write to the 
> topic. Now you get warnings that the client cannot connect to a broker. The 
> consumer does not receive any messages.
> The same setup works like a charm with 0.10.2.1.
> Broker Config:
> {{broker.id=1
> listeners=PLAINTEXT://:9091
> log.dirs=cluster/logs/node-1
> broker.id=2
> listeners=PLAINTEXT://:9092
> log.dirs=cluster/logs/node-2
> broker.id=3
> listeners=PLAINTEXT://:9093
> log.dirs=cluster/logs/node-3}}
> Rest of the config is from the distribution.
> Producer and consumer config: see attached files
> *Log Consumer:*
> 2017-07-12 16:15:26 WARN  ConsumerCoordinator:649 - Auto-commit of offsets 
> {produktion-0=OffsetAndMetadata{offset=10, metadata=''}} failed for group a: 
> Offset commit failed with a retriable exception. You should retry committing 
> offsets. The underlying error was: The coordinator is not available.
> 2017-07-12 16:15:26 WARN  NetworkClient:588 - Connection to node 2147483645 
> could not be established. Broker may not be available.
> 2017-07-12 16:15:26 WARN  NetworkClient:588 - Connection to node 2 could not 
> be established. Broker may not be available.
> *Log Producer:*
> {{2017-07-12 16:15:32 WARN  NetworkClient:588 - Connection to node -1 could 
> not be established. Broker may not be available.}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5585) Failover in a replicated Cluster does not work

2017-07-12 Thread M. Manna (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16083991#comment-16083991
 ] 

M. Manna edited comment on KAFKA-5585 at 7/12/17 2:05 PM:
--

Jinx or not, I was doing some PoC testing of reliability on Windows, and hit 
somewhat similar issues to what has been reported here (I guess, however, that 
my linger.ms and backoff.ms need to be adjusted properly).

Two topics, *z1* (partitions 1, repfactor 3) and *z3* (partitions 3, repfactor 3), 
should be created beforehand. The attached files are:
1) Source code for my stress testing - needs a project with *Log4j* - 
test_project_files.zip
2) Test results Excel sheet - Stress Test Windows
3) Cluster properties file - broker_zookeeper_configs.zip


was (Author: manme...@gmail.com):
Jinx or not, I was doing some PoC testing of reliability on Windows, and hit 
somewhat similar issues to what has been reported here (I guess, however, that 
my linger.ms and backoff.ms need to be adjusted properly).

Two topics, *z1* (partitions 1, repfactor 3) and *z3* (partitions 3, repfactor 3), 
should be created beforehand. The attached files are:
1) Source code for my stress testing - needs a project with *Log4j* - 
test_project_files.zip
2) Test results Excel sheet - Stress Test Windows

> Failover in a replicated Cluster does not work
> --
>
> Key: KAFKA-5585
> URL: https://issues.apache.org/jira/browse/KAFKA-5585
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.0
> Environment: Linux, Mac OSX
>Reporter: Thomas Bayer
> Attachments: broker_zookeeper_configs.zip, SimpleConsumer.java, 
> SimpleProducer.java, Stress Test Windows.xlsx, test_project_files.zip
>
>
> Failover does not work in a cluster with 3 nodes and a topic with 
> replication factor 3.
> You can reproduce it as follows: set up 3 Kafka nodes and 1 ZooKeeper, then 
> create a topic with replication factor 3. Start a consumer. Stop a node. 
> Write to the topic. Now you get warnings that the client cannot connect to a 
> broker. The consumer does not receive any messages.
> The same setup works like a charm with 0.10.2.1.
> Broker Config:
> {{broker.id=1
> listeners=PLAINTEXT://:9091
> log.dirs=cluster/logs/node-1
> broker.id=2
> listeners=PLAINTEXT://:9092
> log.dirs=cluster/logs/node-2
> broker.id=3
> listeners=PLAINTEXT://:9093
> log.dirs=cluster/logs/node-3}}
> Rest of the config is from the distribution.
> Producer and consumer config: see attached files



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5585) Failover in a replicated Cluster does not work

2017-07-12 Thread M. Manna (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna updated KAFKA-5585:

Attachment: broker_zookeeper_configs.zip

> Failover in a replicated Cluster does not work
> --
>
> Key: KAFKA-5585
> URL: https://issues.apache.org/jira/browse/KAFKA-5585
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.0
> Environment: Linux, Mac OSX
>Reporter: Thomas Bayer
> Attachments: broker_zookeeper_configs.zip, SimpleConsumer.java, 
> SimpleProducer.java, Stress Test Windows.xlsx, test_project_files.zip
>
>
> Failover does not work in a cluster with 3 nodes and a topic with 
> replication factor 3.
> You can reproduce it as follows: set up 3 Kafka nodes and 1 ZooKeeper, then 
> create a topic with replication factor 3. Start a consumer. Stop a node. 
> Write to the topic. Now you get warnings that the client cannot connect to a 
> broker. The consumer does not receive any messages.
> The same setup works like a charm with 0.10.2.1.
> Broker Config:
> {{broker.id=1
> listeners=PLAINTEXT://:9091
> log.dirs=cluster/logs/node-1
> broker.id=2
> listeners=PLAINTEXT://:9092
> log.dirs=cluster/logs/node-2
> broker.id=3
> listeners=PLAINTEXT://:9093
> log.dirs=cluster/logs/node-3}}
> Rest of the config is from the distribution.
> Producer and consumer config: see attached files



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5585) Failover in a replicated Cluster does not work

2017-07-12 Thread M. Manna (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna updated KAFKA-5585:

Attachment: Stress Test Windows.xlsx

> Failover in a replicated Cluster does not work
> --
>
> Key: KAFKA-5585
> URL: https://issues.apache.org/jira/browse/KAFKA-5585
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.0
> Environment: Linux, Mac OSX
>Reporter: Thomas Bayer
> Attachments: SimpleConsumer.java, SimpleProducer.java, Stress Test 
> Windows.xlsx, test_project_files.zip
>
>
> Failover does not work in a cluster with 3 nodes and a topic with 
> replication factor 3.
> You can reproduce it as follows: set up 3 Kafka nodes and 1 ZooKeeper, then 
> create a topic with replication factor 3. Start a consumer. Stop a node. 
> Write to the topic. Now you get warnings that the client cannot connect to a 
> broker. The consumer does not receive any messages.
> The same setup works like a charm with 0.10.2.1.
> Broker Config:
> {{broker.id=1
> listeners=PLAINTEXT://:9091
> log.dirs=cluster/logs/node-1
> broker.id=2
> listeners=PLAINTEXT://:9092
> log.dirs=cluster/logs/node-2
> broker.id=3
> listeners=PLAINTEXT://:9093
> log.dirs=cluster/logs/node-3}}
> Rest of the config is from the distribution.
> Producer and consumer config: see attached files



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5585) Failover in a replicated Cluster does not work

2017-07-12 Thread M. Manna (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna updated KAFKA-5585:

Attachment: test_project_files.zip

Need to create a project with a log4j dependency

> Failover in a replicated Cluster does not work
> --
>
> Key: KAFKA-5585
> URL: https://issues.apache.org/jira/browse/KAFKA-5585
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.0
> Environment: Linux, Mac OSX
>Reporter: Thomas Bayer
> Attachments: SimpleConsumer.java, SimpleProducer.java, 
> test_project_files.zip
>
>
> Failover does not work in a cluster with 3 nodes and a topic with 
> replication factor 3.
> You can reproduce it as follows: set up 3 Kafka nodes and 1 ZooKeeper, then 
> create a topic with replication factor 3. Start a consumer. Stop a node. 
> Write to the topic. Now you get warnings that the client cannot connect to a 
> broker. The consumer does not receive any messages.
> The same setup works like a charm with 0.10.2.1.
> Broker Config:
> {{broker.id=1
> listeners=PLAINTEXT://:9091
> log.dirs=cluster/logs/node-1
> broker.id=2
> listeners=PLAINTEXT://:9092
> log.dirs=cluster/logs/node-2
> broker.id=3
> listeners=PLAINTEXT://:9093
> log.dirs=cluster/logs/node-3}}
> Rest of the config is from the distribution.
> Producer and consumer config: see attached files



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5585) Failover in a replicated Cluster does not work

2017-07-12 Thread M. Manna (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16083991#comment-16083991
 ] 

M. Manna edited comment on KAFKA-5585 at 7/12/17 1:44 PM:
--

Jinx or not, I was doing some PoC testing of reliability on Windows and hit 
somewhat similar issues to what has been reported here (I guess, however, that 
my linger.ms and backoff.ms need to be adjusted properly).

Two topics, *z1* (1 partition, replication factor 3) and *z3* (3 partitions, 
replication factor 3), should be created beforehand. The attached files are:
1) Source code for my stress testing (needs a project with a *Log4j* 
dependency) - test_project_files.zip
2) Test results Excel sheet - Stress Test Windows.xlsx


was (Author: manme...@gmail.com):
Jinx or not, I was doing some PoC testing of reliability on Windows and hit 
somewhat similar issues to what has been reported here (I guess, however, that 
my linger.ms and backoff.ms need to be adjusted properly).

Two topics, *z1* (1 partition, replication factor 3) and *z3* (3 partitions, 
replication factor 3), should be created beforehand. The attached files are:
1) Source code for my stress testing (needs a project with a *Log4j* 
dependency) - test_project_files.zip
2) Test results Excel sheet.

> Failover in a replicated Cluster does not work
> --
>
> Key: KAFKA-5585
> URL: https://issues.apache.org/jira/browse/KAFKA-5585
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.0
> Environment: Linux, Mac OSX
>Reporter: Thomas Bayer
> Attachments: SimpleConsumer.java, SimpleProducer.java, Stress Test 
> Windows.xlsx, test_project_files.zip
>
>
> Failover does not work in a cluster with 3 nodes and a topic with 
> replication factor 3.
> You can reproduce it as follows: set up 3 Kafka nodes and 1 ZooKeeper, then 
> create a topic with replication factor 3. Start a consumer. Stop a node. 
> Write to the topic. Now you get warnings that the client cannot connect to a 
> broker. The consumer does not receive any messages.
> The same setup works like a charm with 0.10.2.1.
> Broker Config:
> {{broker.id=1
> listeners=PLAINTEXT://:9091
> log.dirs=cluster/logs/node-1
> broker.id=2
> listeners=PLAINTEXT://:9092
> log.dirs=cluster/logs/node-2
> broker.id=3
> listeners=PLAINTEXT://:9093
> log.dirs=cluster/logs/node-3}}
> Rest of the config is from the distribution.
> Producer and consumer config: see attached files



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Issue Comment Deleted] (KAFKA-5585) Failover in a replicated Cluster does not work

2017-07-12 Thread M. Manna (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna updated KAFKA-5585:

Comment: was deleted

(was: Need to create a project with log4j dependency)

> Failover in a replicated Cluster does not work
> --
>
> Key: KAFKA-5585
> URL: https://issues.apache.org/jira/browse/KAFKA-5585
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.0
> Environment: Linux, Mac OSX
>Reporter: Thomas Bayer
> Attachments: SimpleConsumer.java, SimpleProducer.java, Stress Test 
> Windows.xlsx, test_project_files.zip
>
>
> Failover does not work in a cluster with 3 nodes and a topic with 
> replication factor 3.
> You can reproduce it as follows: set up 3 Kafka nodes and 1 ZooKeeper, then 
> create a topic with replication factor 3. Start a consumer. Stop a node. 
> Write to the topic. Now you get warnings that the client cannot connect to a 
> broker. The consumer does not receive any messages.
> The same setup works like a charm with 0.10.2.1.
> Broker Config:
> {{broker.id=1
> listeners=PLAINTEXT://:9091
> log.dirs=cluster/logs/node-1
> broker.id=2
> listeners=PLAINTEXT://:9092
> log.dirs=cluster/logs/node-2
> broker.id=3
> listeners=PLAINTEXT://:9093
> log.dirs=cluster/logs/node-3}}
> Rest of the config is from the distribution.
> Producer and consumer config: see attached files



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5585) Failover in a replicated Cluster does not work

2017-07-12 Thread M. Manna (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16083991#comment-16083991
 ] 

M. Manna commented on KAFKA-5585:
-

Jinx or not, I was doing some PoC testing of reliability on Windows and hit 
somewhat similar issues to what has been reported here (I guess, however, that 
my linger.ms and backoff.ms need to be adjusted properly).

Two topics, *z1* (1 partition, replication factor 3) and *z3* (3 partitions, 
replication factor 3), should be created beforehand. The attached files are:
1) Source code for my stress testing (needs a project with a *Log4j* 
dependency) - test_project_files.zip
2) Test results Excel sheet.

> Failover in a replicated Cluster does not work
> --
>
> Key: KAFKA-5585
> URL: https://issues.apache.org/jira/browse/KAFKA-5585
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.0
> Environment: Linux, Mac OSX
>Reporter: Thomas Bayer
> Attachments: SimpleConsumer.java, SimpleProducer.java
>
>
> Failover does not work in a cluster with 3 nodes and a topic with 
> replication factor 3.
> You can reproduce it as follows: set up 3 Kafka nodes and 1 ZooKeeper, then 
> create a topic with replication factor 3. Start a consumer. Stop a node. 
> Write to the topic. Now you get warnings that the client cannot connect to a 
> broker. The consumer does not receive any messages.
> The same setup works like a charm with 0.10.2.1.
> Broker Config:
> {{broker.id=1
> listeners=PLAINTEXT://:9091
> log.dirs=cluster/logs/node-1
> broker.id=2
> listeners=PLAINTEXT://:9092
> log.dirs=cluster/logs/node-2
> broker.id=3
> listeners=PLAINTEXT://:9093
> log.dirs=cluster/logs/node-3}}
> Rest of the config is from the distribution.
> Producer and consumer config: see attached files



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5583) Provide an "OS independent" file rename and delete mechanism

2017-07-11 Thread M. Manna (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna updated KAFKA-5583:

Description: 
This is related to 
[KAFKA-1194|https://issues.apache.org/jira/browse/KAFKA-1194] and specific to 
Windows. I thought it would be better to address this as a KIP item since it 
is OS platform specific. Also, any moderators, please feel free to move it to 
the Cwiki as a KIP proposal if necessary.

Some of the unit tests fail on the Windows platform after the Gradle build 
for the same reason. Could I please request that some thought be given to a 
platform-independent way of handling file renaming and deletion?




  was:
This is related to 
[KAFKA-1194|https://issues.apache.org/jira/browse/KAFKA-1194] and specific to 
Windows. I thought it would be better to address this as a KIP item since it 
is OS platform specific.

Quite a lot of the unit tests fail on the Windows platform after the Gradle 
build for the same reason. Could I please request that some thought be given 
to a platform-independent way of handling file renaming and deletion?





> Provide an "OS independent" file rename and delete mechanism
> 
>
> Key: KAFKA-5583
> URL: https://issues.apache.org/jira/browse/KAFKA-5583
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Affects Versions: 0.10.2.1
> Environment: Windows
>Reporter: M. Manna
>
> This is related to 
> [KAFKA-1194|https://issues.apache.org/jira/browse/KAFKA-1194] and specific to 
> Windows. I thought it would be better to address this as a KIP item since it 
> is OS platform specific. Also, any moderators, please feel free to move it to 
> the Cwiki as a KIP proposal if necessary.
> Some of the unit tests fail on the Windows platform after the Gradle build 
> for the same reason. Could I please request that some thought be given to a 
> platform-independent way of handling file renaming and deletion?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
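
A platform-independent rename of the kind requested in KAFKA-5583 could, for 
instance, lean on java.nio.file; a minimal sketch (class and method names are 
hypothetical, and copy-then-delete is only one possible fallback, which can 
itself fail while a memory map still holds the file):

{code:java}
import java.io.IOException;
import java.nio.file.FileSystemException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public final class PortableFileOps {
    // Try a plain move first; if the OS refuses it (e.g. Windows holding a
    // lock or a mapped region on the file), fall back to copy-then-delete.
    public static void rename(Path source, Path target) throws IOException {
        try {
            Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
        } catch (FileSystemException e) {
            Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
            Files.delete(source);
        }
    }
}
{code}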


[jira] [Updated] (KAFKA-5583) Provide an "OS independent" file rename and delete mechanism

2017-07-11 Thread M. Manna (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna updated KAFKA-5583:

Attachment: att2.jpg
att3.jpg

> Provide an "OS independent" file rename and delete mechanism
> 
>
> Key: KAFKA-5583
> URL: https://issues.apache.org/jira/browse/KAFKA-5583
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Affects Versions: 0.10.2.1
> Environment: Windows
>Reporter: M. Manna
> Attachments: att1.jpg, att2.jpg, att3.jpg
>
>
> This is related to 
> [KAFKA-1194|https://issues.apache.org/jira/browse/KAFKA-1194] and specific to 
> Windows. I thought it would be better to address this as a KIP item since it 
> is OS platform specific. Also, any moderators, please feel free to move it to 
> the Cwiki as a KIP proposal if necessary.
> Some of the unit tests fail on the Windows platform after the Gradle build 
> for the same reason. Could I please request that some thought be given to a 
> platform-independent way of handling file renaming and deletion?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5583) Provide a "OS independent" file rename and delete mechanism

2017-07-11 Thread M. Manna (JIRA)
M. Manna created KAFKA-5583:
---

 Summary: Provide a "OS independent" file rename and delete 
mechanism
 Key: KAFKA-5583
 URL: https://issues.apache.org/jira/browse/KAFKA-5583
 Project: Kafka
  Issue Type: Improvement
  Components: clients, core
Affects Versions: 0.10.2.1
 Environment: Windows
Reporter: M. Manna


This is related to 
[KAFKA-1194|https://issues.apache.org/jira/browse/KAFKA-1194] and specific to 
Windows. I thought it would be better to address this as a KIP item since it 
is OS platform specific.

Quite a lot of the unit tests fail on the Windows platform after the Gradle 
build for the same reason. Could I please request that some thought be given 
to a platform-independent way of handling file renaming and deletion?






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5583) Provide an "OS independent" file rename and delete mechanism

2017-07-11 Thread M. Manna (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna updated KAFKA-5583:

Attachment: att1.jpg

> Provide an "OS independent" file rename and delete mechanism
> 
>
> Key: KAFKA-5583
> URL: https://issues.apache.org/jira/browse/KAFKA-5583
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Affects Versions: 0.10.2.1
> Environment: Windows
>Reporter: M. Manna
> Attachments: att1.jpg
>
>
> This is related to 
> [KAFKA-1194|https://issues.apache.org/jira/browse/KAFKA-1194] and specific to 
> Windows. I thought it would be better to address this as a KIP item since it 
> is OS platform specific. Also, any moderators, please feel free to move it to 
> the Cwiki as a KIP proposal if necessary.
> Some of the unit tests fail on the Windows platform after the Gradle build 
> for the same reason. Could I please request that some thought be given to a 
> platform-independent way of handling file renaming and deletion?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5583) Provide an "OS independent" file rename and delete mechanism

2017-07-11 Thread M. Manna (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna updated KAFKA-5583:

Summary: Provide an "OS independent" file rename and delete mechanism  
(was: Provide an "OS independent" file rename and delete mechanismn)

> Provide an "OS independent" file rename and delete mechanism
> 
>
> Key: KAFKA-5583
> URL: https://issues.apache.org/jira/browse/KAFKA-5583
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Affects Versions: 0.10.2.1
> Environment: Windows
>Reporter: M. Manna
>
> This is related to 
> [KAFKA-1194|https://issues.apache.org/jira/browse/KAFKA-1194] and specific to 
> Windows. I thought it would be better to address this as a KIP item since it 
> is OS platform specific.
> Quite a lot of the unit tests fail on the Windows platform after the Gradle 
> build for the same reason. Could I please request that some thought be given 
> to a platform-independent way of handling file renaming and deletion?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5583) Provide an "OS independent" file rename and delete mechanismn

2017-07-11 Thread M. Manna (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna updated KAFKA-5583:

Summary: Provide an "OS independent" file rename and delete mechanismn  
(was: Provide a "OS independent" file rename and delete mechanism)

> Provide an "OS independent" file rename and delete mechanismn
> -
>
> Key: KAFKA-5583
> URL: https://issues.apache.org/jira/browse/KAFKA-5583
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Affects Versions: 0.10.2.1
> Environment: Windows
>Reporter: M. Manna
>
> This is related to 
> [KAFKA-1194|https://issues.apache.org/jira/browse/KAFKA-1194] and specific to 
> Windows. I thought it would be better to address this as a KIP item since it 
> is OS platform specific.
> Quite a lot of the unit tests fail on the Windows platform after the Gradle 
> build for the same reason. Could I please request that some thought be given 
> to a platform-independent way of handling file renaming and deletion?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2017-07-09 Thread M. Manna (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16079678#comment-16079678
 ] 

M. Manna edited comment on KAFKA-1194 at 7/9/17 6:39 PM:
-

I believe I have found a workaround (and possibly a solution). The root cause 
is probably that, on Windows, FILE_SHARE_DELETE (set via some internal 
low-level API call) is always false (or simply hasn't been defined), which is 
possibly what makes Files.move() fail. Perhaps future JDKs will make this 
configurable somehow.

Meanwhile, I have done the following:

1) I created a wrapper around forceUnmap as follows (in AbstractIndex.scala):

{code:java}

def forceUnmapWrapper() {
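// Only unmap on Windows: an active memory map there keeps the file locked,
// blocking rename/delete; on other platforms this is unnecessary.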
if (Os.isWindows)
forceUnmap(mmap)
}
{code}

2) In Log.scala, I made the following changes to changeFileSuffixes:


{code:java}
  def changeFileSuffixes(oldSuffix: String, newSuffix: String) {

def kafkaStorageException(fileType: String, e: IOException) =
  new KafkaStorageException(s"Failed to change the $fileType file suffix 
from $oldSuffix to $newSuffix for log segment $baseOffset", e)
logger.warn("KAFKA mod - starting log renameTo op");
try log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, 
oldSuffix, newSuffix)))
catch {
  case e: IOException => throw kafkaStorageException("log", e)
}
logger.warn("KAFKA mod - starting index renameTo op")
index.forceUnmapWrapper
try index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, 
oldSuffix, newSuffix)))
catch {
  case e: IOException => throw kafkaStorageException("index", e)
}
logger.warn("KAFKA mod - starting timeIndex renameTo op")
timeIndex.forceUnmapWrapper
try timeIndex.renameTo(new 
File(CoreUtils.replaceSuffix(timeIndex.file.getPath, oldSuffix, newSuffix)))
catch {
  case e: IOException => throw kafkaStorageException("timeindex", e)
}
  }
{code}

This produces the following output upon startup on my 3 brokers:

{code:java}
[2017-07-09 18:42:16,451] INFO Deleting segment 0 from log z1-1. (kafka.log.Log)
[2017-07-09 18:42:16,460] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\.index.deleted 
(kafka.log.OffsetIndex)
[2017-07-09 18:42:16,470] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\.timeindex.deleted 
(kafka.log.TimeIndex)
[2017-07-09 18:42:16,484] INFO Deleting segment 30240 from log z1-1. 
(kafka.log.Log)
[2017-07-09 18:42:16,501] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\00030240.index.deleted 
(kafka.log.OffsetIndex)
[2017-07-09 18:42:16,507] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\00030240.timeindex.deleted 
(kafka.log.TimeIndex)
[2017-07-09 18:42:16,517] INFO Deleting segment 47520 from log z1-1. 
(kafka.log.Log)
[2017-07-09 18:42:16,520] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\00047520.index.deleted 
(kafka.log.OffsetIndex)
[2017-07-09 18:42:16,523] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\00047520.timeindex.deleted 
(kafka.log.TimeIndex)
{code}

My log.retention.minutes=10 and log.retention.check.interval.ms=30; this 
doesn't always get triggered as expected, but when it does, it now cleans.

If someone is kind enough to verify this solution and propose a commit, we can 
try this out for a future release.
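
To make the suspected root cause concrete: a minimal, self-contained sketch 
(hypothetical file names) of the rename-while-mapped behaviour that the 
forceUnmap-before-renameTo workaround avoids:

{code:java}
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class MappedRenameDemo {
    public static void main(String[] args) throws Exception {
        Path index = Paths.get("demo.index");
        Files.write(index, new byte[1024]);
        try (RandomAccessFile raf = new RandomAccessFile(index.toFile(), "rw")) {
            MappedByteBuffer mmap =
                    raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 1024);
            mmap.put(0, (byte) 1); // the mapping is live at this point
            // On Linux/macOS this move succeeds; on Windows it typically fails
            // with a FileSystemException while the mapping is still reachable.
            Files.move(index, Paths.get("demo.index.deleted"));
        }
    }
}
{code}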


was (Author: manme...@gmail.com):
I believe I have found a workaround (and possibly a solution). The root cause 
is probably that, on Windows, FILE_SHARE_DELETE (set via some internal 
low-level API call) is always false (or simply hasn't been defined), which is 
possibly what makes Files.move() fail. Perhaps future JDKs will make this 
configurable somehow.

Meanwhile, I have done the following:

1) I created a wrapper around forceUnmap as follows (in AbstractIndex.scala):

{code:java}

def forceUnmapWrapper() {
if (Os.isWindows)
forceUnmap(mmap)
}
{code}

2) In Log.scala, I made the following changes to changeFileSuffixes:


{code:java}
  def changeFileSuffixes(oldSuffix: String, newSuffix: String) {

def kafkaStorageException(fileType: String, e: IOException) =
  new KafkaStorageException(s"Failed to change the $fileType file suffix 
from $oldSuffix to $newSuffix for log segment $baseOffset", e)
logger.warn("KAFKA mod - starting log renameTo op");
try log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, 
oldSuffix, newSuffix)))
catch {
  case e: IOException => throw kafkaStorageException("log", e)
}
logger.warn("KAFKA mod - starting index renameTo op")
index.forceUnmapWrapper
try index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, 
oldSuffix, newSuffix)))
catch {
  case e: IOException => throw kafkaStorageException("index", e)
}
logger.warn("KAFKA mod - starting timeIndex renameTo op")
timeIndex.forceUnmapWrapper
try timeIndex.renameTo(new 
File(CoreUtils.replaceSuffix(timeIndex.file.getPath, oldSuffix, newSuffix)))
catch {
  case e: IOException => throw kafkaStorageException("timeindex", e)
}
  }
{code}

[jira] [Comment Edited] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2017-07-09 Thread M. Manna (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16079678#comment-16079678
 ] 

M. Manna edited comment on KAFKA-1194 at 7/9/17 6:24 PM:
-

I believe I have found a workaround (and possibly a solution). The root cause 
is probably that, on Windows, FILE_SHARE_DELETE (set via some internal 
low-level API call) is always false (or simply hasn't been defined), which is 
possibly what makes Files.move() fail. Perhaps future JDKs will make this 
configurable somehow.

Meanwhile, I have done the following:

1) I created a wrapper around forceUnmap as follows (in AbstractIndex.scala):

{code:java}

def forceUnmapWrapper() {
if (Os.isWindows)
forceUnmap(mmap)
}
{code}

2) In Log.scala, I made the following changes to changeFileSuffixes:


{code:java}
  def changeFileSuffixes(oldSuffix: String, newSuffix: String) {

def kafkaStorageException(fileType: String, e: IOException) =
  new KafkaStorageException(s"Failed to change the $fileType file suffix 
from $oldSuffix to $newSuffix for log segment $baseOffset", e)
logger.warn("KAFKA mod - starting log renameTo op");
try log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, 
oldSuffix, newSuffix)))
catch {
  case e: IOException => throw kafkaStorageException("log", e)
}
logger.warn("KAFKA mod - starting index renameTo op")
index.forceUnmapWrapper
try index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, 
oldSuffix, newSuffix)))
catch {
  case e: IOException => throw kafkaStorageException("index", e)
}
logger.warn("KAFKA mod - starting timeIndex renameTo op")
timeIndex.forceUnmapWrapper
try timeIndex.renameTo(new 
File(CoreUtils.replaceSuffix(timeIndex.file.getPath, oldSuffix, newSuffix)))
catch {
  case e: IOException => throw kafkaStorageException("timeindex", e)
}
  }
{code}

This produces the following output upon startup on my 3 brokers:

{code:java}
[2017-07-09 18:42:16,451] INFO Deleting segment 0 from log z1-1. (kafka.log.Log)
[2017-07-09 18:42:16,460] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\.index.deleted 
(kafka.log.OffsetIndex)
[2017-07-09 18:42:16,470] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\.timeindex.deleted 
(kafka.log.TimeIndex)
[2017-07-09 18:42:16,484] INFO Deleting segment 30240 from log z1-1. 
(kafka.log.Log)
[2017-07-09 18:42:16,501] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\00030240.index.deleted 
(kafka.log.OffsetIndex)
[2017-07-09 18:42:16,507] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\00030240.timeindex.deleted 
(kafka.log.TimeIndex)
[2017-07-09 18:42:16,517] INFO Deleting segment 47520 from log z1-1. 
(kafka.log.Log)
[2017-07-09 18:42:16,520] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\00047520.index.deleted 
(kafka.log.OffsetIndex)
[2017-07-09 18:42:16,523] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\00047520.timeindex.deleted 
(kafka.log.TimeIndex)
{code}

My log.retention.minutes=10 and log.retention.check.interval.ms=30; this 
doesn't always get triggered as expected, but when it does, it now cleans.

If someone is kind enough to verify this solution and propose a commit, we can 
try this out for a future release.


was (Author: manme...@gmail.com):
I believe I have found a workaround (and possibly a solution). The root cause 
is probably that, on Windows, FILE_SHARE_DELETE (set via some internal 
low-level API call) is always false (or simply hasn't been defined), which is 
possibly what makes Files.move() fail. Perhaps future JDKs will make this 
configurable somehow.

Meanwhile, I have done the following:

1) I created a wrapper around forceUnmap as follows (in AbstractIndex.scala):

{code:java}

def forceUnmapWrapper() {
forceUnmap(mmap)
}
{code}

2) In Log.scala, I made the following changes to changeFileSuffixes:


{code:java}
  def changeFileSuffixes(oldSuffix: String, newSuffix: String) {

def kafkaStorageException(fileType: String, e: IOException) =
  new KafkaStorageException(s"Failed to change the $fileType file suffix 
from $oldSuffix to $newSuffix for log segment $baseOffset", e)
logger.warn("KAFKA mod - starting log renameTo op");
try log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, 
oldSuffix, newSuffix)))
catch {
  case e: IOException => throw kafkaStorageException("log", e)
}
logger.warn("KAFKA mod - starting index renameTo op")
index.forceUnmapWrapper
try index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, 
oldSuffix, newSuffix)))
catch {
  case e: IOException => throw kafkaStorageException("index", e)
}
logger.warn("KAFKA mod - starting timeIndex renameTo op")
timeIndex.forceUnmapWrapper
try timeIndex.renameTo(new 
File(CoreUtils.replaceSuffix(timeIndex.file.getPath, oldSuffix, newSuffix)))
catch {
  case e: IOException => throw kafkaStorageException("timeindex", e)
}
  }
{code}

[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2017-07-09 Thread M. Manna (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16079678#comment-16079678
 ] 

M. Manna commented on KAFKA-1194:
-

I believe I have found a workaround (and possibly a solution). The root cause 
is probably that, on Windows, FILE_SHARE_DELETE (set via some internal 
low-level API call) is always false (or simply hasn't been defined), which is 
possibly what makes Files.move() fail. Perhaps future JDKs will make this 
configurable somehow.

Meanwhile, I have done the following:

1) I created a wrapper around forceUnmap as follows (in AbstractIndex.scala):

{code:java}

def forceUnmapWrapper() {
forceUnmap(mmap)
}
{code}

2) In Log.scala, I made the following changes to changeFileSuffixes:


{code:java}
  def changeFileSuffixes(oldSuffix: String, newSuffix: String) {

def kafkaStorageException(fileType: String, e: IOException) =
  new KafkaStorageException(s"Failed to change the $fileType file suffix 
from $oldSuffix to $newSuffix for log segment $baseOffset", e)
logger.warn("KAFKA mod - starting log renameTo op");
try log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, 
oldSuffix, newSuffix)))
catch {
  case e: IOException => throw kafkaStorageException("log", e)
}
logger.warn("KAFKA mod - starting index renameTo op")
index.forceUnmapWrapper
try index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, 
oldSuffix, newSuffix)))
catch {
  case e: IOException => throw kafkaStorageException("index", e)
}
logger.warn("KAFKA mod - starting timeIndex renameTo op")
timeIndex.forceUnmapWrapper
try timeIndex.renameTo(new 
File(CoreUtils.replaceSuffix(timeIndex.file.getPath, oldSuffix, newSuffix)))
catch {
  case e: IOException => throw kafkaStorageException("timeindex", e)
}
  }
{code}

This produces the following output upon startup on my 3 brokers:

{code:java}
[2017-07-09 18:42:16,451] INFO Deleting segment 0 from log z1-1. (kafka.log.Log)
[2017-07-09 18:42:16,460] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\.index.deleted 
(kafka.log.OffsetIndex)
[2017-07-09 18:42:16,470] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\.timeindex.deleted 
(kafka.log.TimeIndex)
[2017-07-09 18:42:16,484] INFO Deleting segment 30240 from log z1-1. 
(kafka.log.Log)
[2017-07-09 18:42:16,501] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\00030240.index.deleted 
(kafka.log.OffsetIndex)
[2017-07-09 18:42:16,507] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\00030240.timeindex.deleted 
(kafka.log.TimeIndex)
[2017-07-09 18:42:16,517] INFO Deleting segment 47520 from log z1-1. 
(kafka.log.Log)
[2017-07-09 18:42:16,520] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\00047520.index.deleted 
(kafka.log.OffsetIndex)
[2017-07-09 18:42:16,523] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\00047520.timeindex.deleted 
(kafka.log.TimeIndex)
{code}

My log.retention.minutes=10 and log.retention.check.interval.ms=30; this 
doesn't always get triggered as expected, but when it does, it now cleans.

If someone is kind enough to verify this solution and propose a commit, we can 
try this out for a future release.

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.1
> Environment: window
>Reporter: Tao Qin
>Priority: Critical
>  Labels: features, patch
> Attachments: KAFKA-1194.patch, kafka-1194-v1.patch, 
> kafka-1194-v2.patch, screenshot-1.png, Untitled.jpg
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in a Windows environment and set log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the Kafka broker still cannot delete the old log files, 
> and we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimize