[jira] [Commented] (KAFKA-5061) client.id should be set for Connect producers/consumers

2019-04-09 Thread Paul Davidson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813603#comment-16813603
 ] 

Paul Davidson commented on KAFKA-5061:
--

Voting is now underway on 
[KIP-411|https://cwiki.apache.org/confluence/display/KAFKA/KIP-411%3A+Make+default+Kafka+Connect+worker+task+client+IDs+distinct].
  Please vote if you would like this bug resolved! 

> client.id should be set for Connect producers/consumers
> ---
>
> Key: KAFKA-5061
> URL: https://issues.apache.org/jira/browse/KAFKA-5061
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.2.1
>Reporter: Ewen Cheslack-Postava
>Priority: Major
>  Labels: needs-kip, newbie++
>
> In order to properly monitor individual tasks using the producer and consumer 
> metrics, we need to have the framework disambiguate them. Currently when we 
> create producers 
> (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L362)
>  and create consumers 
> (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L371-L394)
>  the client ID is not being set. You can override it for the entire worker 
> via worker-level producer/consumer overrides, but you can't get per-task 
> metrics.
> There are a couple of things we might want to consider doing here:
> 1. Provide default client IDs based on the worker group ID + task ID 
> (providing uniqueness for multiple connect clusters up to the scope of the 
> Kafka cluster they are operating on). This seems ideal since it's a good 
> default; however it is a public-facing change and may need a KIP. Normally I 
> would be less worried about this, but some folks may be relying on picking up 
> metrics without this being set, in which case such a change would break their 
> monitoring.
> 2. Allow overriding client.id on a per-connector basis. I'm not sure if this 
> will really be useful or not -- it lets you differentiate between metrics for 
> different connectors' tasks, but within a connector, all metrics would go to 
> a single client.id. On the other hand, this makes the tasks act as a single 
> group from the perspective of broker handling of client IDs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6876) Sender exceptions ignored by WorkerSourceTask producer Callback causing data loss

2019-04-04 Thread Paul Davidson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Paul Davidson updated KAFKA-6876:
-
Affects Version/s: 2.0.1

> Sender exceptions ignored by WorkerSourceTask producer Callback causing data 
> loss
> -
>
> Key: KAFKA-6876
> URL: https://issues.apache.org/jira/browse/KAFKA-6876
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.1, 1.1.0, 2.0.1
> Environment: Linux, JDK 8
>Reporter: Paul Davidson
>Priority: Major
>
> The producer callback in "WorkerSourceTask" handles exceptions during a 
> send() by logging at ERROR level and continuing.  This can lead to offsets 
> being committed for records that were never sent correctly.  The records are 
> effectively skipped, leading to data loss in our use case.   
> The source code for the Callback "onCompletion()" method suggests this should 
> "basically never happen ... callbacks with exceptions should never be invoked 
> in practice", but we have seen this happen several times in production, 
> especially in near heap-exhaustion situations when the Sender thread 
> generates an exception (often caused by KAFKA-6551).
> From WorkerSourceTask line 253:
> {code:java}
> new Callback() {
>     @Override
>     public void onCompletion(RecordMetadata recordMetadata, Exception e) {
>         if (e != null) {
>             // Given the default settings for zero data loss, this should basically never happen --
>             // between "infinite" retries, indefinite blocking on full buffers, and "infinite" request
>             // timeouts, callbacks with exceptions should never be invoked in practice. If the
>             // user overrode these settings, the best we can do is notify them of the failure via
>             // logging.
>             log.error("{} failed to send record to {}: {}", this, topic, e);
>             log.debug("{} Failed record: {}", this, preTransformRecord);
>         } else {
>             log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
>                     this,
>                     recordMetadata.topic(), recordMetadata.partition(),
>                     recordMetadata.offset());
>             commitTaskRecord(preTransformRecord);
>         }
>         recordSent(producerRecord);
>         counter.completeRecord();
>     }
> }
> {code}
>  
> Example of an exception triggering the bug:
> {code:java}
> 2018-04-27 21:14:25,740 [kafka-producer-network-thread | source-23] ERROR o.a.k.c.runtime.WorkerSourceTask - source-23 failed to send record to topic-name: {}
> java.lang.IllegalStateException: Producer is closed forcefully.
>     at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:610)
>     at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:597)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:183)
>     at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7749) confluent does not provide option to set consumer properties at connector level

2019-01-10 Thread Paul Davidson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16739888#comment-16739888
 ] 

Paul Davidson commented on KAFKA-7749:
--

Hi [~sliebau]. You can probably ignore the comment about task-level settings 
now. It was actually motivated by the problem described here: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-411%3A+Add+option+to+make+Kafka+Connect+task+client+ID+values+unique
and here: https://issues.apache.org/jira/browse/KAFKA-5061. We seem to be 
close to resolving that particular issue by simply including the task id in the 
default client ID. I can't think of any other specific cases where task-level 
settings would be particularly useful.

> confluent does not provide option to set consumer properties at connector 
> level
> ---
>
> Key: KAFKA-7749
> URL: https://issues.apache.org/jira/browse/KAFKA-7749
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Manjeet Duhan
>Priority: Major
>
> _We want to increase consumer.max.poll.records to improve performance, but 
> this value can only be set in the worker properties, which apply to all 
> connectors in a given cluster._
> _Operative situation: we have one project communicating with Elasticsearch, 
> and we set consumer.max.poll.records=500 after multiple performance tests; 
> this worked fine for a year._
> _Then another project was onboarded on the same cluster, which required 
> consumer.max.poll.records=5000 based on their performance tests. This 
> configuration was moved to production._
> _Admetric started failing, as it was taking more than 5 minutes to process 
> 5000 polled records, and began throwing CommitFailedException -- a vicious 
> cycle, since the same data is processed over and over again._
> _We can control this when starting a consumer from plain Java, but no such 
> control is available per consumer in the Confluent connector._
> _I have overridden the Kafka code to accept connector-level properties, which 
> are applied to a single connector while the others keep using the default 
> properties. These changes have been running in production for more than 5 
> months._
> _Some of the properties that were useful for us:_
> max.poll.records
> max.poll.interval.ms
> request.timeout.ms
> key.deserializer
> value.deserializer
> heartbeat.interval.ms
> session.timeout.ms
> auto.offset.reset
> connections.max.idle.ms
> enable.auto.commit
> auto.commit.interval.ms



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5061) client.id should be set for Connect producers/consumers

2019-01-07 Thread Paul Davidson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16736139#comment-16736139
 ] 

Paul Davidson commented on KAFKA-5061:
--

I have created an alternative PR after comments from [~ewencp] and 
[~kkonstantine] on KIP-411. This PR simply changes the default client id and 
avoids creating any new configuration options.  See 
https://github.com/apache/kafka/pull/6097
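
To sketch the idea (illustrative only; the helper below and its names are mine, not the PR's code): derive a per-task default client id from the connector name and task number, and apply it only when the user has not configured client.id explicitly.

{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;

// Hypothetical helper, for illustration: give each task a distinct default
// client.id without clobbering an explicitly configured value.
public final class ClientIdDefaults {

    public static Map<String, Object> withDefaultClientId(Map<String, Object> producerProps,
                                                          String connectorName, int taskNumber) {
        Map<String, Object> props = new HashMap<>(producerProps);
        // e.g. "connector-producer-my-connector-0"
        props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG,
                "connector-producer-" + connectorName + "-" + taskNumber);
        return props;
    }
}
{code}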

> client.id should be set for Connect producers/consumers
> ---
>
> Key: KAFKA-5061
> URL: https://issues.apache.org/jira/browse/KAFKA-5061
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.2.1
>Reporter: Ewen Cheslack-Postava
>Priority: Major
>  Labels: needs-kip, newbie++
>
> In order to properly monitor individual tasks using the producer and consumer 
> metrics, we need to have the framework disambiguate them. Currently when we 
> create producers 
> (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L362)
>  and create consumers 
> (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L371-L394)
>  the client ID is not being set. You can override it for the entire worker 
> via worker-level producer/consumer overrides, but you can't get per-task 
> metrics.
> There are a couple of things we might want to consider doing here:
> 1. Provide default client IDs based on the worker group ID + task ID 
> (providing uniqueness for multiple connect clusters up to the scope of the 
> Kafka cluster they are operating on). This seems ideal since it's a good 
> default; however it is a public-facing change and may need a KIP. Normally I 
> would be less worried about this, but some folks may be relying on picking up 
> metrics without this being set, in which case such a change would break their 
> monitoring.
> 2. Allow overriding client.id on a per-connector basis. I'm not sure if this 
> will really be useful or not -- it lets you differentiate between metrics for 
> different connectors' tasks, but within a connector, all metrics would go to 
> a single client.id. On the other hand, this makes the tasks act as a single 
> group from the perspective of broker handling of client IDs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-5061) client.id should be set for Connect producers/consumers

2018-12-23 Thread Paul Davidson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16728058#comment-16728058
 ] 

Paul Davidson edited comment on KAFKA-5061 at 12/23/18 9:24 PM:


I have created [KIP-411: Add option to make Kafka Connect task client ID values unique|https://cwiki.apache.org/confluence/display/KAFKA/KIP-411%3A+Add+option+to+make+Kafka+Connect+task+client+ID+values+unique]

FYI [~mageshn]


was (Author: pdavidson):
I have created [KIP-411: Add option to make Kafka Connect task client ID values unique|https://cwiki.apache.org/confluence/display/KAFKA/KIP-411%3A+Add+option+to+make+Kafka+Connect+task+client+ID+values+unique]

FYI [~mageshn]

> client.id should be set for Connect producers/consumers
> ---
>
> Key: KAFKA-5061
> URL: https://issues.apache.org/jira/browse/KAFKA-5061
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.2.1
>Reporter: Ewen Cheslack-Postava
>Priority: Major
>  Labels: needs-kip, newbie++
>
> In order to properly monitor individual tasks using the producer and consumer 
> metrics, we need to have the framework disambiguate them. Currently when we 
> create producers 
> (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L362)
>  and create consumers 
> (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L371-L394)
>  the client ID is not being set. You can override it for the entire worker 
> via worker-level producer/consumer overrides, but you can't get per-task 
> metrics.
> There are a couple of things we might want to consider doing here:
> 1. Provide default client IDs based on the worker group ID + task ID 
> (providing uniqueness for multiple connect clusters up to the scope of the 
> Kafka cluster they are operating on). This seems ideal since it's a good 
> default; however it is a public-facing change and may need a KIP. Normally I 
> would be less worried about this, but some folks may be relying on picking up 
> metrics without this being set, in which case such a change would break their 
> monitoring.
> 2. Allow overriding client.id on a per-connector basis. I'm not sure if this 
> will really be useful or not -- it lets you differentiate between metrics for 
> different connectors' tasks, but within a connector, all metrics would go to 
> a single client.id. On the other hand, this makes the tasks act as a single 
> group from the perspective of broker handling of client IDs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-5061) client.id should be set for Connect producers/consumers

2018-12-23 Thread Paul Davidson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16728058#comment-16728058
 ] 

Paul Davidson edited comment on KAFKA-5061 at 12/23/18 9:24 PM:


I have created [KIP-411: Add option to make Kafka Connect task client ID values unique|https://cwiki.apache.org/confluence/display/KAFKA/KIP-411%3A+Add+option+to+make+Kafka+Connect+task+client+ID+values+unique]

FYI [~mageshn]


was (Author: pdavidson):
I have created [KIP-411: Add option to make Kafka Connect task client ID values unique|https://cwiki.apache.org/confluence/display/KAFKA/KIP-411%3A+Add+option+to+make+Kafka+Connect+task+client+ID+values+unique]

FYI [~mageshn]

> client.id should be set for Connect producers/consumers
> ---
>
> Key: KAFKA-5061
> URL: https://issues.apache.org/jira/browse/KAFKA-5061
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.2.1
>Reporter: Ewen Cheslack-Postava
>Priority: Major
>  Labels: needs-kip, newbie++
>
> In order to properly monitor individual tasks using the producer and consumer 
> metrics, we need to have the framework disambiguate them. Currently when we 
> create producers 
> (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L362)
>  and create consumers 
> (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L371-L394)
>  the client ID is not being set. You can override it for the entire worker 
> via worker-level producer/consumer overrides, but you can't get per-task 
> metrics.
> There are a couple of things we might want to consider doing here:
> 1. Provide default client IDs based on the worker group ID + task ID 
> (providing uniqueness for multiple connect clusters up to the scope of the 
> Kafka cluster they are operating on). This seems ideal since it's a good 
> default; however it is a public-facing change and may need a KIP. Normally I 
> would be less worried about this, but some folks may be relying on picking up 
> metrics without this being set, in which case such a change would break their 
> monitoring.
> 2. Allow overriding client.id on a per-connector basis. I'm not sure if this 
> will really be useful or not -- it lets you differentiate between metrics for 
> different connectors' tasks, but within a connector, all metrics would go to 
> a single client.id. On the other hand, this makes the tasks act as a single 
> group from the perspective of broker handling of client IDs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5061) client.id should be set for Connect producers/consumers

2018-12-23 Thread Paul Davidson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16728058#comment-16728058
 ] 

Paul Davidson commented on KAFKA-5061:
--

I have created [KIP-411: Add option to make Kafka Connect task client ID values unique|https://cwiki.apache.org/confluence/display/KAFKA/KIP-411%3A+Add+option+to+make+Kafka+Connect+task+client+ID+values+unique]

FYI [~mageshn]

> client.id should be set for Connect producers/consumers
> ---
>
> Key: KAFKA-5061
> URL: https://issues.apache.org/jira/browse/KAFKA-5061
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.2.1
>Reporter: Ewen Cheslack-Postava
>Priority: Major
>  Labels: needs-kip, newbie++
>
> In order to properly monitor individual tasks using the producer and consumer 
> metrics, we need to have the framework disambiguate them. Currently when we 
> create producers 
> (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L362)
>  and create consumers 
> (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L371-L394)
>  the client ID is not being set. You can override it for the entire worker 
> via worker-level producer/consumer overrides, but you can't get per-task 
> metrics.
> There are a couple of things we might want to consider doing here:
> 1. Provide default client IDs based on the worker group ID + task ID 
> (providing uniqueness for multiple connect clusters up to the scope of the 
> Kafka cluster they are operating on). This seems ideal since it's a good 
> default; however it is a public-facing change and may need a KIP. Normally I 
> would be less worried about this, but some folks may be relying on picking up 
> metrics without this being set, in which case such a change would break their 
> monitoring.
> 2. Allow overriding client.id on a per-connector basis. I'm not sure if this 
> will really be useful or not -- it lets you differentiate between metrics for 
> different connectors' tasks, but within a connector, all metrics would go to 
> a single client.id. On the other hand, this makes the tasks act as a single 
> group from the perspective of broker handling of client IDs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7749) confluent does not provide option to set consumer properties at connector level

2018-12-20 Thread Paul Davidson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16726149#comment-16726149
 ] 

Paul Davidson commented on KAFKA-7749:
--

I would also like to see this in the context of Source Connectors, where it 
would be useful for the connector to override producer properties - ideally at 
the task level.  For example, in Mirus 
([https://github.com/salesforce/mirus]) 
this would allow each Source Connector to be directed at a different 
destination cluster without setting up a separate set of workers for each 
destination.  It would also allow the connector to tune the producer properties 
for each destination cluster (e.g. by tuning the linger time and batch size 
depending on whether the destination cluster is local or remote).
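
To illustrate, an override of the kind I have in mind might look something like this in a source connector config. The "producer.override." prefix is hypothetical here: Connect had no per-connector producer overrides at the time.

{noformat}
# Hypothetical per-connector producer overrides (illustrative only)
producer.override.bootstrap.servers=remote-cluster:9092
producer.override.linger.ms=100
producer.override.batch.size=262144
{noformat}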

> confluent does not provide option to set consumer properties at connector 
> level
> ---
>
> Key: KAFKA-7749
> URL: https://issues.apache.org/jira/browse/KAFKA-7749
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Manjeet Duhan
>Priority: Major
>
> _We want to increase consumer.max.poll.records to improve performance, but 
> this value can only be set in the worker properties, which apply to all 
> connectors in a given cluster._
> _Operative situation: we have one project communicating with Elasticsearch, 
> and we set consumer.max.poll.records=500 after multiple performance tests; 
> this worked fine for a year._
> _Then another project was onboarded on the same cluster, which required 
> consumer.max.poll.records=5000 based on their performance tests. This 
> configuration was moved to production._
> _Admetric started failing, as it was taking more than 5 minutes to process 
> 5000 polled records, and began throwing CommitFailedException -- a vicious 
> cycle, since the same data is processed over and over again._
> _We can control this when starting a consumer from plain Java, but no such 
> control is available per consumer in the Confluent connector._
> _I have overridden the Kafka code to accept connector-level properties, which 
> are applied to a single connector while the others keep using the default 
> properties. These changes have been running in production for more than 5 
> months._
> _Some of the properties that were useful for us:_
> max.poll.records
> max.poll.interval.ms
> request.timeout.ms
> key.deserializer
> value.deserializer
> heartbeat.interval.ms
> session.timeout.ms
> auto.offset.reset
> connections.max.idle.ms
> enable.auto.commit
> auto.commit.interval.ms



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5061) client.id should be set for Connect producers/consumers

2018-12-11 Thread Paul Davidson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718311#comment-16718311
 ] 

Paul Davidson commented on KAFKA-5061:
--

Thanks for the response [~mageshn] - I will create a KIP.

> client.id should be set for Connect producers/consumers
> ---
>
> Key: KAFKA-5061
> URL: https://issues.apache.org/jira/browse/KAFKA-5061
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.2.1
>Reporter: Ewen Cheslack-Postava
>Priority: Major
>  Labels: needs-kip, newbie++
>
> In order to properly monitor individual tasks using the producer and consumer 
> metrics, we need to have the framework disambiguate them. Currently when we 
> create producers 
> (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L362)
>  and create consumers 
> (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L371-L394)
>  the client ID is not being set. You can override it for the entire worker 
> via worker-level producer/consumer overrides, but you can't get per-task 
> metrics.
> There are a couple of things we might want to consider doing here:
> 1. Provide default client IDs based on the worker group ID + task ID 
> (providing uniqueness for multiple connect clusters up to the scope of the 
> Kafka cluster they are operating on). This seems ideal since it's a good 
> default; however it is a public-facing change and may need a KIP. Normally I 
> would be less worried about this, but some folks may be relying on picking up 
> metrics without this being set, in which case such a change would break their 
> monitoring.
> 2. Allow overriding client.id on a per-connector basis. I'm not sure if this 
> will really be useful or not -- it lets you differentiate between metrics for 
> different connectors' tasks, but within a connector, all metrics would go to 
> a single client.id. On the other hand, this makes the tasks act as a single 
> group from the perspective of broker handling of client IDs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5061) client.id should be set for Connect producers/consumers

2018-12-06 Thread Paul Davidson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711912#comment-16711912
 ] 

Paul Davidson commented on KAFKA-5061:
--

I updated PR [https://github.com/apache/kafka/pull/5775] to resolve some 
conflicts that appeared after recent changes in trunk.  How should I go about 
getting this change reviewed (and hopefully merged)? [~ewencp] - is this 
something you can help with?

> client.id should be set for Connect producers/consumers
> ---
>
> Key: KAFKA-5061
> URL: https://issues.apache.org/jira/browse/KAFKA-5061
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.2.1
>Reporter: Ewen Cheslack-Postava
>Priority: Major
>  Labels: needs-kip, newbie++
>
> In order to properly monitor individual tasks using the producer and consumer 
> metrics, we need to have the framework disambiguate them. Currently when we 
> create producers 
> (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L362)
>  and create consumers 
> (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L371-L394)
>  the client ID is not being set. You can override it for the entire worker 
> via worker-level producer/consumer overrides, but you can't get per-task 
> metrics.
> There are a couple of things we might want to consider doing here:
> 1. Provide default client IDs based on the worker group ID + task ID 
> (providing uniqueness for multiple connect clusters up to the scope of the 
> Kafka cluster they are operating on). This seems ideal since it's a good 
> default; however it is a public-facing change and may need a KIP. Normally I 
> would be less worried about this, but some folks may be relying on picking up 
> metrics without this being set, in which case such a change would break their 
> monitoring.
> 2. Allow overriding client.id on a per-connector basis. I'm not sure if this 
> will really be useful or not -- it lets you differentiate between metrics for 
> different connectors' tasks, but within a connector, all metrics would go to 
> a single client.id. On the other hand, this makes the tasks act as a single 
> group from the perspective of broker handling of client IDs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5061) client.id should be set for Connect producers/consumers

2018-10-10 Thread Paul Davidson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16645507#comment-16645507
 ] 

Paul Davidson commented on KAFKA-5061:
--

This bug became an urgent issue at Salesforce as we needed full task-level 
monitoring in production for Mirus (our open source Kafka Source Connector).  
As a work-around we created a patch, to meet our specific needs, which has been 
running successfully in production since March. I would really like to see this 
bug resolved, so I have submitted a Pull Request 
([https://github.com/apache/kafka/pull/5775]) with our battle-tested patch.

The change is similar to the one Satyajit submitted, but instead of using a 
default based on group id, it appends the task id to the client id when a 
“unique.client.id” property is set to true (false by default). This allows us 
to control the client id independent of the group id, so we can use a custom 
prefix on our producer client ids.  This is useful for monitoring. 

[~ewencp] this may not address all your concerns, but it certainly solved an 
urgent issue for us in a real-world use case.  Let me know where the gaps are 
and I will try to help to find a general solution. If the new Worker property 
in the PR requires a KIP then let me know and I can help create one.  There are 
no unit tests at present, but I am happy to add those if the change looks 
otherwise OK.
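
For reference, the core of the change is roughly the following (a simplified sketch, not the exact code in the PR):

{code:java}
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;

// Simplified sketch of the "unique.client.id" behaviour described above: when
// the worker property is true, append the task id to the configured client.id
// prefix so each task reports metrics under a distinct client id.
public final class UniqueClientIds {

    public static void maybeAppendTaskId(Map<String, Object> producerProps,
                                         boolean uniqueClientId, String taskId) {
        if (uniqueClientId) {
            String prefix = (String) producerProps.getOrDefault(
                    ProducerConfig.CLIENT_ID_CONFIG, "connect");
            producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, prefix + "-" + taskId);
        }
    }
}
{code}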

> client.id should be set for Connect producers/consumers
> ---
>
> Key: KAFKA-5061
> URL: https://issues.apache.org/jira/browse/KAFKA-5061
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.2.1
>Reporter: Ewen Cheslack-Postava
>Priority: Major
>  Labels: needs-kip, newbie++
>
> In order to properly monitor individual tasks using the producer and consumer 
> metrics, we need to have the framework disambiguate them. Currently when we 
> create producers 
> (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L362)
>  and create consumers 
> (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L371-L394)
>  the client ID is not being set. You can override it for the entire worker 
> via worker-level producer/consumer overrides, but you can't get per-task 
> metrics.
> There are a couple of things we might want to consider doing here:
> 1. Provide default client IDs based on the worker group ID + task ID 
> (providing uniqueness for multiple connect clusters up to the scope of the 
> Kafka cluster they are operating on). This seems ideal since it's a good 
> default; however it is a public-facing change and may need a KIP. Normally I 
> would be less worried about this, but some folks may be relying on picking up 
> metrics without this being set, in which case such a change would break their 
> monitoring.
> 2. Allow overriding client.id on a per-connector basis. I'm not sure if this 
> will really be useful or not -- it lets you differentiate between metrics for 
> different connectors' tasks, but within a connector, all metrics would go to 
> a single client.id. On the other hand, this makes the tasks act as a single 
> group from the perspective of broker handling of client IDs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6876) Sender exceptions ignored by WorkerSourceTask producer Callback causing data loss

2018-05-07 Thread Paul Davidson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Paul Davidson updated KAFKA-6876:
-
Description: 
The producer callback in "WorkerSourceTask" handles exceptions during a send() 
by logging at ERROR level and continuing.  This can lead to offsets being 
committed for records that were never sent correctly.  The records are 
effectively skipped, leading to data loss in our use case.   

The source code for the Callback "onCompletion()" method suggests this should 
"basically never happen ... callbacks with exceptions should never be invoked 
in practice", but we have seen this happen several times in production, 
especially in near heap-exhaustion situations when the Sender thread generates 
an exception (often caused by KAFKA-6551).

From WorkerSourceTask line 253:
{code:java}
new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            // Given the default settings for zero data loss, this should basically never happen --
            // between "infinite" retries, indefinite blocking on full buffers, and "infinite" request
            // timeouts, callbacks with exceptions should never be invoked in practice. If the
            // user overrode these settings, the best we can do is notify them of the failure via
            // logging.
            log.error("{} failed to send record to {}: {}", this, topic, e);
            log.debug("{} Failed record: {}", this, preTransformRecord);
        } else {
            log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
                    this,
                    recordMetadata.topic(), recordMetadata.partition(),
                    recordMetadata.offset());
            commitTaskRecord(preTransformRecord);
        }
        recordSent(producerRecord);
        counter.completeRecord();
    }
}
{code}
 

Example of an exception triggering the bug:
{code:java}
2018-04-27 21:14:25,740 [kafka-producer-network-thread | source-23] ERROR o.a.k.c.runtime.WorkerSourceTask - source-23 failed to send record to topic-name: {}
java.lang.IllegalStateException: Producer is closed forcefully.
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:610)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:597)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:183)
    at java.lang.Thread.run(Thread.java:748)
{code}

  was:
The producer callback in "WorkerSourceTask" handles exceptions during a send() 
by logging at ERROR level and continuing.  This can lead to offsets being 
committed for records that were never sent correctly.  The records are 
effectively skipped, leading to data loss in our use case.   

The source code for the Callback "onCompletion()" method suggests this should 
"basically never happen ... callbacks with exceptions should never be invoked 
in practice", but we have seen this happen several times in production, 
especially in near heap-exhaustion situations when the Sender thread generates 
an exception (often caused by KAFKA-6551).

From WorkerSourceTask:
{code:java}
new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            // Given the default settings for zero data loss, this should basically never happen --
            // between "infinite" retries, indefinite blocking on full buffers, and "infinite" request
            // timeouts, callbacks with exceptions should never be invoked in practice. If the
            // user overrode these settings, the best we can do is notify them of the failure via
            // logging.
            log.error("{} failed to send record to {}: {}", this, topic, e);
            log.debug("{} Failed record: {}", this, preTransformRecord);
        } else {
            log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
                    this,
                    recordMetadata.topic(), recordMetadata.partition(),
                    recordMetadata.offset());
            commitTaskRecord(preTransformRecord);
        }
        recordSent(producerRecord);
        counter.completeRecord();
    }
}
{code}
 

Example of an exception triggering the bug:
{code:java}
2018-04-27 21:14:25,740 [kafka-producer-network-thread | source-23] ERROR o.a.k.c.runtime.WorkerSourceTask - source-23 failed to send record to topic-name: {}
java.lang.IllegalStateException: Producer is closed forcefully.
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:610)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:597)
    at 

[jira] [Commented] (KAFKA-6876) Sender exceptions ignored by WorkerSourceTask producer Callback causing data loss

2018-05-07 Thread Paul Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16466434#comment-16466434
 ] 

Paul Davidson commented on KAFKA-6876:
--

Note I do have a patch ready that simply fails the affected Task to avoid 
spurious offsets being committed. I'm happy to share this, but I would like to 
hear other opinions on how this case should be handled.
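
The shape of that patch is roughly as follows. This is a simplified sketch: producerSendException is an illustrative AtomicReference field, and the other names come from the snippet quoted below.

{code:java}
// Simplified sketch: record the send failure instead of only logging it, so
// the task can be failed before the record's offset is committed.
new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            log.error("{} failed to send record to {}: {}", this, topic, e);
            // Surfaced on the task thread, which then throws and fails the task
            // rather than committing offsets for the lost record.
            producerSendException.compareAndSet(null, e);
        } else {
            commitTaskRecord(preTransformRecord);
        }
        recordSent(producerRecord);
        counter.completeRecord();
    }
}
{code}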

> Sender exceptions ignored by WorkerSourceTask producer Callback causing data 
> loss
> -
>
> Key: KAFKA-6876
> URL: https://issues.apache.org/jira/browse/KAFKA-6876
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.1, 1.1.0
> Environment: Linux, JDK 8
>Reporter: Paul Davidson
>Priority: Major
>
> The producer callback in "WorkerSourceTask" handles exceptions during a 
> send() by logging at ERROR level and continuing.  This can lead to offsets 
> being committed for records that were never sent correctly.  The records are 
> effectively skipped, leading to data loss in our use case.   
> The source code for the Callback "onCompletion()" method suggests this should 
> "basically never happen ... callbacks with exceptions should never be invoked 
> in practice", but we have seen this happen several times in production, 
> especially in near heap-exhaustion situations when the Sender thread 
> generates an exception (often caused by KAFKA-6551).
> From WorkerSourceTask:
> {code:java}
> new Callback() {
>     @Override
>     public void onCompletion(RecordMetadata recordMetadata, Exception e) {
>         if (e != null) {
>             // Given the default settings for zero data loss, this should basically never happen --
>             // between "infinite" retries, indefinite blocking on full buffers, and "infinite" request
>             // timeouts, callbacks with exceptions should never be invoked in practice. If the
>             // user overrode these settings, the best we can do is notify them of the failure via
>             // logging.
>             log.error("{} failed to send record to {}: {}", this, topic, e);
>             log.debug("{} Failed record: {}", this, preTransformRecord);
>         } else {
>             log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
>                     this,
>                     recordMetadata.topic(), recordMetadata.partition(),
>                     recordMetadata.offset());
>             commitTaskRecord(preTransformRecord);
>         }
>         recordSent(producerRecord);
>         counter.completeRecord();
>     }
> }
> {code}
>  
> Example of an exception triggering the bug:
> {code:java}
> 2018-04-27 21:14:25,740 [kafka-producer-network-thread | source-23] ERROR o.a.k.c.runtime.WorkerSourceTask - source-23 failed to send record to topic-name: {}
> java.lang.IllegalStateException: Producer is closed forcefully.
>     at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:610)
>     at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:597)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:183)
>     at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6876) Sender exceptions ignored by WorkerSourceTask producer Callback causing data loss

2018-05-07 Thread Paul Davidson (JIRA)
Paul Davidson created KAFKA-6876:


 Summary: Sender exceptions ignored by WorkerSourceTask producer 
Callback causing data loss
 Key: KAFKA-6876
 URL: https://issues.apache.org/jira/browse/KAFKA-6876
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 1.1.0, 0.11.0.1
 Environment: Linux, JDK 8
Reporter: Paul Davidson


The producer callback in "WorkerSourceTask" handles exceptions during a send() 
by logging at ERROR level and continuing.  This can lead to offsets being 
committed for records that were never sent correctly.  The records are 
effectively skipped, leading to data loss in our use case.   

The source code for the Callback "onCompletion()" method suggests this should 
"basically never happen ... callbacks with exceptions should never be invoked 
in practice", but we have seen this happen several times in production, 
especially in near heap-exhaustion situations when the Sender thread generates 
an exception (often caused by KAFKA-6551).

From WorkerSourceTask:
{code:java}
new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            // Given the default settings for zero data loss, this should basically never happen --
            // between "infinite" retries, indefinite blocking on full buffers, and "infinite" request
            // timeouts, callbacks with exceptions should never be invoked in practice. If the
            // user overrode these settings, the best we can do is notify them of the failure via
            // logging.
            log.error("{} failed to send record to {}: {}", this, topic, e);
            log.debug("{} Failed record: {}", this, preTransformRecord);
        } else {
            log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
                    this,
                    recordMetadata.topic(), recordMetadata.partition(),
                    recordMetadata.offset());
            commitTaskRecord(preTransformRecord);
        }
        recordSent(producerRecord);
        counter.completeRecord();
    }
}
{code}
 

Example of an exception triggering the bug:
{code:java}
2018-04-27 21:14:25,740 [kafka-producer-network-thread | source-23] ERROR o.a.k.c.runtime.WorkerSourceTask - source-23 failed to send record to topic-name: {}
java.lang.IllegalStateException: Producer is closed forcefully.
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:610)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:597)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:183)
    at java.lang.Thread.run(Thread.java:748)
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6388) Error while trying to roll a segment that already exists

2018-02-08 Thread Paul Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357331#comment-16357331
 ] 

Paul Davidson edited comment on KAFKA-6388 at 2/9/18 12:03 AM:
---

[~hachikuji] We saw this on non-compacted topics.


was (Author: pdavidson):
[~dhay]  We saw this on non-compacted topics.

> Error while trying to roll a segment that already exists
> 
>
> Key: KAFKA-6388
> URL: https://issues.apache.org/jira/browse/KAFKA-6388
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: David Hay
>Priority: Blocker
>
> Recreating this issue from KAFKA-654 as we've been hitting it repeatedly in 
> our attempts to get a stable 1.0 cluster running (upgrading from 0.8.2.2).
> After spending 30 min or more spewing log messages like this:
> {noformat}
> [2017-12-19 16:44:28,998] INFO Replica loaded for partition screening.save.results.screening.save.results.processor.error-43 with initial high watermark 0 (kafka.cluster.Replica)
> {noformat}
> Eventually, the replica thread throws the error below (also referenced in the 
> original issue).  If I remove that partition from the data directory and 
> bounce the broker, it eventually rebalances (assuming it doesn't hit a 
> different partition with the same error).
> {noformat}
> 2017-12-19 15:16:24,227] WARN Newly rolled segment file 0002.log already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,227] WARN Newly rolled segment file 0002.index already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,227] WARN Newly rolled segment file 0002.timeindex already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,232] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions __consumer_offsets-20 (kafka.server.ReplicaFetcherManager)
> [2017-12-19 15:16:24,297] ERROR [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition sr.new.sr.new.processor.error-38 offset 2
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:172)
>     at scala.Option.foreach(Option.scala:257)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:172)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:169)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
>     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:167)
>     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> Caused by: kafka.common.KafkaException: Trying to roll a new log segment for topic partition sr.new.sr.new.processor.error-38 with start offset 2 while it already exists.
>     at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1338)
>     at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1297)
>     at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
>     at kafka.log.Log.roll(Log.scala:1297)
>     at kafka.log.Log.kafka$log$Log$$maybeRoll(Log.scala:1284)
>     at kafka.log.Log$$anonfun$append$2.apply(Log.scala:710)
>     at kafka.log.Log$$anonfun$append$2.apply(Log.scala:624)
>     at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
>     at kafka.log.Log.append(Log.scala:624)
>     at kafka.log.Log.appendAsFollower(Log.scala:607)
>     at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:102)
>     at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:41)
>     at 

[jira] [Commented] (KAFKA-6388) Error while trying to roll a segment that already exists

2018-02-08 Thread Paul Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357331#comment-16357331
 ] 

Paul Davidson commented on KAFKA-6388:
--

[~dhay]  We saw this on non-compacted topics.

> Error while trying to roll a segment that already exists
> 
>
> Key: KAFKA-6388
> URL: https://issues.apache.org/jira/browse/KAFKA-6388
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: David Hay
>Priority: Blocker
>
> Recreating this issue from KAFKA-654 as we've been hitting it repeatedly in 
> our attempts to get a stable 1.0 cluster running (upgrading from 0.8.2.2).
> After spending 30 min or more spewing log messages like this:
> {noformat}
> [2017-12-19 16:44:28,998] INFO Replica loaded for partition screening.save.results.screening.save.results.processor.error-43 with initial high watermark 0 (kafka.cluster.Replica)
> {noformat}
> Eventually, the replica thread throws the error below (also referenced in the 
> original issue).  If I remove that partition from the data directory and 
> bounce the broker, it eventually rebalances (assuming it doesn't hit a 
> different partition with the same error).
> {noformat}
> 2017-12-19 15:16:24,227] WARN Newly rolled segment file 0002.log already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,227] WARN Newly rolled segment file 0002.index already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,227] WARN Newly rolled segment file 0002.timeindex already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,232] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions __consumer_offsets-20 (kafka.server.ReplicaFetcherManager)
> [2017-12-19 15:16:24,297] ERROR [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition sr.new.sr.new.processor.error-38 offset 2
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:172)
>     at scala.Option.foreach(Option.scala:257)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:172)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:169)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
>     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:167)
>     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> Caused by: kafka.common.KafkaException: Trying to roll a new log segment for topic partition sr.new.sr.new.processor.error-38 with start offset 2 while it already exists.
>     at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1338)
>     at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1297)
>     at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
>     at kafka.log.Log.roll(Log.scala:1297)
>     at kafka.log.Log.kafka$log$Log$$maybeRoll(Log.scala:1284)
>     at kafka.log.Log$$anonfun$append$2.apply(Log.scala:710)
>     at kafka.log.Log$$anonfun$append$2.apply(Log.scala:624)
>     at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
>     at kafka.log.Log.append(Log.scala:624)
>     at kafka.log.Log.appendAsFollower(Log.scala:607)
>     at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:102)
>     at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:41)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:184)
>     ... 13 more
> [2017-12-19 15:16:24,302] INFO 

[jira] [Commented] (KAFKA-6388) Error while trying to roll a segment that already exists

2018-01-25 Thread Paul Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339667#comment-16339667
 ] 

Paul Davidson commented on KAFKA-6388:
--

We saw the same issue after an upgrade from 0.9 to 0.11.  Jason Gustafson's 
suggestion of shutting down the broker, deleting the index files, and 
restarting worked.
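
For anyone else hitting this, the procedure was roughly the following (illustrative paths; Kafka rebuilds missing index files from the log segments during startup recovery):

{noformat}
# 1. Stop the affected broker.
# 2. Remove the index files for the affected partition (adapt to your log.dirs):
rm /var/kafka-logs/<topic>-<partition>/*.index
rm /var/kafka-logs/<topic>-<partition>/*.timeindex
# 3. Restart the broker; the indexes are rebuilt and replication resumes.
{noformat}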

> Error while trying to roll a segment that already exists
> 
>
> Key: KAFKA-6388
> URL: https://issues.apache.org/jira/browse/KAFKA-6388
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: David Hay
>Priority: Blocker
>
> Recreating this issue from KAFKA-654 as we've been hitting it repeatedly in 
> our attempts to get a stable 1.0 cluster running (upgrading from 0.8.2.2).
> After spending 30 min or more spewing log messages like this:
> {noformat}
> [2017-12-19 16:44:28,998] INFO Replica loaded for partition screening.save.results.screening.save.results.processor.error-43 with initial high watermark 0 (kafka.cluster.Replica)
> {noformat}
> Eventually, the replica thread throws the error below (also referenced in the 
> original issue).  If I remove that partition from the data directory and 
> bounce the broker, it eventually rebalances (assuming it doesn't hit a 
> different partition with the same error).
> {noformat}
> 2017-12-19 15:16:24,227] WARN Newly rolled segment file 0002.log already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,227] WARN Newly rolled segment file 0002.index already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,227] WARN Newly rolled segment file 0002.timeindex already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,232] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions __consumer_offsets-20 (kafka.server.ReplicaFetcherManager)
> [2017-12-19 15:16:24,297] ERROR [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition sr.new.sr.new.processor.error-38 offset 2
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:172)
>     at scala.Option.foreach(Option.scala:257)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:172)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:169)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
>     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:167)
>     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> Caused by: kafka.common.KafkaException: Trying to roll a new log segment for topic partition sr.new.sr.new.processor.error-38 with start offset 2 while it already exists.
>     at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1338)
>     at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1297)
>     at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
>     at kafka.log.Log.roll(Log.scala:1297)
>     at kafka.log.Log.kafka$log$Log$$maybeRoll(Log.scala:1284)
>     at kafka.log.Log$$anonfun$append$2.apply(Log.scala:710)
>     at kafka.log.Log$$anonfun$append$2.apply(Log.scala:624)
>     at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
>     at kafka.log.Log.append(Log.scala:624)
>     at kafka.log.Log.appendAsFollower(Log.scala:607)
>     at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:102)
>     at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:41)
>     at 