[jira] [Commented] (KAFKA-6252) A metric named 'XX' already exists, can't register another one.

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16314383#comment-16314383
 ] 

ASF GitHub Bot commented on KAFKA-6252:
---

ewencp closed pull request #4397: KAFKA-6252: Close the metric group to clean 
up any existing metrics
URL: https://github.com/apache/kafka/pull/4397
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index 9e65cd2d80f..9b934f3428a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -247,6 +247,8 @@ public ConnectorMetricsGroup(ConnectMetrics connectMetrics, AbstractStatus.State
             ConnectMetricsRegistry registry = connectMetrics.registry();
             this.metricGroup = connectMetrics.group(registry.connectorGroupName(),
                     registry.connectorTagName(), connName);
+            // prevent collisions by removing any previously created metrics in this group.
+            metricGroup.close();
 
             metricGroup.addImmutableValueMetric(registry.connectorType, connectorType());
             metricGroup.addImmutableValueMetric(registry.connectorClass, connector.getClass().getName());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 234ce8adf14..587e4c68cf5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -652,6 +652,8 @@ public SinkTaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics) {
             metricGroup = connectMetrics
                     .group(registry.sinkTaskGroupName(), registry.connectorTagName(), id.connector(),
                            registry.taskTagName(), Integer.toString(id.task()));
+            // prevent collisions by removing any previously created metrics in this group.
+            metricGroup.close();
 
             sinkRecordRead = metricGroup.metrics().sensor("sink-record-read");
             sinkRecordRead.add(metricGroup.metricName(registry.sinkRecordReadRate), new Rate());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 9072cd47c81..a172cdb45f0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -494,6 +494,8 @@ public SourceTaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics)
             metricGroup = connectMetrics.group(registry.sourceTaskGroupName(),
                     registry.connectorTagName(), id.connector(),
                     registry.taskTagName(), Integer.toString(id.task()));
+            // remove any previously created metrics in this group to prevent collisions.
+            metricGroup.close();
 
             sourceRecordPoll = metricGroup.sensor("source-record-poll");
             sourceRecordPoll.add(metricGroup.metricName(registry.sourceRecordPollRate), new Rate());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index ec069245b3d..d563f9bdede 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -313,6 +313,8 @@ public TaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics, TaskS
             metricGroup = connectMetrics.group(registry.taskGroupName(),
                     registry.connectorTagName(), id.connector(),
                     registry.taskTagName(), Integer.toString(id.task()));
+            // prevent collisions by removing any previously created metrics in this group.
+            metricGroup.close();
 
             metricGroup.addValueMetric(registry.taskStatus, new LiteralSupplier<String>() {
                 @Override
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
index 
[jira] [Resolved] (KAFKA-6252) A metric named 'XX' already exists, can't register another one.

2018-01-05 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava resolved KAFKA-6252.
--
Resolution: Fixed
  Reviewer: Ewen Cheslack-Postava

> A metric named 'XX' already exists, can't register another one.
> ---
>
> Key: KAFKA-6252
> URL: https://issues.apache.org/jira/browse/KAFKA-6252
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
> Environment: Linux
>Reporter: Alexis Sellier
>Assignee: Arjun Satish
>Priority: Critical
> Fix For: 1.1.0, 1.0.1
>
>
> When a connector crashes (or is not implemented correctly and does not 
> stop/interrupt {{poll()}}), it cannot be restarted, and an exception 
> like this is thrown: 
> {code:java}
> java.lang.IllegalArgumentException: A metric named 'MetricName 
> [name=offset-commit-max-time-ms, group=connector-task-metrics, 
> description=The maximum time in milliseconds taken by this task to commit 
> offsets., tags={connector=hdfs-sink-connector-recover, task=0}]' already 
> exists, can't register another one.
>   at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:532)
>   at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:256)
>   at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:241)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask$TaskMetricsGroup.<init>(WorkerTask.java:328)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.<init>(WorkerTask.java:69)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.<init>(WorkerSinkTask.java:98)
>   at 
> org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:449)
>   at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:404)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:852)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:108)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:866)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:862)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> I guess it's because the function taskMetricsGroup.close is not called in all 
> cases.
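The failure mode and the fix can be sketched with toy stand-ins (these are hypothetical classes for illustration, not the real Kafka Connect ones): a registry that rejects duplicate names makes a restarted task fail unless the group removes its stale metrics first, which is exactly what calling `metricGroup.close()` in the group constructor achieves.

```java
import java.util.HashMap;
import java.util.Map;

public class MetricGroupSketch {
    // Toy registry that rejects duplicate names, mirroring the behaviour of
    // Metrics.registerMetric seen in the stack trace above.
    static class Registry {
        final Map<String, Object> metrics = new HashMap<>();

        void register(String name, Object metric) {
            if (metrics.putIfAbsent(name, metric) != null)
                throw new IllegalArgumentException(
                        "A metric named '" + name + "' already exists, can't register another one.");
        }

        // Remove every metric belonging to one group, identified here by a name prefix.
        void removeGroup(String group) {
            metrics.keySet().removeIf(n -> n.startsWith(group + "."));
        }
    }

    // Toy metric group: closing itself in the constructor drops any metrics
    // left over from a crashed predecessor, so re-registration cannot collide.
    static class Group {
        final Registry registry;
        final String name;

        Group(Registry registry, String name) {
            this.registry = registry;
            this.name = name;
            close(); // the fix: clean up before registering anew
        }

        void close() {
            registry.removeGroup(name);
        }

        void add(String metric) {
            registry.register(name + "." + metric, new Object());
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        Group first = new Group(registry, "connector-task-metrics");
        first.add("offset-commit-max-time-ms");
        // Simulate a crash: 'first' is abandoned without ever being closed.
        Group restarted = new Group(registry, "connector-task-metrics");
        restarted.add("offset-commit-max-time-ms"); // would have thrown without close()
        System.out.println("metrics registered: " + registry.metrics.size());
    }
}
```

Without the `close()` call in the constructor, the second `add` would hit the duplicate check, which is the `IllegalArgumentException` reported above.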



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6252) A metric named 'XX' already exists, can't register another one.

2018-01-05 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-6252:
-
Fix Version/s: 1.0.1
   1.1.0

> A metric named 'XX' already exists, can't register another one.
> ---
>
> Key: KAFKA-6252
> URL: https://issues.apache.org/jira/browse/KAFKA-6252
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
> Environment: Linux
>Reporter: Alexis Sellier
>Assignee: Arjun Satish
>Priority: Critical
> Fix For: 1.1.0, 1.0.1
>





[jira] [Commented] (KAFKA-6428) Fail builds on findbugs warnings

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16314348#comment-16314348
 ] 

ASF GitHub Bot commented on KAFKA-6428:
---

ewencp opened a new pull request #4398: KAFKA-6428: Generate findbugs output 
for CI and fail builds for 'high' level warnings
URL: https://github.com/apache/kafka/pull/4398
 
 
   We already had findbugs running, and it looks like sufficiently severe warnings 
should cause errors. This PR does a few things. First, it changes to generating 
XML reports (which CI likes) by default. We already seem to have the Jenkins 
plugins set up to consume these, so we should immediately start seeing the 
output on Jenkins. This seems better than the current HTML default, since most 
devs probably aren't looking at the HTML reports unless they are specifically 
looking at findbugs issues. Second, it explicitly sets the report level we want 
to trigger failures on.
   
   I think we were already failing the build based on the current settings; we 
just didn't have any high-level warnings. But this sets us up not only to fail 
the builds but also to have some visibility via Jenkins reports.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
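For reference, the kind of Gradle configuration involved looks roughly like the following. This is a hedged sketch using property names from the (pre-SpotBugs) Gradle FindBugs plugin; the exact contents of the PR may differ.

```groovy
// build.gradle (sketch) -- property names per the Gradle FindBugs plugin of that era
findbugs {
    reportLevel = "high"    // only 'high' priority warnings trigger failures
    ignoreFailures = false  // a reported warning fails the build
}

tasks.withType(FindBugs) {
    reports {
        // CI consumes XML; the plugin allows only one report format at a time,
        // so the HTML default is switched off.
        xml.enabled = true
        html.enabled = false
    }
}
```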


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fail builds on findbugs warnings
> 
>
> Key: KAFKA-6428
> URL: https://issues.apache.org/jira/browse/KAFKA-6428
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
>
> Findbugs spots likely bugs, and especially for warnings at the High level, it 
> actually has pretty good signal for real bugs (or just things that might be 
> risky). We should be failing builds, especially PRs, if any sufficiently high 
> warnings are listed. We should get this enabled for that level and then 
> decide if we want to adjust the level of warnings we want to address.
> This likely relates to KAFKA-5887 since findbugs may not be sufficiently 
> maintained for JDK9 support. In any case, the intent is to fail the build 
> based on whichever tool is used.





[jira] [Created] (KAFKA-6428) Fail builds on findbugs warnings

2018-01-05 Thread Ewen Cheslack-Postava (JIRA)
Ewen Cheslack-Postava created KAFKA-6428:


 Summary: Fail builds on findbugs warnings
 Key: KAFKA-6428
 URL: https://issues.apache.org/jira/browse/KAFKA-6428
 Project: Kafka
  Issue Type: Improvement
  Components: build
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava


Findbugs spots likely bugs, and especially for warnings at the High level, it 
actually has pretty good signal for real bugs (or just things that might be 
risky). We should be failing builds, especially PRs, if any sufficiently high 
warnings are listed. We should get this enabled for that level and then decide 
if we want to adjust the level of warnings we want to address.

This likely relates to KAFKA-5887 since findbugs may not be sufficiently 
maintained for JDK9 support. In any case, the intent is to fail the build based 
on whichever tool is used.





[jira] [Commented] (KAFKA-4335) FileStreamSource Connector not working for large files (~ 1GB)

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16314316#comment-16314316
 ] 

ASF GitHub Bot commented on KAFKA-4335:
---

ewencp closed pull request #4356: KAFKA-4335: Add batch.size to 
FileStreamSource connector to prevent OOM
URL: https://github.com/apache/kafka/pull/4356
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
index 335fe925519..59006dae4f4 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
@@ -36,13 +36,18 @@
 public class FileStreamSourceConnector extends SourceConnector {
     public static final String TOPIC_CONFIG = "topic";
     public static final String FILE_CONFIG = "file";
+    public static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
+
+    public static final int DEFAULT_TASK_BATCH_SIZE = 2000;
 
     private static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Source filename. If not specified, the standard input will be used")
-        .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to");
+        .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to")
+        .define(TASK_BATCH_SIZE_CONFIG, Type.INT, Importance.LOW, "The maximum number of records the Source task can read from file one time");
 
     private String filename;
     private String topic;
+    private int batchSize = DEFAULT_TASK_BATCH_SIZE;
 
     @Override
     public String version() {
@@ -57,6 +62,14 @@ public void start(Map<String, String> props) {
             throw new ConnectException("FileStreamSourceConnector configuration must include 'topic' setting");
         if (topic.contains(","))
             throw new ConnectException("FileStreamSourceConnector should only have a single topic when used as a source.");
+
+        if (props.containsKey(TASK_BATCH_SIZE_CONFIG)) {
+            try {
+                batchSize = Integer.parseInt(props.get(TASK_BATCH_SIZE_CONFIG));
+            } catch (NumberFormatException e) {
+                throw new ConnectException("Invalid FileStreamSourceConnector configuration", e);
+            }
+        }
     }
 
     @Override
@@ -72,6 +85,7 @@ public void start(Map<String, String> props) {
         if (filename != null)
             config.put(FILE_CONFIG, filename);
         config.put(TOPIC_CONFIG, topic);
+        config.put(TASK_BATCH_SIZE_CONFIG, String.valueOf(batchSize));
         configs.add(config);
         return configs;
     }
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
index 8edf385611a..482102f7859 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
@@ -50,6 +50,7 @@
     private char[] buffer = new char[1024];
     private int offset = 0;
     private String topic = null;
+    private int batchSize = FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE;
 
     private Long streamOffset;
 
@@ -70,6 +71,14 @@ public void start(Map<String, String> props) {
         topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
         if (topic == null)
             throw new ConnectException("FileStreamSourceTask config missing topic setting");
+
+        if (props.containsKey(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG)) {
+            try {
+                batchSize = Integer.parseInt(props.get(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG));
+            } catch (NumberFormatException e) {
+                throw new ConnectException("Invalid FileStreamSourceTask configuration", e);
+            }
+        }
     }
 
     @Override
@@ -146,6 +155,10 @@ public void start(Map<String, String> props) {
                 records = new ArrayList<>();
             records.add(new SourceRecord(offsetKey(filename), offsetValue(streamOffset), topic, null,
                     null, null, VALUE_SCHEMA, line, System.currentTimeMillis()));
+
+            if (records.size() >= batchSize) {
+                return records;
+            }
         }
     } 
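The essential pattern in the patch is to stop buffering the whole file and instead hand off records in bounded batches. A minimal, self-contained sketch of that pattern (a hypothetical helper, not the real FileStreamSourceTask):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class BatchedReadSketch {
    // Return as soon as batchSize records are collected instead of reading the
    // entire file into one list; memory use is bounded by the batch size.
    static List<String> pollBatch(BufferedReader reader, int batchSize) {
        List<String> records = new ArrayList<>();
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                records.add(line);
                if (records.size() >= batchSize)
                    return records; // hand off a bounded batch; resume on the next poll
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return records.isEmpty() ? null : records; // null signals end of stream
    }

    public static void main(String[] args) {
        // A 1 GB file would stream through in bounded batches; a short string suffices here.
        BufferedReader reader = new BufferedReader(new StringReader("a\nb\nc\nd\ne"));
        List<String> batch;
        while ((batch = pollBatch(reader, 2)) != null)
            System.out.println(batch);
    }
}
```

Each call drains at most `batchSize` lines, which is why a ~1 GB file no longer exhausts the heap: the framework can commit and send each batch before the next one is read.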

[jira] [Resolved] (KAFKA-4335) FileStreamSource Connector not working for large files (~ 1GB)

2018-01-05 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava resolved KAFKA-4335.
--
   Resolution: Fixed
Fix Version/s: 1.1.0

Issue resolved by pull request 4356
[https://github.com/apache/kafka/pull/4356]

> FileStreamSource Connector not working for large files (~ 1GB)
> --
>
> Key: KAFKA-4335
> URL: https://issues.apache.org/jira/browse/KAFKA-4335
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: Rahul Shukla
> Fix For: 1.1.0
>
>
> I was trying to read a large file (about 1 GB). The FileStreamSource connector 
> is not working for that; it works fine for small files.





[jira] [Commented] (KAFKA-4834) Kafka cannot delete topic with ReplicaStateMachine went wrong

2018-01-05 Thread Kiran Pillarisetty (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16314230#comment-16314230
 ] 

Kiran Pillarisetty commented on KAFKA-4834:
---

We are experiencing the same issue on 0.11.0.1.

> Kafka cannot delete topic with ReplicaStateMachine went wrong
> -
>
> Key: KAFKA-4834
> URL: https://issues.apache.org/jira/browse/KAFKA-4834
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.10.1.1
>Reporter: Dan
>  Labels: reliability
>
> It has happened several times that some topics cannot be deleted in our 
> production environment. By analyzing the logs, we found that the 
> ReplicaStateMachine went wrong. Here are the error messages:
> In state-change.log:
> ERROR Controller 2 epoch 201 initiated state change of replica 1 for 
> partition [test_create_topic1,1] from OnlineReplica to ReplicaDeletionStarted 
> failed (state.change.logger)
> java.lang.AssertionError: assertion failed: Replica 
> [Topic=test_create_topic1,Partition=1,Replica=1] should be in the 
> OfflineReplica states before moving to ReplicaDeletionStarted state. Instead 
> it is in OnlineReplica state
> at scala.Predef$.assert(Predef.scala:179)
> at 
> kafka.controller.ReplicaStateMachine.assertValidPreviousStates(ReplicaStateMachine.scala:309)
> at 
> kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:190)
> at 
> kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114)
> at 
> kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114)
> at 
> scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
> at 
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
> at 
> kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:114)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:344)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:334)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> at 
> kafka.controller.TopicDeletionManager.startReplicaDeletion(TopicDeletionManager.scala:334)
> at 
> kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onPartitionDeletion(TopicDeletionManager.scala:367)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:313)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:312)
> at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
> at 
> kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onTopicDeletion(TopicDeletionManager.scala:312)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:431)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:403)
> at 
> scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
> at 
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
> at 
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:403)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:397)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> In controller.log:
> INFO Leader not yet assigned for partition [test_create_topic1,1]. Skip 
> sending UpdateMetadataRequest. (kafka.controller.ControllerBrokerRequestBatch)
> There may exist two controllers in the cluster, because creating a new topic 
> may trigger two machines to change the state of the same partition, e.g. 
> NonExistentPartition -> NewPartition.
> On the other controller, we found the following messages in controller.log from 
> several days earlier:
> [2017-02-25 16:51:22,353] INFO [Topic Deletion Manager 0], Topic 

[jira] [Commented] (KAFKA-6314) Add a tool to delete kafka based consumer offsets for a given group

2018-01-05 Thread Ivan Babrou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16314204#comment-16314204
 ] 

Ivan Babrou commented on KAFKA-6314:


Is there a workaround that allows universal alerting for lagging consumers?

> Add a tool to delete kafka based consumer offsets for a given group
> ---
>
> Key: KAFKA-6314
> URL: https://issues.apache.org/jira/browse/KAFKA-6314
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer, core, tools
>Reporter: Tom Scott
>Priority: Minor
>
> Add a tool to delete kafka based consumer offsets for a given group similar 
> to the reset tool. It could look something like this:
> kafka-consumer-groups --bootstrap-server localhost:9092 --delete-offsets 
> --group somegroup
> The case for this is as follows:
> 1. Consumer group with id: group1 subscribes to topic1
> 2. The group is stopped 
> 3. The subscription changed to topic2 but the id is kept as group1
> Now the output of kafka-consumer-groups --describe for the group will 
> show topic1 even though the group is not subscribed to that topic. This is 
> bad for monitoring, as it will show lag on topic1.





[jira] [Commented] (KAFKA-6252) A metric named 'XX' already exists, can't register another one.

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16314079#comment-16314079
 ] 

ASF GitHub Bot commented on KAFKA-6252:
---

wicknicks opened a new pull request #4397: KAFKA-6252: Close the metric group 
to clean up any existing metrics
URL: https://github.com/apache/kafka/pull/4397
 
 
   Signed-off-by: Arjun Satish 
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




> A metric named 'XX' already exists, can't register another one.
> ---
>
> Key: KAFKA-6252
> URL: https://issues.apache.org/jira/browse/KAFKA-6252
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
> Environment: Linux
>Reporter: Alexis Sellier
>Assignee: Arjun Satish
>Priority: Critical
>





[jira] [Created] (KAFKA-6427) Inconsistent exception type from KafkaConsumer.position

2018-01-05 Thread Jay Kahrman (JIRA)
Jay Kahrman created KAFKA-6427:
--

 Summary: Inconsistent exception type from KafkaConsumer.position
 Key: KAFKA-6427
 URL: https://issues.apache.org/jira/browse/KAFKA-6427
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Jay Kahrman
Priority: Trivial


If KafkaConsumer.position is called with a partition that the consumer isn't 
assigned, it throws an IllegalArgumentException. All other APIs throw an 
IllegalStateException when the consumer tries to act on a partition that is not 
assigned to it.

Looking at the implementation, if it weren't for the subscription test and the 
IllegalArgumentException thrown at the beginning of KafkaConsumer.position, the 
very next line would throw an IllegalStateException anyway.
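The inconsistency can be demonstrated with a small stand-in (a hypothetical class for illustration, not the real KafkaConsumer): one accessor guards the "partition not assigned" condition with IllegalArgumentException while its sibling uses IllegalStateException for the very same condition.

```java
import java.util.HashSet;
import java.util.Set;

public class PositionSketch {
    static class Consumer {
        private final Set<String> assignment = new HashSet<>();

        void assign(String partition) {
            assignment.add(partition);
        }

        // Mirrors the reported behaviour of position(): the odd one out.
        long position(String partition) {
            if (!assignment.contains(partition))
                throw new IllegalArgumentException("Partition " + partition + " is not assigned to this consumer");
            return 0L;
        }

        // Mirrors what the other APIs do for the same unassigned-partition condition.
        long committed(String partition) {
            if (!assignment.contains(partition))
                throw new IllegalStateException("No current assignment for partition " + partition);
            return 0L;
        }
    }

    public static void main(String[] args) {
        Consumer consumer = new Consumer();
        try {
            consumer.position("topic-0");
        } catch (RuntimeException e) {
            System.out.println("position:  " + e.getClass().getSimpleName());
        }
        try {
            consumer.committed("topic-0");
        } catch (RuntimeException e) {
            System.out.println("committed: " + e.getClass().getSimpleName());
        }
    }
}
```

Callers that catch IllegalStateException to detect unassigned partitions would miss the one thrown by position(), which is why aligning the exception types matters.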





[jira] [Commented] (KAFKA-6311) Expose Kafka cluster ID in Connect REST API

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313341#comment-16313341
 ] 

ASF GitHub Bot commented on KAFKA-6311:
---

hachikuji closed pull request #4314: KAFKA-6311: Expose Kafka cluster ID in 
Connect REST API (KIP-238)
URL: https://github.com/apache/kafka/pull/4314
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 11fe428b731..49ad8d02872 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -38,13 +38,41 @@
 import java.util.concurrent.TimeUnit;
 
 public class MockAdminClient extends AdminClient {
+    public static final String DEFAULT_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
+
     private final List<Node> brokers;
     private final Map<String, TopicMetadata> allTopics = new HashMap<>();
+    private final String clusterId;
 
+    private Node controller;
     private int timeoutNextRequests = 0;
 
-    public MockAdminClient(List<Node> brokers) {
+    /**
+     * Creates MockAdminClient for a cluster with the given brokers. The Kafka cluster ID uses the default value from
+     * DEFAULT_CLUSTER_ID.
+     *
+     * @param brokers list of brokers in the cluster
+     * @param controller node that should start as the controller
+     */
+    public MockAdminClient(List<Node> brokers, Node controller) {
+        this(brokers, controller, DEFAULT_CLUSTER_ID);
+    }
+
+    /**
+     * Creates MockAdminClient for a cluster with the given brokers.
+     * @param brokers list of brokers in the cluster
+     * @param controller node that should start as the controller
+     */
+    public MockAdminClient(List<Node> brokers, Node controller, String clusterId) {
         this.brokers = brokers;
+        controller(controller);
+        this.clusterId = clusterId;
+    }
+
+    public void controller(Node controller) {
+        if (!brokers.contains(controller))
+            throw new IllegalArgumentException("The controller node must be in the list of brokers");
+        this.controller = controller;
     }
 
     public void addTopic(boolean internal,
@@ -82,7 +110,22 @@ public void timeoutNextRequest(int numberOfRequest) {
 
     @Override
     public DescribeClusterResult describeCluster(DescribeClusterOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
+        KafkaFutureImpl<Collection<Node>> nodesFuture = new KafkaFutureImpl<>();
+        KafkaFutureImpl<Node> controllerFuture = new KafkaFutureImpl<>();
+        KafkaFutureImpl<String> brokerIdFuture = new KafkaFutureImpl<>();
+
+        if (timeoutNextRequests > 0) {
+            nodesFuture.completeExceptionally(new TimeoutException());
+            controllerFuture.completeExceptionally(new TimeoutException());
+            brokerIdFuture.completeExceptionally(new TimeoutException());
+            --timeoutNextRequests;
+        } else {
+            nodesFuture.complete(brokers);
+            controllerFuture.complete(controller);
+            brokerIdFuture.complete(clusterId);
+        }
+
+        return new DescribeClusterResult(nodesFuture, controllerFuture, brokerIdFuture);
     }
 
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 1b2f94e4613..98a77ed06c9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -31,6 +31,7 @@
 import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
 import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
 import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,6 +72,8 @@ public static void main(String[] args) throws Exception {
         plugins.compareAndSwapWithDelegatingLoader();
         DistributedConfig config = new DistributedConfig(workerProps);
 
+        String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+
         RestServer rest = new RestServer(config);
         URI advertisedUrl = rest.advertisedUrl();
         String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
@@ -85,7 +88,8 @@ public static void main(String[] args) throws Exception {
 
         ConfigBackingStore configBackingStore = new