[jira] [Commented] (KAFKA-6252) A metric named 'XX' already exists, can't register another one.
[ https://issues.apache.org/jira/browse/KAFKA-6252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16314383#comment-16314383 ] ASF GitHub Bot commented on KAFKA-6252: --- ewencp closed pull request #4397: KAFKA-6252: Close the metric group to clean up any existing metrics URL: https://github.com/apache/kafka/pull/4397 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java index 9e65cd2d80f..9b934f3428a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java @@ -247,6 +247,8 @@ public ConnectorMetricsGroup(ConnectMetrics connectMetrics, AbstractStatus.State ConnectMetricsRegistry registry = connectMetrics.registry(); this.metricGroup = connectMetrics.group(registry.connectorGroupName(), registry.connectorTagName(), connName); +// prevent collisions by removing any previously created metrics in this group. 
+metricGroup.close(); metricGroup.addImmutableValueMetric(registry.connectorType, connectorType()); metricGroup.addImmutableValueMetric(registry.connectorClass, connector.getClass().getName()); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 234ce8adf14..587e4c68cf5 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -652,6 +652,8 @@ public SinkTaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics) { metricGroup = connectMetrics .group(registry.sinkTaskGroupName(), registry.connectorTagName(), id.connector(), registry.taskTagName(), Integer.toString(id.task())); +// prevent collisions by removing any previously created metrics in this group. +metricGroup.close(); sinkRecordRead = metricGroup.metrics().sensor("sink-record-read"); sinkRecordRead.add(metricGroup.metricName(registry.sinkRecordReadRate), new Rate()); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index 9072cd47c81..a172cdb45f0 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -494,6 +494,8 @@ public SourceTaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics) metricGroup = connectMetrics.group(registry.sourceTaskGroupName(), registry.connectorTagName(), id.connector(), registry.taskTagName(), Integer.toString(id.task())); +// remove any previously created metrics in this group to prevent collisions. 
+metricGroup.close(); sourceRecordPoll = metricGroup.sensor("source-record-poll"); sourceRecordPoll.add(metricGroup.metricName(registry.sourceRecordPollRate), new Rate()); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java index ec069245b3d..d563f9bdede 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java @@ -313,6 +313,8 @@ public TaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics, TaskS metricGroup = connectMetrics.group(registry.taskGroupName(), registry.connectorTagName(), id.connector(), registry.taskTagName(), Integer.toString(id.task())); +// prevent collisions by removing any previously created metrics in this group. +metricGroup.close(); metricGroup.addValueMetric(registry.taskStatus, new LiteralSupplier() { @Override diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java index
[jira] [Resolved] (KAFKA-6252) A metric named 'XX' already exists, can't register another one.
[ https://issues.apache.org/jira/browse/KAFKA-6252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-6252. -- Resolution: Fixed Reviewer: Ewen Cheslack-Postava > A metric named 'XX' already exists, can't register another one. > --- > > Key: KAFKA-6252 > URL: https://issues.apache.org/jira/browse/KAFKA-6252 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 > Environment: Linux >Reporter: Alexis Sellier >Assignee: Arjun Satish >Priority: Critical > Fix For: 1.1.0, 1.0.1 > > > When a connector crashes (or is not implemented correctly by not > stopping/interrupting {{poll()}}), It cannot be restarted and an exception > like this is thrown > {code:java} > java.lang.IllegalArgumentException: A metric named 'MetricName > [name=offset-commit-max-time-ms, group=connector-task-metrics, > description=The maximum time in milliseconds taken by this task to commit > offsets., tags={connector=hdfs-sink-connector-recover, task=0}]' already > exists, can't register another one. 
> at > org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:532) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:256) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:241) > at > org.apache.kafka.connect.runtime.WorkerTask$TaskMetricsGroup.(WorkerTask.java:328) > at > org.apache.kafka.connect.runtime.WorkerTask.(WorkerTask.java:69) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.(WorkerSinkTask.java:98) > at > org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:449) > at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:404) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:852) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:108) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:866) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:862) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > I guess it's because the function taskMetricsGroup.close is not call in all > the cases -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-6252) A metric named 'XX' already exists, can't register another one.
[ https://issues.apache.org/jira/browse/KAFKA-6252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-6252: - Fix Version/s: 1.0.1 1.1.0 > A metric named 'XX' already exists, can't register another one. > --- > > Key: KAFKA-6252 > URL: https://issues.apache.org/jira/browse/KAFKA-6252 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 > Environment: Linux >Reporter: Alexis Sellier >Assignee: Arjun Satish >Priority: Critical > Fix For: 1.1.0, 1.0.1 > > > When a connector crashes (or is not implemented correctly by not > stopping/interrupting {{poll()}}), It cannot be restarted and an exception > like this is thrown > {code:java} > java.lang.IllegalArgumentException: A metric named 'MetricName > [name=offset-commit-max-time-ms, group=connector-task-metrics, > description=The maximum time in milliseconds taken by this task to commit > offsets., tags={connector=hdfs-sink-connector-recover, task=0}]' already > exists, can't register another one. 
> at > org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:532) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:256) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:241) > at > org.apache.kafka.connect.runtime.WorkerTask$TaskMetricsGroup.(WorkerTask.java:328) > at > org.apache.kafka.connect.runtime.WorkerTask.(WorkerTask.java:69) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.(WorkerSinkTask.java:98) > at > org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:449) > at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:404) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:852) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:108) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:866) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:862) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > I guess it's because the function taskMetricsGroup.close is not call in all > the cases -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6428) Fail builds on findbugs warnings
[ https://issues.apache.org/jira/browse/KAFKA-6428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16314348#comment-16314348 ] ASF GitHub Bot commented on KAFKA-6428: --- ewencp opened a new pull request #4398: KAFKA-6428: Generate findbugs output for CI and fail builds for 'high' level warnings URL: https://github.com/apache/kafka/pull/4398 We already had findbugs running and it looks like sufficient warnings should cause errors. This PR does a few things. First, it changes to generating xml reports (which CI likes) by default. We already seem to have the Jenkins plugins setup to consume these, so we should immediately start seeing the output on Jenkins. This seems better than the current html default since most devs probably aren't looking at the html reports unless they are specifically looking at findbugs issues. Second, it explicitly sets the report level we want to trigger failures on. I think we were already failing the build based on the current settings, we just didn't have any high-level warnings. But this sets us up not to only fail the builds but also have some visibility via jenkins reports. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Fail builds on findbugs warnings > > > Key: KAFKA-6428 > URL: https://issues.apache.org/jira/browse/KAFKA-6428 > Project: Kafka > Issue Type: Improvement > Components: build >Reporter: Ewen Cheslack-Postava >Assignee: Ewen Cheslack-Postava > > Findbugs spots likely bugs, and especially for warnings at the High level, it > actually has pretty good signal for real bugs (or just things that might be > risky). 
We should be failing builds, especially PRs, if any sufficiently high > warnings are listed. We should get this enabled for that level and then > decide if we want to adjust the level of warnings we want to address. > This likely relates to KAFKA-5887 since findbugs may not be sufficiently > maintained for JDK9 support. In any case, the intent is to fail the build > based on whichever tool is used. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-6428) Fail builds on findbugs warnings
Ewen Cheslack-Postava created KAFKA-6428: Summary: Fail builds on findbugs warnings Key: KAFKA-6428 URL: https://issues.apache.org/jira/browse/KAFKA-6428 Project: Kafka Issue Type: Improvement Components: build Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Findbugs spots likely bugs, and especially for warnings at the High level, it actually has pretty good signal for real bugs (or just things that might be risky). We should be failing builds, especially PRs, if any sufficiently high warnings are listed. We should get this enabled for that level and then decide if we want to adjust the level of warnings we want to address. This likely relates to KAFKA-5887 since findbugs may not be sufficiently maintained for JDK9 support. In any case, the intent is to fail the build based on whichever tool is used. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-4335) FileStreamSource Connector not working for large files (~ 1GB)
[ https://issues.apache.org/jira/browse/KAFKA-4335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16314316#comment-16314316 ] ASF GitHub Bot commented on KAFKA-4335: --- ewencp closed pull request #4356: KAFKA-4335: Add batch.size to FileStreamSource connector to prevent OOM URL: https://github.com/apache/kafka/pull/4356 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java index 335fe925519..59006dae4f4 100644 --- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java +++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java @@ -36,13 +36,18 @@ public class FileStreamSourceConnector extends SourceConnector { public static final String TOPIC_CONFIG = "topic"; public static final String FILE_CONFIG = "file"; +public static final String TASK_BATCH_SIZE_CONFIG = "batch.size"; + +public static final int DEFAULT_TASK_BATCH_SIZE = 2000; private static final ConfigDef CONFIG_DEF = new ConfigDef() .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Source filename. 
If not specified, the standard input will be used") -.define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to"); +.define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to") +.define(TASK_BATCH_SIZE_CONFIG, Type.INT, Importance.LOW, "The maximum number of records the Source task can read from file one time"); private String filename; private String topic; +private int batchSize = DEFAULT_TASK_BATCH_SIZE; @Override public String version() { @@ -57,6 +62,14 @@ public void start(Mapprops) { throw new ConnectException("FileStreamSourceConnector configuration must include 'topic' setting"); if (topic.contains(",")) throw new ConnectException("FileStreamSourceConnector should only have a single topic when used as a source."); + +if (props.containsKey(TASK_BATCH_SIZE_CONFIG)) { +try { +batchSize = Integer.parseInt(props.get(TASK_BATCH_SIZE_CONFIG)); +} catch (NumberFormatException e) { +throw new ConnectException("Invalid FileStreamSourceConnector configuration", e); +} +} } @Override @@ -72,6 +85,7 @@ public void start(Map props) { if (filename != null) config.put(FILE_CONFIG, filename); config.put(TOPIC_CONFIG, topic); +config.put(TASK_BATCH_SIZE_CONFIG, String.valueOf(batchSize)); configs.add(config); return configs; } diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java index 8edf385611a..482102f7859 100644 --- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java +++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java @@ -50,6 +50,7 @@ private char[] buffer = new char[1024]; private int offset = 0; private String topic = null; +private int batchSize = FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE; private Long streamOffset; @@ -70,6 +71,14 @@ public void start(Map props) { topic = 
props.get(FileStreamSourceConnector.TOPIC_CONFIG); if (topic == null) throw new ConnectException("FileStreamSourceTask config missing topic setting"); + +if (props.containsKey(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG)) { +try { +batchSize = Integer.parseInt(props.get(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG)); +} catch (NumberFormatException e) { +throw new ConnectException("Invalid FileStreamSourceTask configuration", e); +} +} } @Override @@ -146,6 +155,10 @@ public void start(Map props) { records = new ArrayList<>(); records.add(new SourceRecord(offsetKey(filename), offsetValue(streamOffset), topic, null, null, null, VALUE_SCHEMA, line, System.currentTimeMillis())); + +if (records.size() >= batchSize) { +return records; +} } }
[jira] [Resolved] (KAFKA-4335) FileStreamSource Connector not working for large files (~ 1GB)
[ https://issues.apache.org/jira/browse/KAFKA-4335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-4335. -- Resolution: Fixed Fix Version/s: 1.1.0 Issue resolved by pull request 4356 [https://github.com/apache/kafka/pull/4356] > FileStreamSource Connector not working for large files (~ 1GB) > -- > > Key: KAFKA-4335 > URL: https://issues.apache.org/jira/browse/KAFKA-4335 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.10.0.0 >Reporter: Rahul Shukla > Fix For: 1.1.0 > > > I was trying to sink a large file (about 1 GB). The FileStreamSource connector is > not working for that; it works fine for small files. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-4834) Kafka cannot delete topic with ReplicaStateMachine went wrong
[ https://issues.apache.org/jira/browse/KAFKA-4834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16314230#comment-16314230 ] Kiran Pillarisetty commented on KAFKA-4834: --- We are experiencing the same issue on 0.11.0.1. > Kafka cannot delete topic with ReplicaStateMachine went wrong > - > > Key: KAFKA-4834 > URL: https://issues.apache.org/jira/browse/KAFKA-4834 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.10.1.1 >Reporter: Dan > Labels: reliability > > It happened several times that some topics can not be deleted in our > production environment. By analyzing the log, we found ReplicaStateMachine > went wrong. Here are the error messages: > In state-change.log: > ERROR Controller 2 epoch 201 initiated state change of replica 1 for > partition [test_create_topic1,1] from OnlineReplica to ReplicaDeletionStarted > failed (state.change.logger) > java.lang.AssertionError: assertion failed: Replica > [Topic=test_create_topic1,Partition=1,Replica=1] should be in the > OfflineReplica states before moving to ReplicaDeletionStarted state. 
Instead > it is in OnlineReplica state > at scala.Predef$.assert(Predef.scala:179) > at > kafka.controller.ReplicaStateMachine.assertValidPreviousStates(ReplicaStateMachine.scala:309) > at > kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:190) > at > kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114) > at > kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114) > at > scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153) > at > scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306) > at > kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:114) > at > kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:344) > at > kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:334) > at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) > at > kafka.controller.TopicDeletionManager.startReplicaDeletion(TopicDeletionManager.scala:334) > at > kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onPartitionDeletion(TopicDeletionManager.scala:367) > at > kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:313) > at > kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:312) > at scala.collection.immutable.Set$Set1.foreach(Set.scala:74) > at > kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onTopicDeletion(TopicDeletionManager.scala:312) > at > kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:431) > at > 
kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:403) > at > scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153) > at > scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306) > at > scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306) > at > kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:403) > at > kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397) > at > kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234) > at > kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:397) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) > In controller.log: > INFO Leader not yet assigned for partition [test_create_topic1,1]. Skip > sending UpdateMetadataRequest. (kafka.controller.ControllerBrokerRequestBatch) > There may exist two controllers in the cluster because creating a new topic > may trigger two machines to change the state of same partition, eg. > NonExistentPartition -> NewPartition. > On the other controller, we found following messages in controller.log of > several days earlier: > [2017-02-25 16:51:22,353] INFO [Topic Deletion Manager 0], Topic
[jira] [Commented] (KAFKA-6314) Add a tool to delete kafka based consumer offsets for a given group
[ https://issues.apache.org/jira/browse/KAFKA-6314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16314204#comment-16314204 ] Ivan Babrou commented on KAFKA-6314: Is there a workaround that allows universal alerting for lagging consumers? > Add a tool to delete kafka based consumer offsets for a given group > --- > > Key: KAFKA-6314 > URL: https://issues.apache.org/jira/browse/KAFKA-6314 > Project: Kafka > Issue Type: New Feature > Components: consumer, core, tools >Reporter: Tom Scott >Priority: Minor > > Add a tool to delete kafka based consumer offsets for a given group similar > to the reset tool. It could look something like this: > kafka-consumer-groups --bootstrap-server localhost:9092 --delete-offsets > --group somegroup > The case for this is as follows: > 1. Consumer group with id: group1 subscribes to topic1 > 2. The group is stopped > 3. The subscription changed to topic2 but the id is kept as group1 > Now the output of kafka-consumer-groups --describe for the group will > show topic1 even though the group is not subscribed to that topic. This is > bad for monitoring as it will show lag on topic1. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6252) A metric named 'XX' already exists, can't register another one.
[ https://issues.apache.org/jira/browse/KAFKA-6252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16314079#comment-16314079 ] ASF GitHub Bot commented on KAFKA-6252: --- wicknicks opened a new pull request #4397: KAFKA-6252: Close the metric group to clean up any existing metrics URL: https://github.com/apache/kafka/pull/4397 Signed-off-by: Arjun Satish*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > A metric named 'XX' already exists, can't register another one. 
> --- > > Key: KAFKA-6252 > URL: https://issues.apache.org/jira/browse/KAFKA-6252 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 > Environment: Linux >Reporter: Alexis Sellier >Assignee: Arjun Satish >Priority: Critical > > When a connector crashes (or is not implemented correctly by not > stopping/interrupting {{poll()}}), It cannot be restarted and an exception > like this is thrown > {code:java} > java.lang.IllegalArgumentException: A metric named 'MetricName > [name=offset-commit-max-time-ms, group=connector-task-metrics, > description=The maximum time in milliseconds taken by this task to commit > offsets., tags={connector=hdfs-sink-connector-recover, task=0}]' already > exists, can't register another one. > at > org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:532) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:256) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:241) > at > org.apache.kafka.connect.runtime.WorkerTask$TaskMetricsGroup.(WorkerTask.java:328) > at > org.apache.kafka.connect.runtime.WorkerTask.(WorkerTask.java:69) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.(WorkerSinkTask.java:98) > at > org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:449) > at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:404) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:852) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:108) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:866) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:862) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > I guess it's because the function taskMetricsGroup.close is not called in all > the cases -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-6427) Inconsistent exception type from KafkaConsumer.position
Jay Kahrman created KAFKA-6427: -- Summary: Inconsistent exception type from KafkaConsumer.position Key: KAFKA-6427 URL: https://issues.apache.org/jira/browse/KAFKA-6427 Project: Kafka Issue Type: Bug Components: consumer Reporter: Jay Kahrman Priority: Trivial If KafkaConsumer.position is called with a partition that the consumer isn't assigned to, it throws an IllegalArgumentException. All other APIs throw an IllegalStateException when the consumer tries to act on a partition that is not assigned to the consumer. Looking at the implementation, if it weren't for the subscription test and the IllegalArgumentException thrown at the beginning of KafkaConsumer.position, the very next line would throw an IllegalStateException anyway. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6311) Expose Kafka cluster ID in Connect REST API
[ https://issues.apache.org/jira/browse/KAFKA-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313341#comment-16313341 ] ASF GitHub Bot commented on KAFKA-6311: --- hachikuji closed pull request #4314: KAFKA-6311: Expose Kafka cluster ID in Connect REST API (KIP-238) URL: https://github.com/apache/kafka/pull/4314 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index 11fe428b731..49ad8d02872 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -38,13 +38,41 @@ import java.util.concurrent.TimeUnit; public class MockAdminClient extends AdminClient { +public static final String DEFAULT_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA"; + private final List brokers; private final MapallTopics = new HashMap<>(); +private final String clusterId; +private Node controller; private int timeoutNextRequests = 0; -public MockAdminClient(List brokers) { +/** + * Creates MockAdminClient for a cluster with the given brokers. The Kafka cluster ID uses the default value from + * DEFAULT_CLUSTER_ID. + * + * @param brokers list of brokers in the cluster + * @param controller node that should start as the controller + */ +public MockAdminClient(List brokers, Node controller) { +this(brokers, controller, DEFAULT_CLUSTER_ID); +} + +/** + * Creates MockAdminClient for a cluster with the given brokers. 
+ * @param brokers list of brokers in the cluster + * @param controller node that should start as the controller + */ +public MockAdminClient(List brokers, Node controller, String clusterId) { this.brokers = brokers; +controller(controller); +this.clusterId = clusterId; +} + +public void controller(Node controller) { +if (!brokers.contains(controller)) +throw new IllegalArgumentException("The controller node must be in the list of brokers"); +this.controller = controller; } public void addTopic(boolean internal, @@ -82,7 +110,22 @@ public void timeoutNextRequest(int numberOfRequest) { @Override public DescribeClusterResult describeCluster(DescribeClusterOptions options) { -throw new UnsupportedOperationException("Not implemented yet"); +KafkaFutureImpl nodesFuture = new KafkaFutureImpl<>(); +KafkaFutureImpl controllerFuture = new KafkaFutureImpl<>(); +KafkaFutureImpl brokerIdFuture = new KafkaFutureImpl<>(); + +if (timeoutNextRequests > 0) { +nodesFuture.completeExceptionally(new TimeoutException()); +controllerFuture.completeExceptionally(new TimeoutException()); +brokerIdFuture.completeExceptionally(new TimeoutException()); +--timeoutNextRequests; +} else { +nodesFuture.complete(brokers); +controllerFuture.complete(controller); +brokerIdFuture.complete(clusterId); +} + +return new DescribeClusterResult(nodesFuture, controllerFuture, brokerIdFuture); } @Override diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java index 1b2f94e4613..98a77ed06c9 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java @@ -31,6 +31,7 @@ import org.apache.kafka.connect.storage.KafkaOffsetBackingStore; import org.apache.kafka.connect.storage.KafkaStatusBackingStore; import org.apache.kafka.connect.storage.StatusBackingStore; +import 
org.apache.kafka.connect.util.ConnectUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,6 +72,8 @@ public static void main(String[] args) throws Exception { plugins.compareAndSwapWithDelegatingLoader(); DistributedConfig config = new DistributedConfig(workerProps); +String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config); + RestServer rest = new RestServer(config); URI advertisedUrl = rest.advertisedUrl(); String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort(); @@ -85,7 +88,8 @@ public static void main(String[] args) throws Exception { ConfigBackingStore configBackingStore = new