[jira] [Assigned] (KAFKA-4897) LogCleaner#cleanSegments should not ignore failures to delete files

2018-01-05 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar reassigned KAFKA-4897:


Assignee: Manikumar

> LogCleaner#cleanSegments should not ignore failures to delete files
> ---
>
> Key: KAFKA-4897
> URL: https://issues.apache.org/jira/browse/KAFKA-4897
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Colin P. McCabe
>Assignee: Manikumar
>
> LogCleaner#cleanSegments should not ignore failures to delete files.  
> Currently, it ignores the failure and does not even log an error message.
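
For context, a hedged sketch in plain Java (the LogCleaner itself is Scala, so this is
illustration only) of the general point: File#delete only signals failure through its
return value, while Files.delete throws on failure, so the error can be logged or
propagated instead of silently dropped. The path below is a placeholder.
{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class DeleteWithErrorHandling {

    // Delete a segment file and surface the failure instead of ignoring it.
    static void deleteOrLog(Path segmentFile) {
        try {
            Files.delete(segmentFile);  // throws IOException/NoSuchFileException on failure
        } catch (IOException e) {
            System.err.println("Failed to delete " + segmentFile + ": " + e);
        }
    }

    public static void main(String[] args) {
        deleteOrLog(Paths.get("/tmp/00000000000000000000.log.deleted"));  // placeholder path
    }
}
{code}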



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6425) Calculating cleanBytes in LogToClean might not be correct

2018-01-05 Thread huxihx (JIRA)
huxihx created KAFKA-6425:
-

 Summary: Calculating cleanBytes in LogToClean might not be correct
 Key: KAFKA-6425
 URL: https://issues.apache.org/jira/browse/KAFKA-6425
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 1.0.0
Reporter: huxihx


In class `LogToClean`, the calculation for `cleanBytes` is as below:
{code:java}
val cleanBytes = log.logSegments(-1, firstDirtyOffset).map(_.size.toLong).sum
{code}

Most of the time `firstDirtyOffset` is the base offset of the active segment, which 
works well with `log.logSegments`: `cleanBytes` can safely be computed by summing 
the sizes of all log segments whose base offset is less than `firstDirtyOffset`.

However, this changed after `firstUnstableOffset` was introduced. Users can 
indirectly move this offset to a non-base offset (by changing the log start offset, 
for instance). In that case it is not correct to count the full size of the segment 
containing that offset; we should only count the bytes between that segment's base 
offset and `firstUnstableOffset`.

Let me show an example. Say there are three log segments:
0L    --> log segment 1, size: 1000 bytes
1234L --> log segment 2, size: 1000 bytes
4567L --> active log segment, current size: 500 bytes

Based on the current code, if `firstUnstableOffset` is deliberately set to 2000L 
(this is possible, since it is lower-bounded by the log start offset and the user 
can explicitly change the log start offset), then `cleanBytes` is calculated as 
2000 bytes, which is wrong. The expected value is 1000 + (bytes between offset 
1234L and 2000L).

[~junrao] [~ijuma] Do all of these make sense?
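
For illustration only (plain Java, not the Scala LogToClean code), a runnable
restatement of the arithmetic above; the partial-segment size would have to come
from the segment's offset index and is only named here:
{code:java}
public class CleanBytesExample {
    public static void main(String[] args) {
        long segment1Size = 1000L;      // segment at base offset 0
        long segment2Size = 1000L;      // segment at base offset 1234
        long firstDirtyOffset = 2000L;  // pushed to a non-base offset via the log start offset

        // Current logic: sum the full size of every segment whose base offset
        // is below firstDirtyOffset -> 2000 bytes.
        long currentCleanBytes = segment1Size + segment2Size;
        System.out.println("current cleanBytes  = " + currentCleanBytes);

        // Expected: all of segment 1 plus only the bytes of segment 2 that lie
        // between its base offset (1234) and firstDirtyOffset (2000).
        System.out.println("expected cleanBytes = " + segment1Size
                + " + (bytes in segment 2 between offsets 1234 and 2000)");
    }
}
{code}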






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4991) KerberosLogin#login should probably be synchronized

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16312781#comment-16312781
 ] 

ASF GitHub Bot commented on KAFKA-4991:
---

omkreddy opened a new pull request #4394: KAFKA-4991: Resolve findbugs warnings 
in KerberosLogin
URL: https://github.com/apache/kafka/pull/4394
 
 
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KerberosLogin#login should probably be synchronized
> ---
>
> Key: KAFKA-4991
> URL: https://issues.apache.org/jira/browse/KAFKA-4991
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>  Labels: newbie
> Fix For: 1.1.0
>
>
> KerberosLogin#login should probably be synchronized, since it is modifying 
> {{loginContext}} and {{lastLogin}}, which are normally only accessed under 
> the lock.
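
For illustration only, a minimal Java sketch of the pattern the ticket asks for,
not the actual KerberosLogin code: fields that readers access under the instance
lock are also written under that lock during login. The JAAS entry name is a
placeholder, and a JAAS configuration would be needed at runtime.
{code:java}
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;

public class SynchronizedLoginExample {
    private LoginContext loginContext;
    private long lastLogin;

    // Writers and readers share the same monitor, so the two fields stay consistent.
    public synchronized void login() throws LoginException {
        LoginContext context = new LoginContext("KafkaClient");  // placeholder JAAS entry
        context.login();
        loginContext = context;
        lastLogin = System.currentTimeMillis();
    }

    public synchronized long lastLogin() {
        return lastLogin;
    }
}
{code}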



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6426) Kafka SASL/SCRAM authentication does not fail for incorrect username or password.

2018-01-05 Thread Menaka Madushanka (JIRA)
Menaka Madushanka created KAFKA-6426:


 Summary: Kafka SASL/SCRAM authentication does not fail for 
incorrect username or password.
 Key: KAFKA-6426
 URL: https://issues.apache.org/jira/browse/KAFKA-6426
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.2.1
 Environment: Ubuntu 16.04, JDK 1.8, Kafka_2.10-0.10.2.1
Reporter: Menaka Madushanka
 Attachments: broker-jaas.conf, client-jaas.conf, consumer.properties, 
producer.properties, server.properties

Hi,

I configured Kafka 0.10.2.1 for SASL/SCRAM by following the documentation [1]. 
But it still works even when I use an incorrect username or password in the 
client.

I have attached the server.properties, consumer.properties, producer.properties, 
and JAAS config files for the broker and client. 

Also, in my producer, I have set
 {{props.put("sasl.mechanism", "SCRAM-SHA-256");}}

but when running, it shows,
{{kafka.utils.VerifiableProperties  - Property sasl.mechanism is not valid}}

[1] 
[https://kafka.apache.org/documentation/#security_sasl_scram|https://kafka.apache.org/documentation/#security_sasl_scram]

Thanks and Regards,
Menaka



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6426) Kafka SASL/SCRAM authentication does not fail for incorrect username or password.

2018-01-05 Thread Manikumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16312807#comment-16312807
 ] 

Manikumar commented on KAFKA-6426:
--

In the producer.properties file, security.protocol is set to PLAINTEXT; it should 
be SASL_PLAINTEXT. Also, consumer.properties is pointing to the old consumer 
props. Use the new consumer props:
https://github.com/apache/kafka/blob/trunk/config/consumer.properties
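
The "Property sasl.mechanism is not valid" warning comes from
kafka.utils.VerifiableProperties, i.e. the old Scala client path; SASL/SCRAM needs
the new Java clients. A minimal sketch, with placeholder broker address, credentials
and topic, of a new-producer configuration for SCRAM over SASL_PLAINTEXT:
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ScramProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("security.protocol", "SASL_PLAINTEXT");    // not PLAINTEXT
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"alice\" password=\"alice-secret\";");  // placeholder credentials
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-topic", "key", "value"));  // placeholder topic
        }
    }
}
{code}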

> Kafka SASL/SCRAM authentication does not fail for incorrect username or 
> password.
> -
>
> Key: KAFKA-6426
> URL: https://issues.apache.org/jira/browse/KAFKA-6426
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.1
> Environment: Ubuntu 16.04, JDK 1.8, Kafka_2.10-0.10.2.1
>Reporter: Menaka Madushanka
> Attachments: broker-jaas.conf, client-jaas.conf, consumer.properties, 
> producer.properties, server.properties
>
>
> Hi,
> I configured Kafka 0.10.2.1 for SASL/SCRAM by following the documentation 
> [1]. 
> But it does work when I use incorrect username or password in the client as 
> well. 
> I have attached the server.properties, consumer.properties, 
> producer.properties, jass config files for broker and client. 
> Also, in my producer, I have set
>  {{props.put("sasl.mechanism", "SCRAM-SHA-256");}}
> but when running, it shows,
> {{kafka.utils.VerifiableProperties  - Property sasl.mechanism is not valid}}
> [1] 
> [https://kafka.apache.org/documentation/#security_sasl_scram|https://kafka.apache.org/documentation/#security_sasl_scram]
> Thanks and Regards,
> Menaka



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6426) Kafka SASL/SCRAM authentication does not fail for incorrect username or password.

2018-01-05 Thread Menaka Madushanka (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16312885#comment-16312885
 ] 

Menaka Madushanka commented on KAFKA-6426:
--

Hi Manikumar,

Thank you very much for the information. 
I applied the above configurations, but I can still consume with an incorrect 
username and password.

Also, do you have a fix for
kafka.utils.VerifiableProperties - Property sasl.mechanism is not valid ?

Thanks and Regards,
Menaka

> Kafka SASL/SCRAM authentication does not fail for incorrect username or 
> password.
> -
>
> Key: KAFKA-6426
> URL: https://issues.apache.org/jira/browse/KAFKA-6426
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.1
> Environment: Ubuntu 16.04, JDK 1.8, Kafka_2.10-0.10.2.1
>Reporter: Menaka Madushanka
> Attachments: broker-jaas.conf, client-jaas.conf, consumer.properties, 
> producer.properties, server.properties
>
>
> Hi,
> I configured Kafka 0.10.2.1 for SASL/SCRAM by following the documentation 
> [1]. 
> But it does work when I use incorrect username or password in the client as 
> well. 
> I have attached the server.properties, consumer.properties, 
> producer.properties, jass config files for broker and client. 
> Also, in my producer, I have set
>  {{props.put("sasl.mechanism", "SCRAM-SHA-256");}}
> but when running, it shows,
> {{kafka.utils.VerifiableProperties  - Property sasl.mechanism is not valid}}
> [1] 
> [https://kafka.apache.org/documentation/#security_sasl_scram|https://kafka.apache.org/documentation/#security_sasl_scram]
> Thanks and Regards,
> Menaka



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6426) Kafka SASL/SCRAM authentication does not fail for incorrect username or password.

2018-01-05 Thread Manikumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16312902#comment-16312902
 ] 

Manikumar commented on KAFKA-6426:
--

It looks like you still have config issues. You need to add
listeners=SASL_PLAINTEXT://host.name:port
to your server props. Please go through the Kafka documentation again: configure a 
SASL_PLAINTEXT listener in server.properties, and set 
security.protocol=SASL_PLAINTEXT in the producer/consumer props.

> Kafka SASL/SCRAM authentication does not fail for incorrect username or 
> password.
> -
>
> Key: KAFKA-6426
> URL: https://issues.apache.org/jira/browse/KAFKA-6426
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.1
> Environment: Ubuntu 16.04, JDK 1.8, Kafka_2.10-0.10.2.1
>Reporter: Menaka Madushanka
> Attachments: broker-jaas.conf, client-jaas.conf, consumer.properties, 
> producer.properties, server.properties
>
>
> Hi,
> I configured Kafka 0.10.2.1 for SASL/SCRAM by following the documentation 
> [1]. 
> But it does work when I use incorrect username or password in the client as 
> well. 
> I have attached the server.properties, consumer.properties, 
> producer.properties, jass config files for broker and client. 
> Also, in my producer, I have set
>  {{props.put("sasl.mechanism", "SCRAM-SHA-256");}}
> but when running, it shows,
> {{kafka.utils.VerifiableProperties  - Property sasl.mechanism is not valid}}
> [1] 
> [https://kafka.apache.org/documentation/#security_sasl_scram|https://kafka.apache.org/documentation/#security_sasl_scram]
> Thanks and Regards,
> Menaka



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6338) java.lang.NoClassDefFoundError: org/apache/kafka/common/network/LoginType

2018-01-05 Thread Manikumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313010#comment-16313010
 ] 

Manikumar commented on KAFKA-6338:
--

Some of the code/API changed in Kafka 0.11. This needs to be fixed/updated 
in Ranger; maybe you can raise an issue on the Ranger JIRA board.

> java.lang.NoClassDefFoundError: org/apache/kafka/common/network/LoginType
> -
>
> Key: KAFKA-6338
> URL: https://issues.apache.org/jira/browse/KAFKA-6338
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 1.0.0
>Reporter: Ronald van de Kuil
>Priority: Minor
>
> I have just set up a kerberized Kafka cluster with Ranger 0.7.1 and Kafka 
> 1.0.0. 
> It all seems to work fine: authorisation policies are enforced and audit 
> logging is present.
> On startup of a Kafka server I see a stack trace, but it does not seem to 
> matter.
> My wish is to keep the logs tidy and free of false alerts.
> I wonder whether I have an issue somewhere.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-6077) Let SimpleConsumer support Kerberos authentication

2018-01-05 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-6077.
--
Resolution: Won't Fix

The old consumer is no longer supported; please consider the options mentioned 
above. 

> Let SimpleConsumer support Kerberos authentication
> --
>
> Key: KAFKA-6077
> URL: https://issues.apache.org/jira/browse/KAFKA-6077
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Affects Versions: 0.10.0.0
>Reporter: huangjianan
>
> Cannot use SimpleConsumer in Kafka Kerberos environment



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6426) Kafka SASL/SCRAM authentication does not fail for incorrect username or password.

2018-01-05 Thread Menaka Madushanka (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313021#comment-16313021
 ] 

Menaka Madushanka commented on KAFKA-6426:
--

Hi Manikumar,

I configured SASL_PLAINTEXT for listeners and advertised.listeners in 
server.properties, and in the producer and consumer properties as well. But when 
I run the consumer with these configs, I get this error:


{code:java}
[2018-01-05 17:48:41,346]  WARN 
{kafka.consumer.ConsumerFetcherManager$LeaderFinderThread} -  
[group_menaka-ThinkPad-X1-Carbon-3rd-1515154589076-a2c82e5a-leader-finder-thread],
 Failed to find leader for Set(sensordata-0)
kafka.common.BrokerEndPointNotAvailableException: End point with security 
protocol PLAINTEXT not found for broker 0
at 
kafka.client.ClientUtils$$anonfun$getPlaintextBrokerEndPoints$1$$anonfun$apply$5.apply(ClientUtils.scala:146)
at 
kafka.client.ClientUtils$$anonfun$getPlaintextBrokerEndPoints$1$$anonfun$apply$5.apply(ClientUtils.scala:146)
at scala.Option.getOrElse(Option.scala:120)
at 
kafka.client.ClientUtils$$anonfun$getPlaintextBrokerEndPoints$1.apply(ClientUtils.scala:146)
at 
kafka.client.ClientUtils$$anonfun$getPlaintextBrokerEndPoints$1.apply(ClientUtils.scala:142)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at 
kafka.client.ClientUtils$.getPlaintextBrokerEndPoints(ClientUtils.scala:142)
at 
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:67)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
{code}



> Kafka SASL/SCRAM authentication does not fail for incorrect username or 
> password.
> -
>
> Key: KAFKA-6426
> URL: https://issues.apache.org/jira/browse/KAFKA-6426
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.1
> Environment: Ubuntu 16.04, JDK 1.8, Kafka_2.10-0.10.2.1
>Reporter: Menaka Madushanka
> Attachments: broker-jaas.conf, client-jaas.conf, consumer.properties, 
> producer.properties, server.properties
>
>
> Hi,
> I configured Kafka 0.10.2.1 for SASL/SCRAM by following the documentation 
> [1]. 
> But it does work when I use incorrect username or password in the client as 
> well. 
> I have attached the server.properties, consumer.properties, 
> producer.properties, jass config files for broker and client. 
> Also, in my producer, I have set
>  {{props.put("sasl.mechanism", "SCRAM-SHA-256");}}
> but when running, it shows,
> {{kafka.utils.VerifiableProperties  - Property sasl.mechanism is not valid}}
> [1] 
> [https://kafka.apache.org/documentation/#security_sasl_scram|https://kafka.apache.org/documentation/#security_sasl_scram]
> Thanks and Regards,
> Menaka



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6086) Provide for custom error handling when Kafka Streams fails to produce

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313269#comment-16313269
 ] 

ASF GitHub Bot commented on KAFKA-6086:
---

farmdawgnation opened a new pull request #4395: MINOR: Add documentation for 
KAFKA-6086 (ProductionExceptionHandler)
URL: https://github.com/apache/kafka/pull/4395
 
 
   Attempt number 2 at adding documentation related to KAFKA-6086, the 
`ProductionExceptionHandler`.
   
   Tagging @guozhangwang and @mjsax for review!
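
For readers following along, a hedged sketch of a custom handler in the spirit of
KIP-210; the interface and package names below are as introduced by the KIP and
should be checked against the released API. It skips records rejected for size and
fails on anything else:
{code:java}
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class SkipTooLargeRecordsHandler implements ProductionExceptionHandler {

    // Decide whether Streams should keep going or shut down after a send failure.
    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record,
                                                     Exception exception) {
        return exception instanceof RecordTooLargeException
                ? ProductionExceptionHandlerResponse.CONTINUE
                : ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed for this sketch
    }
}
{code}
Per the KIP, such a handler would be registered via the
default.production.exception.handler setting in StreamsConfig.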


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Provide for custom error handling when Kafka Streams fails to produce
> -
>
> Key: KAFKA-6086
> URL: https://issues.apache.org/jira/browse/KAFKA-6086
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Matt Farmer
>  Labels: kip
> Fix For: 1.1.0
>
>
> This is an issue related to the following KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6311) Expose Kafka cluster ID in Connect REST API

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313341#comment-16313341
 ] 

ASF GitHub Bot commented on KAFKA-6311:
---

hachikuji closed pull request #4314: KAFKA-6311: Expose Kafka cluster ID in 
Connect REST API (KIP-238)
URL: https://github.com/apache/kafka/pull/4314
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java 
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 11fe428b731..49ad8d02872 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -38,13 +38,41 @@
 import java.util.concurrent.TimeUnit;
 
 public class MockAdminClient extends AdminClient {
+public static final String DEFAULT_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
+
 private final List<Node> brokers;
 private final Map allTopics = new HashMap<>();
+private final String clusterId;
 
+private Node controller;
 private int timeoutNextRequests = 0;
 
-public MockAdminClient(List<Node> brokers) {
+/**
+ * Creates MockAdminClient for a cluster with the given brokers. The Kafka 
cluster ID uses the default value from
+ * DEFAULT_CLUSTER_ID.
+ *
+ * @param brokers list of brokers in the cluster
+ * @param controller node that should start as the controller
+ */
+public MockAdminClient(List<Node> brokers, Node controller) {
+this(brokers, controller, DEFAULT_CLUSTER_ID);
+}
+
+/**
+ * Creates MockAdminClient for a cluster with the given brokers.
+ * @param brokers list of brokers in the cluster
+ * @param controller node that should start as the controller
+ */
+public MockAdminClient(List<Node> brokers, Node controller, String clusterId) {
 this.brokers = brokers;
+controller(controller);
+this.clusterId = clusterId;
+}
+
+public void controller(Node controller) {
+if (!brokers.contains(controller))
+throw new IllegalArgumentException("The controller node must be in 
the list of brokers");
+this.controller = controller;
 }
 
 public void addTopic(boolean internal,
@@ -82,7 +110,22 @@ public void timeoutNextRequest(int numberOfRequest) {
 
 @Override
 public DescribeClusterResult describeCluster(DescribeClusterOptions 
options) {
-throw new UnsupportedOperationException("Not implemented yet");
+KafkaFutureImpl<Collection<Node>> nodesFuture = new KafkaFutureImpl<>();
+KafkaFutureImpl<Node> controllerFuture = new KafkaFutureImpl<>();
+KafkaFutureImpl<String> brokerIdFuture = new KafkaFutureImpl<>();
+
+if (timeoutNextRequests > 0) {
+nodesFuture.completeExceptionally(new TimeoutException());
+controllerFuture.completeExceptionally(new TimeoutException());
+brokerIdFuture.completeExceptionally(new TimeoutException());
+--timeoutNextRequests;
+} else {
+nodesFuture.complete(brokers);
+controllerFuture.complete(controller);
+brokerIdFuture.complete(clusterId);
+}
+
+return new DescribeClusterResult(nodesFuture, controllerFuture, 
brokerIdFuture);
 }
 
 @Override
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 1b2f94e4613..98a77ed06c9 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -31,6 +31,7 @@
 import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
 import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
 import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,6 +72,8 @@ public static void main(String[] args) throws Exception {
 plugins.compareAndSwapWithDelegatingLoader();
 DistributedConfig config = new DistributedConfig(workerProps);
 
+String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+
 RestServer rest = new RestServer(config);
 URI advertisedUrl = rest.advertisedUrl();
 String workerId = advertisedUrl.getHost() + ":" + 
advertisedUrl.getPort();
@@ -85,7 +88,8 @@ public static void main(String[] args) throws Exception {
 
 ConfigBackingStore configBackingStore = new 
KafkaConfigBackingStore(worker.getI

[jira] [Created] (KAFKA-6427) Inconsistent exception type from KafkaConsumer.position

2018-01-05 Thread Jay Kahrman (JIRA)
Jay Kahrman created KAFKA-6427:
--

 Summary: Inconsistent exception type from KafkaConsumer.position
 Key: KAFKA-6427
 URL: https://issues.apache.org/jira/browse/KAFKA-6427
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Jay Kahrman
Priority: Trivial


If KafkaConsumer.position is called with a partition that the consumer isn't 
assigned, it throws an IllegalArgumentException. All other APIs throw an 
IllegalStateException when the consumer tries to act on a partition that is not 
assigned to it.

Looking at the implementation, if it weren't for the subscription check and the 
IllegalArgumentException thrown at the beginning of KafkaConsumer.position, the 
very next line would throw an IllegalStateException anyway.
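
For illustration, a minimal sketch (placeholder broker and topic names) that
triggers the call in question: asking for the position of a partition the consumer
is not assigned.
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PositionExceptionExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(new TopicPartition("topic-a", 0)));
            // "topic-b" is not assigned: position() currently throws IllegalArgumentException,
            // while the sister APIs throw IllegalStateException for the same situation.
            consumer.position(new TopicPartition("topic-b", 0));
        }
    }
}
{code}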



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6419) Menu order in Streams docs incorrect

2018-01-05 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313823#comment-16313823
 ] 

Matthias J. Sax commented on KAFKA-6419:


[~tankhiwale] It seems that Joel already opened a PR for it. In general, if 
tickets are unassigned, you can always assign them to yourself if you want to 
pick them up.

> Menu order in Streams docs incorrect
> 
>
> Key: KAFKA-6419
> URL: https://issues.apache.org/jira/browse/KAFKA-6419
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Reporter: Matthias J. Sax
>Assignee: Joel Hamill
>Priority: Minor
>
> The menu order on the Streams page is
> {noformat}
> INTRODUCTION   DEVELOPER GUIDE   CONCEPTS   RUN DEMO APP   TUTORIAL: WRITE APP
> {noformat}
> However, on page "Introduction" when clicking "next" you get to "Run Demo 
> App" and from there (on "next") you get to "Tutorial: write App".
> The menu and the order of "next" should align with each other.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6252) A metric named 'XX' already exists, can't register another one.

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16314079#comment-16314079
 ] 

ASF GitHub Bot commented on KAFKA-6252:
---

wicknicks opened a new pull request #4397: KAFKA-6252: Close the metric group 
to clean up any existing metrics
URL: https://github.com/apache/kafka/pull/4397
 
 
   Signed-off-by: Arjun Satish 
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> A metric named 'XX' already exists, can't register another one.
> ---
>
> Key: KAFKA-6252
> URL: https://issues.apache.org/jira/browse/KAFKA-6252
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
> Environment: Linux
>Reporter: Alexis Sellier
>Assignee: Arjun Satish
>Priority: Critical
>
> When a connector crashes (or is not implemented correctly by not 
> stopping/interrupting {{poll()}}), It cannot be restarted and an exception 
> like this is thrown 
> {code:java}
> java.lang.IllegalArgumentException: A metric named 'MetricName 
> [name=offset-commit-max-time-ms, group=connector-task-metrics, 
> description=The maximum time in milliseconds taken by this task to commit 
> offsets., tags={connector=hdfs-sink-connector-recover, task=0}]' already 
> exists, can't register another one.
>   at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:532)
>   at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:256)
>   at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:241)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask$TaskMetricsGroup.(WorkerTask.java:328)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.(WorkerTask.java:69)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.(WorkerSinkTask.java:98)
>   at 
> org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:449)
>   at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:404)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:852)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:108)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:866)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:862)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> I guess it's because the function taskMetricsGroup.close is not call in all 
> the cases



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6314) Add a tool to delete kafka based consumer offsets for a given group

2018-01-05 Thread Ivan Babrou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16314204#comment-16314204
 ] 

Ivan Babrou commented on KAFKA-6314:


Is there a workaround that allows universal alerting for lagging consumers?

> Add a tool to delete kafka based consumer offsets for a given group
> ---
>
> Key: KAFKA-6314
> URL: https://issues.apache.org/jira/browse/KAFKA-6314
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer, core, tools
>Reporter: Tom Scott
>Priority: Minor
>
> Add a tool to delete kafka based consumer offsets for a given group similar 
> to the reset tool. It could look something like this:
> kafka-consumer-groups --bootstrap-server localhost:9092 --delete-offsets 
> --group somegroup
> The case for this is as follows:
> 1. Consumer group with id: group1 subscribes to topic1
> 2. The group is stopped 
> 3. The subscription changed to topic2 but the id is kept as group1
> Now the output of kafka-consumer-groups --describe for the group will 
> show topic1 even though the group is not subscribed to that topic. This is 
> bad for monitoring as it will show lag on topic1.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4834) Kafka cannot delete topic with ReplicaStateMachine went wrong

2018-01-05 Thread Kiran Pillarisetty (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16314230#comment-16314230
 ] 

Kiran Pillarisetty commented on KAFKA-4834:
---

We are experiencing the same issue on 0.11.0.1.

> Kafka cannot delete topic with ReplicaStateMachine went wrong
> -
>
> Key: KAFKA-4834
> URL: https://issues.apache.org/jira/browse/KAFKA-4834
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.10.1.1
>Reporter: Dan
>  Labels: reliability
>
> It happened several times that some topics can not be deleted in our 
> production environment. By analyzing the log, we found ReplicaStateMachine 
> went wrong. Here are the error messages:
> In state-change.log:
> ERROR Controller 2 epoch 201 initiated state change of replica 1 for 
> partition [test_create_topic1,1] from OnlineReplica to ReplicaDeletionStarted 
> failed (state.change.logger)
> java.lang.AssertionError: assertion failed: Replica 
> [Topic=test_create_topic1,Partition=1,Replica=1] should be in the 
> OfflineReplica states before moving to ReplicaDeletionStarted state. Instead 
> it is in OnlineReplica state
> at scala.Predef$.assert(Predef.scala:179)
> at 
> kafka.controller.ReplicaStateMachine.assertValidPreviousStates(ReplicaStateMachine.scala:309)
> at 
> kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:190)
> at 
> kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114)
> at 
> kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114)
> at 
> scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
> at 
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
> at 
> kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:114)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:344)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:334)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> at 
> kafka.controller.TopicDeletionManager.startReplicaDeletion(TopicDeletionManager.scala:334)
> at 
> kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onPartitionDeletion(TopicDeletionManager.scala:367)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:313)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:312)
> at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
> at 
> kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onTopicDeletion(TopicDeletionManager.scala:312)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:431)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:403)
> at 
> scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
> at 
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
> at 
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:403)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:397)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> In controller.log:
> INFO Leader not yet assigned for partition [test_create_topic1,1]. Skip 
> sending UpdateMetadataRequest. (kafka.controller.ControllerBrokerRequestBatch)
> There may exist two controllers in the cluster, because creating a new topic 
> may trigger two machines to change the state of the same partition, e.g. 
> NonExistentPartition -> NewPartition.
> On the other controller, we found following messages in controller.log of 
> several days earlier:
> [2017-02-25 16:51:22,353] INFO [Topic Deletion Manager 

[jira] [Resolved] (KAFKA-4335) FileStreamSource Connector not working for large files (~ 1GB)

2018-01-05 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava resolved KAFKA-4335.
--
   Resolution: Fixed
Fix Version/s: 1.1.0

Issue resolved by pull request 4356
[https://github.com/apache/kafka/pull/4356]

> FileStreamSource Connector not working for large files (~ 1GB)
> --
>
> Key: KAFKA-4335
> URL: https://issues.apache.org/jira/browse/KAFKA-4335
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: Rahul Shukla
> Fix For: 1.1.0
>
>
> I was trying to sink a large file (about 1 GB). The FileStreamSource connector 
> is not working for that; it works fine for small files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4335) FileStreamSource Connector not working for large files (~ 1GB)

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16314316#comment-16314316
 ] 

ASF GitHub Bot commented on KAFKA-4335:
---

ewencp closed pull request #4356: KAFKA-4335: Add batch.size to 
FileStreamSource connector to prevent OOM
URL: https://github.com/apache/kafka/pull/4356
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
 
b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
index 335fe925519..59006dae4f4 100644
--- 
a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
+++ 
b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
@@ -36,13 +36,18 @@
 public class FileStreamSourceConnector extends SourceConnector {
 public static final String TOPIC_CONFIG = "topic";
 public static final String FILE_CONFIG = "file";
+public static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
+
+public static final int DEFAULT_TASK_BATCH_SIZE = 2000;
 
 private static final ConfigDef CONFIG_DEF = new ConfigDef()
 .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Source 
filename. If not specified, the standard input will be used")
-.define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to 
publish data to");
+.define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to 
publish data to")
+.define(TASK_BATCH_SIZE_CONFIG, Type.INT, Importance.LOW, "The maximum 
number of records the Source task can read from file one time");
 
 private String filename;
 private String topic;
+private int batchSize = DEFAULT_TASK_BATCH_SIZE;
 
 @Override
 public String version() {
@@ -57,6 +62,14 @@ public void start(Map props) {
 throw new ConnectException("FileStreamSourceConnector 
configuration must include 'topic' setting");
 if (topic.contains(","))
 throw new ConnectException("FileStreamSourceConnector should only 
have a single topic when used as a source.");
+
+if (props.containsKey(TASK_BATCH_SIZE_CONFIG)) {
+try {
+batchSize = 
Integer.parseInt(props.get(TASK_BATCH_SIZE_CONFIG));
+} catch (NumberFormatException e) {
+throw new ConnectException("Invalid FileStreamSourceConnector 
configuration", e);
+}
+}
 }
 
 @Override
@@ -72,6 +85,7 @@ public void start(Map props) {
 if (filename != null)
 config.put(FILE_CONFIG, filename);
 config.put(TOPIC_CONFIG, topic);
+config.put(TASK_BATCH_SIZE_CONFIG, String.valueOf(batchSize));
 configs.add(config);
 return configs;
 }
diff --git 
a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
 
b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
index 8edf385611a..482102f7859 100644
--- 
a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
+++ 
b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
@@ -50,6 +50,7 @@
 private char[] buffer = new char[1024];
 private int offset = 0;
 private String topic = null;
+private int batchSize = FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE;
 
 private Long streamOffset;
 
@@ -70,6 +71,14 @@ public void start(Map props) {
 topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
 if (topic == null)
 throw new ConnectException("FileStreamSourceTask config missing 
topic setting");
+
+if 
(props.containsKey(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG)) {
+try {
+batchSize = 
Integer.parseInt(props.get(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG));
+} catch (NumberFormatException e) {
+throw new ConnectException("Invalid FileStreamSourceTask 
configuration", e);
+}
+}
 }
 
 @Override
@@ -146,6 +155,10 @@ public void start(Map props) {
 records = new ArrayList<>();
 records.add(new SourceRecord(offsetKey(filename), 
offsetValue(streamOffset), topic, null,
 null, null, VALUE_SCHEMA, line, 
System.currentTimeMillis()));
+
+if (records.size() >= batchSize) {
+return records;
+}
 }
 } while (line != null);
 }
diff -

[jira] [Created] (KAFKA-6428) Fail builds on findbugs warnings

2018-01-05 Thread Ewen Cheslack-Postava (JIRA)
Ewen Cheslack-Postava created KAFKA-6428:


 Summary: Fail builds on findbugs warnings
 Key: KAFKA-6428
 URL: https://issues.apache.org/jira/browse/KAFKA-6428
 Project: Kafka
  Issue Type: Improvement
  Components: build
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava


Findbugs spots likely bugs, and especially for warnings at the High level, it 
actually has pretty good signal for real bugs (or just things that might be 
risky). We should be failing builds, especially PRs, if any sufficiently high 
warnings are listed. We should get this enabled for that level and then decide 
if we want to adjust the level of warnings we want to address.

This likely relates to KAFKA-5887 since findbugs may not be sufficiently 
maintained for JDK9 support. In any case, the intent is to fail the build based 
on whichever tool is used.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6428) Fail builds on findbugs warnings

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16314348#comment-16314348
 ] 

ASF GitHub Bot commented on KAFKA-6428:
---

ewencp opened a new pull request #4398: KAFKA-6428: Generate findbugs output 
for CI and fail builds for 'high' level warnings
URL: https://github.com/apache/kafka/pull/4398
 
 
   We already had findbugs running, and it looks like sufficiently high warnings 
should cause errors. This PR does a few things. First, it changes to generating 
XML reports (which CI likes) by default. We already seem to have the Jenkins 
plugins set up to consume these, so we should immediately start seeing the 
output on Jenkins. This seems better than the current HTML default, since most 
devs probably aren't looking at the HTML reports unless they are specifically 
looking at findbugs issues. Second, it explicitly sets the report level we want 
to trigger failures on.
   
   I think we were already failing the build based on the current settings; we 
just didn't have any high-level warnings. But this sets us up not only to fail 
the builds but also to get some visibility via Jenkins reports.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fail builds on findbugs warnings
> 
>
> Key: KAFKA-6428
> URL: https://issues.apache.org/jira/browse/KAFKA-6428
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
>
> Findbugs spots likely bugs, and especially for warnings at the High level, it 
> actually has pretty good signal for real bugs (or just things that might be 
> risky). We should be failing builds, especially PRs, if any sufficiently high 
> warnings are listed. We should get this enabled for that level and then 
> decide if we want to adjust the level of warnings we want to address.
> This likely relates to KAFKA-5887 since findbugs may not be sufficiently 
> maintained for JDK9 support. In any case, the intent is to fail the build 
> based on whichever tool is used.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-6252) A metric named 'XX' already exists, can't register another one.

2018-01-05 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava resolved KAFKA-6252.
--
Resolution: Fixed
  Reviewer: Ewen Cheslack-Postava

> A metric named 'XX' already exists, can't register another one.
> ---
>
> Key: KAFKA-6252
> URL: https://issues.apache.org/jira/browse/KAFKA-6252
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
> Environment: Linux
>Reporter: Alexis Sellier
>Assignee: Arjun Satish
>Priority: Critical
> Fix For: 1.1.0, 1.0.1
>
>
> When a connector crashes (or is not implemented correctly by not 
> stopping/interrupting {{poll()}}), It cannot be restarted and an exception 
> like this is thrown 
> {code:java}
> java.lang.IllegalArgumentException: A metric named 'MetricName 
> [name=offset-commit-max-time-ms, group=connector-task-metrics, 
> description=The maximum time in milliseconds taken by this task to commit 
> offsets., tags={connector=hdfs-sink-connector-recover, task=0}]' already 
> exists, can't register another one.
>   at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:532)
>   at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:256)
>   at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:241)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask$TaskMetricsGroup.(WorkerTask.java:328)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.(WorkerTask.java:69)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.(WorkerSinkTask.java:98)
>   at 
> org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:449)
>   at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:404)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:852)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:108)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:866)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:862)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> I guess it's because the function taskMetricsGroup.close is not call in all 
> the cases



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6252) A metric named 'XX' already exists, can't register another one.

2018-01-05 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-6252:
-
Fix Version/s: 1.0.1
   1.1.0

> A metric named 'XX' already exists, can't register another one.
> ---
>
> Key: KAFKA-6252
> URL: https://issues.apache.org/jira/browse/KAFKA-6252
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
> Environment: Linux
>Reporter: Alexis Sellier
>Assignee: Arjun Satish
>Priority: Critical
> Fix For: 1.1.0, 1.0.1
>
>
> When a connector crashes (or is not implemented correctly by not 
> stopping/interrupting {{poll()}}), It cannot be restarted and an exception 
> like this is thrown 
> {code:java}
> java.lang.IllegalArgumentException: A metric named 'MetricName 
> [name=offset-commit-max-time-ms, group=connector-task-metrics, 
> description=The maximum time in milliseconds taken by this task to commit 
> offsets., tags={connector=hdfs-sink-connector-recover, task=0}]' already 
> exists, can't register another one.
>   at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:532)
>   at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:256)
>   at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:241)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask$TaskMetricsGroup.(WorkerTask.java:328)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.(WorkerTask.java:69)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.(WorkerSinkTask.java:98)
>   at 
> org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:449)
>   at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:404)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:852)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:108)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:866)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:862)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> I guess it's because the function taskMetricsGroup.close is not call in all 
> the cases



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6252) A metric named 'XX' already exists, can't register another one.

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16314383#comment-16314383
 ] 

ASF GitHub Bot commented on KAFKA-6252:
---

ewencp closed pull request #4397: KAFKA-6252: Close the metric group to clean 
up any existing metrics
URL: https://github.com/apache/kafka/pull/4397
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index 9e65cd2d80f..9b934f3428a 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -247,6 +247,8 @@ public ConnectorMetricsGroup(ConnectMetrics connectMetrics, 
AbstractStatus.State
 ConnectMetricsRegistry registry = connectMetrics.registry();
 this.metricGroup = 
connectMetrics.group(registry.connectorGroupName(),
 registry.connectorTagName(), connName);
+// prevent collisions by removing any previously created metrics 
in this group.
+metricGroup.close();
 
 metricGroup.addImmutableValueMetric(registry.connectorType, 
connectorType());
 metricGroup.addImmutableValueMetric(registry.connectorClass, 
connector.getClass().getName());
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 234ce8adf14..587e4c68cf5 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -652,6 +652,8 @@ public SinkTaskMetricsGroup(ConnectorTaskId id, 
ConnectMetrics connectMetrics) {
 metricGroup = connectMetrics
   .group(registry.sinkTaskGroupName(), 
registry.connectorTagName(), id.connector(), registry.taskTagName(),
  Integer.toString(id.task()));
+// prevent collisions by removing any previously created metrics 
in this group.
+metricGroup.close();
 
 sinkRecordRead = metricGroup.metrics().sensor("sink-record-read");
 
sinkRecordRead.add(metricGroup.metricName(registry.sinkRecordReadRate), new 
Rate());
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 9072cd47c81..a172cdb45f0 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -494,6 +494,8 @@ public SourceTaskMetricsGroup(ConnectorTaskId id, 
ConnectMetrics connectMetrics)
 metricGroup = connectMetrics.group(registry.sourceTaskGroupName(),
 registry.connectorTagName(), id.connector(),
 registry.taskTagName(), Integer.toString(id.task()));
+// remove any previously created metrics in this group to prevent 
collisions.
+metricGroup.close();
 
 sourceRecordPoll = metricGroup.sensor("source-record-poll");
 
sourceRecordPoll.add(metricGroup.metricName(registry.sourceRecordPollRate), new 
Rate());
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index ec069245b3d..d563f9bdede 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -313,6 +313,8 @@ public TaskMetricsGroup(ConnectorTaskId id, ConnectMetrics 
connectMetrics, TaskS
 metricGroup = connectMetrics.group(registry.taskGroupName(),
 registry.connectorTagName(), id.connector(),
 registry.taskTagName(), Integer.toString(id.task()));
+// prevent collisions by removing any previously created metrics 
in this group.
+metricGroup.close();
 
 metricGroup.addValueMetric(registry.taskStatus, new 
LiteralSupplier() {
 @Override
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
index 2de7c