[jira] [Commented] (KAFKA-6507) NPE in KafkaStatusBackingStore
[ https://issues.apache.org/jira/browse/KAFKA-6507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16387435#comment-16387435 ]

huxihx commented on KAFKA-6507:
-------------------------------

[~itaycohai] Did you run into this problem when using the same versions for both broker and Connect?

> NPE in KafkaStatusBackingStore
> ------------------------------
>
>                 Key: KAFKA-6507
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6507
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.11.0.1
>         Environment: We are using Kafka 0.10.0.1 with Kafka connect 0.11.0.1.
>            Reporter: Itay Cohai
>            Priority: Major
>
> Found the following NPE in our kafka connect logs:
> [2018-01-30 13:15:34,391] ERROR Unexpected exception in Thread[KafkaBasedLog Work Thread - itay_test-connect-status,5,main] (org.apache.kafka.connect.util.KafkaBasedLog:334)
> java.lang.NullPointerException
>         at org.apache.kafka.connect.storage.KafkaStatusBackingStore.read(KafkaStatusBackingStore.java:441)
>         at org.apache.kafka.connect.storage.KafkaStatusBackingStore$1.onCompletion(KafkaStatusBackingStore.java:148)
>         at org.apache.kafka.connect.storage.KafkaStatusBackingStore$1.onCompletion(KafkaStatusBackingStore.java:145)
>         at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:258)
>         at org.apache.kafka.connect.util.KafkaBasedLog.access$500(KafkaBasedLog.java:69)
>         at org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:327)
>
> If I look at the source, it looks like the key comes up NULL from the status topic, strange.
>
> void read(ConsumerRecord<String, byte[]> record) {
>     String key = record.key();
>     // This line --> if (key.startsWith(CONNECTOR_STATUS_PREFIX)) {
>         readConnectorStatus(key, record.value());
>     } else if (key.startsWith(TASK_STATUS_PREFIX)) {
>         readTaskStatus(key, record.value());
>     } else {
>         log.warn("Discarding record with invalid key {}", key);
>     }
> }

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
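One way the NPE above could be avoided is a defensive null-key check before the prefix dispatch. The sketch below is only an illustration of that idea, not necessarily the actual fix; it assumes the same surrounding class as the snippet quoted in the issue (log, CONNECTOR_STATUS_PREFIX, TASK_STATUS_PREFIX, readConnectorStatus and readTaskStatus all come from KafkaStatusBackingStore):

{code:java}
void read(ConsumerRecord<String, byte[]> record) {
    String key = record.key();
    if (key == null) {
        // A record with a null key cannot be attributed to a connector or task,
        // so skip it instead of dereferencing the key below.
        log.warn("Discarding status record with null key (partition {}, offset {})",
                 record.partition(), record.offset());
        return;
    }
    if (key.startsWith(CONNECTOR_STATUS_PREFIX)) {
        readConnectorStatus(key, record.value());
    } else if (key.startsWith(TASK_STATUS_PREFIX)) {
        readTaskStatus(key, record.value());
    } else {
        log.warn("Discarding record with invalid key {}", key);
    }
}
{code}

A null key here could come from a record written to the status topic by something other than Connect itself, e.g. a stray producer publishing without a key, which would also be consistent with the version-mismatch question above.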
[jira] [Resolved] (KAFKA-3806) Adjust default values of log.retention.hours and offsets.retention.minutes
[ https://issues.apache.org/jira/browse/KAFKA-3806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-3806. Resolution: Fixed Assignee: Ewen Cheslack-Postava > Adjust default values of log.retention.hours and offsets.retention.minutes > -- > > Key: KAFKA-3806 > URL: https://issues.apache.org/jira/browse/KAFKA-3806 > Project: Kafka > Issue Type: Improvement > Components: config >Affects Versions: 0.9.0.1, 0.10.0.0 >Reporter: Michal Turek >Assignee: Ewen Cheslack-Postava >Priority: Minor > Fix For: 1.2.0 > > > Combination of default values of log.retention.hours (168 hours = 7 days) and > offsets.retention.minutes (1440 minutes = 1 day) may be dangerous in special > cases. Offset retention should be always greater than log retention. > We have observed the following scenario and issue: > - Producing of data to a topic was disabled two days ago by producer update, > topic wasn't deleted. > - Consumer consumed all data and properly committed offsets to Kafka. > - Consumer made no more offset commits for that topic because there was no > more incoming data and there was nothing to confirm. (We have auto-commit > disabled, I'm not sure how behaves enabled auto-commit.) > - After one day: Kafka cleared too old offsets according to > offsets.retention.minutes. > - After two days: Long-term running consumer was restarted after update, it > didn't find any committed offsets for that topic since they were deleted by > offsets.retention.minutes so it started consuming from the beginning. > - The messages were still in Kafka due to larger log.retention.hours, about 5 > days of messages were read again. > Known workaround to solve this issue: > - Explicitly configure log.retention.hours and offsets.retention.minutes, > don't use defaults. > Proposals: > - Prolong default value of offsets.retention.minutes to be at least twice > larger than log.retention.hours. > - Check these values during Kafka startup and log a warning if > offsets.retention.minutes is smaller than log.retention.hours. > - Add a note to migration guide about differences between storing of offsets > in ZooKeeper and Kafka (http://kafka.apache.org/documentation.html#upgrade). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
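The workaround called out above (configure both retentions explicitly rather than relying on defaults) comes down to two broker settings; the values below are examples only and should be sized for the actual workload:

{code}
# server.properties -- example values only
log.retention.hours=168
# keep committed offsets at least as long as the log itself (here 14 days)
offsets.retention.minutes=20160
{code}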
[jira] [Commented] (KAFKA-3806) Adjust default values of log.retention.hours and offsets.retention.minutes
[ https://issues.apache.org/jira/browse/KAFKA-3806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16387348#comment-16387348 ] ASF GitHub Bot commented on KAFKA-3806: --- hachikuji closed pull request #4648: KAFKA-3806: Increase offsets retention default to 7 days (KIP-186) URL: https://github.com/apache/kafka/pull/4648 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 8b2fb1044e0..cf22305caf7 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -158,7 +158,7 @@ object Defaults { val OffsetsTopicPartitions: Int = OffsetConfig.DefaultOffsetsTopicNumPartitions val OffsetsTopicSegmentBytes: Int = OffsetConfig.DefaultOffsetsTopicSegmentBytes val OffsetsTopicCompressionCodec: Int = OffsetConfig.DefaultOffsetsTopicCompressionCodec.codec - val OffsetsRetentionMinutes: Int = 24 * 60 + val OffsetsRetentionMinutes: Int = 7 * 24 * 60 val OffsetsRetentionCheckIntervalMs: Long = OffsetConfig.DefaultOffsetsRetentionCheckIntervalMs val OffsetCommitTimeoutMs = OffsetConfig.DefaultOffsetCommitTimeoutMs val OffsetCommitRequiredAcks = OffsetConfig.DefaultOffsetCommitRequiredAcks diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala index 0793fa37a60..3e8a5359468 100755 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -249,11 +249,11 @@ class OffsetCommitTest extends ZooKeeperTestHarness { Thread.sleep(retentionCheckInterval * 2) assertEquals(2L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset) -// v1 version commit request with commit timestamp set to now - two days +// v1 version commit request with commit timestamp set to now - seven + a bit days // committed offset should expire val commitRequest2 = OffsetCommitRequest( groupId = group, - requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(3L, "metadata", Time.SYSTEM.milliseconds - 2*24*60*60*1000L)), + requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(3L, "metadata", Time.SYSTEM.milliseconds - (Defaults.OffsetsRetentionMinutes + 1) * 60 * 1000L)), versionId = 1 ) assertEquals(Errors.NONE, simpleConsumer.commitOffsets(commitRequest2).commitStatus.get(topicPartition).get) diff --git a/docs/upgrade.html b/docs/upgrade.html index 3ac293d8498..324f8df1403 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -19,6 +19,67 @@
[jira] [Commented] (KAFKA-5626) Producer should be able to negotiate ProduceRequest version with broker
[ https://issues.apache.org/jira/browse/KAFKA-5626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16387325#comment-16387325 ] Dong Lin commented on KAFKA-5626: - [~ewencp] As of July 2017, when I was writing patch for KIP-112/KIP-113, producer does not negotiate ProduceRequest version with broker. This is long after KIP-97 has been implemented. So I think this is probably still relevant. I will double check the latest code. > Producer should be able to negotiate ProduceRequest version with broker > --- > > Key: KAFKA-5626 > URL: https://issues.apache.org/jira/browse/KAFKA-5626 > Project: Kafka > Issue Type: Improvement >Reporter: Dong Lin >Assignee: Dong Lin >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5626) Producer should be able to negotiate ProduceRequest version with broker
[ https://issues.apache.org/jira/browse/KAFKA-5626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16387319#comment-16387319 ] Ewen Cheslack-Postava commented on KAFKA-5626: -- [~lindong] Given KIP-97, is this relevant anymore? > Producer should be able to negotiate ProduceRequest version with broker > --- > > Key: KAFKA-5626 > URL: https://issues.apache.org/jira/browse/KAFKA-5626 > Project: Kafka > Issue Type: Improvement >Reporter: Dong Lin >Assignee: Dong Lin >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-5471) Original Kafka paper link broken
[ https://issues.apache.org/jira/browse/KAFKA-5471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-5471. -- Resolution: Fixed Assignee: Ewen Cheslack-Postava Updated the link to what appears to be a more permanent NetDB pdf link. > Original Kafka paper link broken > > > Key: KAFKA-5471 > URL: https://issues.apache.org/jira/browse/KAFKA-5471 > Project: Kafka > Issue Type: Bug > Components: documentation >Reporter: Jeremy Hanna >Assignee: Ewen Cheslack-Postava >Priority: Trivial > > Currently on the [Kafka papers and presentations > site|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+papers+and+presentations] > the original Kafka paper is linked but it's a broken link. > Currently it links to > [here|http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf] > but that person may have taken the paper down. I found it > [here|http://notes.stephenholiday.com/Kafka.pdf] but that could have a > similar problem in the future. We should be able to put the file as an > attachment in the confluence wiki to make it a more permanent link. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6309) add support for getting topic defaults from AdminClient
[ https://issues.apache.org/jira/browse/KAFKA-6309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16387310#comment-16387310 ] ASF GitHub Bot commented on KAFKA-6309: --- guozhangwang closed pull request #4624: KAFKA-6309: Improve task assignor load balance URL: https://github.com/apache/kafka/pull/4624 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java index de8fa57e36a..5b54d08c032 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java @@ -20,10 +20,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -106,14 +109,13 @@ private void assignActive() { } // assign any remaining unassigned tasks -for (final TaskId taskId : unassigned) { +List sortedTasks = new ArrayList<>(unassigned); +Collections.sort(sortedTasks); +for (final TaskId taskId : sortedTasks) { allocateTaskWithClientCandidates(taskId, clients.keySet(), true); } - } - - private void allocateTaskWithClientCandidates(final TaskId taskId, final Set clientsWithin, final boolean active) { final ClientState client = findClient(taskId, clientsWithin, active); taskPairs.addPairs(taskId, client.assignedTasks()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java index 4f770c86239..ed22e3c30de 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java @@ -23,11 +23,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -350,6 +352,42 @@ public void shouldAssignMoreTasksToClientWithMoreCapacity() { assertThat(clients.get(p1).assignedTaskCount(), equalTo(4)); } +@Test +public void shouldEvenlyDistributeByTaskIdAndPartition() { +createClient(p1, 4); +createClient(p2, 4); +createClient(p3, 4); +createClient(p4, 4); + +final List taskIds = new ArrayList<>(); +final TaskId[] taskIdArray = new TaskId[16]; + +for (int i = 1; i <= 2; i++) { +for (int j = 0; j < 8; j++) { +taskIds.add(new TaskId(i, j)); +} +} + +Collections.shuffle(taskIds); +taskIds.toArray(taskIdArray); + +final StickyTaskAssignor taskAssignor = createTaskAssignor(taskIdArray); +taskAssignor.assign(0); + +Collections.sort(taskIds); 
+final Set expectedClientOneAssignment = getExpectedTaskIdAssignment(taskIds, 0, 4, 8, 12); +final Set expectedClientTwoAssignment = getExpectedTaskIdAssignment(taskIds, 1, 5, 9, 13); +final Set expectedClientThreeAssignment = getExpectedTaskIdAssignment(taskIds, 2, 6, 10, 14); +final Set expectedClientFourAssignment = getExpectedTaskIdAssignment(taskIds, 3, 7, 11, 15); + +final Map> sortedAssignments = sortClientAssignments(clients); + +assertThat(sortedAssignments.get(p1), equalTo(expectedClientOneAssignment)); +assertThat(sortedAssignments.get(p2), equalTo(expectedClientTwoAssignment)); +assertThat(sortedAssignments.get(p3), equalTo(expectedClientThreeAssignment)); +assertThat(sortedAssignments.get(p4), equalTo(expectedClientFourAssignment)); +} + @Test public void shouldNotHaveSameAssignmentOnAnyTwoHosts() { @@ -665,4 +703,21 @@ private void assertActiveTaskTopicGroupIdsEvenlyDistributed() { } } +private Map> sortClientAssignments(final Map clie
[jira] [Commented] (KAFKA-5306) Official init.d scripts
[ https://issues.apache.org/jira/browse/KAFKA-5306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16387300#comment-16387300 ] Ewen Cheslack-Postava commented on KAFKA-5306: -- I think the challenge here is that these vary from distro to distro. Some use systemd, some are still on init.d; installation paths, config files, etc would vary between distros. And some distros prefer having their own builds from src using their own conventions vs using binaries from upstream. Given the myriad distributions, I'm not sure how this would be maintainable other than a super flexible, customizable script, and even then I'm not sure it could cover differences such as systemd vs init.d. Any ideas as to how this could scale out for the variety of distros? > Official init.d scripts > --- > > Key: KAFKA-5306 > URL: https://issues.apache.org/jira/browse/KAFKA-5306 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.10.2.1 > Environment: Ubuntu 14.04 >Reporter: Shahar >Priority: Minor > > It would be great to have an officially supported init.d script for starting > and stopping Kafka as a service. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
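As a concrete illustration of the per-distro variance discussed above: a systemd-based distro would want something like the unit below, while an init.d-based one needs an entirely different script. The paths and service user here are assumptions (a hypothetical /opt/kafka installation), not an official layout:

{code}
# /etc/systemd/system/kafka.service -- illustrative sketch only
[Unit]
Description=Apache Kafka broker
After=network.target

[Service]
Type=simple
User=kafka
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target
{code}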
[jira] [Commented] (KAFKA-5199) Run Kafka JUnit build inside Docker
[ https://issues.apache.org/jira/browse/KAFKA-5199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16387294#comment-16387294 ] Ewen Cheslack-Postava commented on KAFKA-5199: -- [~cmccabe] Do we still need this? I spent time quite awhile ago cleaning up the tests to not rely on fixed ports as this, as well as things like fixed /tmp locations for data, are well known causes of transient failures. This is also why ducktape avoids sharing resources as aggressively (and expensively!) as it does. Docker has definitely become more popular, but I don't think it is safe to assume every developer will have it available. While we don't emphasize testing for them, Windows developers seem the least likely to have it. Even developers on OS X may not, and anyone on a BSD that we'd otherwise expect to work would probably be unable to run tests. Is baking a Linux-specific tool into the build something we really want to do? > Run Kafka JUnit build inside Docker > --- > > Key: KAFKA-5199 > URL: https://issues.apache.org/jira/browse/KAFKA-5199 > Project: Kafka > Issue Type: Bug >Reporter: Colin P. McCabe >Assignee: Colin P. McCabe >Priority: Major > > We should run the Kafka JUnit build inside Docker. This would avoid "port in > use" conflicts when running multiple jobs on the same node with a fixed port. > It would also help lock down the specific Linux environment and make > failures more reproducible. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed
[ https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16387293#comment-16387293 ] ASF GitHub Bot commented on KAFKA-6106: --- kamalcph closed pull request #4564: KAFKA-6106; Postpone normal processing of tasks within a thread until… URL: https://github.com/apache/kafka/pull/4564 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 6545fde30e9..d61b281eed1 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -88,7 +88,8 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean def updateBrokerInfoInZk(brokerInfo: BrokerInfo): Unit = { val brokerIdPath = brokerInfo.path val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion) -retryRequestUntilConnected(setDataRequest) +val response = retryRequestUntilConnected(setDataRequest) +response.maybeThrow() info("Updated broker %d at path %s with addresses: %s".format(brokerInfo.broker.id, brokerIdPath, brokerInfo.broker.endPoints)) } @@ -424,7 +425,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean def deleteLogDirEventNotifications(): Unit = { val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path)) if (getChildrenResponse.resultCode == Code.OK) { - deleteLogDirEventNotifications(getChildrenResponse.children) + deleteLogDirEventNotifications(getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber)) } else if (getChildrenResponse.resultCode != Code.NONODE) { getChildrenResponse.maybeThrow } diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala index d3726c25c58..e44c2c94e52 100644 --- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala @@ -16,10 +16,11 @@ */ package kafka.zk -import java.util.{Properties, UUID} +import java.util.{Collections, Properties, UUID} import java.nio.charset.StandardCharsets.UTF_8 +import java.util.concurrent.{CountDownLatch, TimeUnit} -import kafka.api.ApiVersion +import kafka.api.{ApiVersion, LeaderAndIsr} import kafka.cluster.{Broker, EndPoint} import kafka.log.LogConfig import kafka.security.auth._ @@ -29,17 +30,48 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.security.token.delegation.TokenInformation -import org.apache.kafka.common.utils.SecurityUtils -import org.apache.zookeeper.KeeperException.NodeExistsException +import org.apache.kafka.common.utils.{SecurityUtils, Time} +import org.apache.zookeeper.KeeperException.{Code, NoNodeException, NodeExistsException} import org.junit.Assert._ -import org.junit.Test - +import org.junit.{After, Before, Test} import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer +import scala.collection.{Seq, mutable} import scala.util.Random +import kafka.controller.LeaderIsrAndControllerEpoch +import 
kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult +import kafka.zookeeper._ +import org.apache.kafka.common.security.JaasUtils +import org.apache.zookeeper.data.Stat + class KafkaZkClientTest extends ZooKeeperTestHarness { private val group = "my-group" + private val topic1 = "topic1" + private val topic2 = "topic2" + + val topicPartition10 = new TopicPartition(topic1, 0) + val topicPartition11 = new TopicPartition(topic1, 1) + val topicPartition20 = new TopicPartition(topic2, 0) + val topicPartitions10_11 = Seq(topicPartition10, topicPartition11) + + var otherZkClient: KafkaZkClient = _ + + @Before + override def setUp(): Unit = { +super.setUp() +otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout, + zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM) + } + + @After + override def tearDown(): Unit = { +if (otherZkClient != null) + otherZkClient.close() +super.tearDown() + } + private val topicPartition = new TopicPartition("topic", 0) @Test @@ -90,10 +122,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { @Test def testTopicAssignmentMethods() { -val topic1 = "topic1" -val topic2 = "topic2" +assertTrue(zkClient.getAl
[jira] [Commented] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed
[ https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16387289#comment-16387289 ] ASF GitHub Bot commented on KAFKA-6106: --- kamalcph opened a new pull request #4651: KAFKA-6106: Postpone normal processing of tasks until restoration of all tasks completed URL: https://github.com/apache/kafka/pull/4651 Once all the state stores are restored, then the processing of tasks takes place. This approach will reduce the time taken to restore the state stores as single thread is used to restore the state and process the task. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Postpone normal processing of tasks within a thread until restoration of all > tasks have completed > - > > Key: KAFKA-6106 > URL: https://issues.apache.org/jira/browse/KAFKA-6106 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.11.0.1, 1.0.0 >Reporter: Guozhang Wang >Assignee: Kamal Chandraprakash >Priority: Major > Labels: newbie++ > > Let's say a stream thread hosts multiple tasks, A and B. At the very > beginning when A and B are assigned to the thread, the thread state is > {{TASKS_ASSIGNED}}, and the thread start restoring these two tasks during > this state using the restore consumer while using normal consumer for > heartbeating. > If task A's restoration has completed earlier than task B, then the thread > will start processing A immediately even when it is still in the > {{TASKS_ASSIGNED}} phase. But processing task A will slow down restoration of > task B since it is single-thread. So the thread's transition to {{RUNNING}} > when all of its assigned tasks have completed restoring and now can be > processed will be delayed. > Note that the streams instance's state will only transit to {{RUNNING}} when > all of its threads have transit to {{RUNNING}}, so the instance's transition > will also be delayed by this scenario. > We'd better to not start processing ready tasks immediately, but instead > focus on restoration during the {{TASKS_ASSIGNED}} state to shorten the > overall time of the instance's state transition. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
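For anyone trying to see how long the restoration phase described in this ticket actually takes, Streams exposes a listener hook; the sketch below simply logs restoration progress and is independent of the PR above (the class name and message wording are made up for the example):

{code:java}
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

// Logs when each store's restoration starts and ends -- the phase the ticket
// wants to keep free of normal processing.
public class LoggingRestoreListener implements StateRestoreListener {
    @Override
    public void onRestoreStart(TopicPartition tp, String store, long startOffset, long endOffset) {
        System.out.printf("Restoring %s (%s): offsets %d..%d%n", store, tp, startOffset, endOffset);
    }

    @Override
    public void onBatchRestored(TopicPartition tp, String store, long batchEndOffset, long numRestored) {
        // called after each restored batch; left quiet here
    }

    @Override
    public void onRestoreEnd(TopicPartition tp, String store, long totalRestored) {
        System.out.printf("Finished restoring %s (%s): %d records%n", store, tp, totalRestored);
    }
}
{code}

It is registered via KafkaStreams#setGlobalStateRestoreListener(...) before the instance is started.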
[jira] [Updated] (KAFKA-4999) Add convenience overload for seek* methods
[ https://issues.apache.org/jira/browse/KAFKA-4999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-4999: - Component/s: consumer > Add convenience overload for seek* methods > -- > > Key: KAFKA-4999 > URL: https://issues.apache.org/jira/browse/KAFKA-4999 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Timo Meijer >Assignee: Richard Yu >Priority: Major > Labels: Quickfix, needs-kip > > The most common use case when using the seek* methods is to work on the > currently assigned partitions. This behavior is supported by passing an empty > list, but this is not very intuitive. > Adding an overloaded method for all seek* methods without parameters that has > the same behavior; using the currently assigned partitions, would improve the > API and user experience. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4999) Add convenience overload for seek* methods
[ https://issues.apache.org/jira/browse/KAFKA-4999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-4999: - Labels: Quickfix needs-kip (was: Quickfix) > Add convenience overload for seek* methods > -- > > Key: KAFKA-4999 > URL: https://issues.apache.org/jira/browse/KAFKA-4999 > Project: Kafka > Issue Type: Improvement >Reporter: Timo Meijer >Assignee: Richard Yu >Priority: Major > Labels: Quickfix, needs-kip > > The most common use case when using the seek* methods is to work on the > currently assigned partitions. This behavior is supported by passing an empty > list, but this is not very intuitive. > Adding an overloaded method for all seek* methods without parameters that has > the same behavior; using the currently assigned partitions, would improve the > API and user experience. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-4999) Add convenience overload for seek* methods
[ https://issues.apache.org/jira/browse/KAFKA-4999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16387272#comment-16387272 ] Ewen Cheslack-Postava commented on KAFKA-4999: -- Agreed the current approach isn't the most intuitive. This would need a KIP though, as we're pretty stingy about adding public API since it requires more maintenance, limits source code compatibility, and gives us less flexibility in any future changes. That said, maybe these are simple enough to be worth it. If you're interested in progressing this forward, see [https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals] for how to proceed; it's all pretty straightforward for something like this. > Add convenience overload for seek* methods > -- > > Key: KAFKA-4999 > URL: https://issues.apache.org/jira/browse/KAFKA-4999 > Project: Kafka > Issue Type: Improvement >Reporter: Timo Meijer >Assignee: Richard Yu >Priority: Major > Labels: Quickfix, needs-kip > > The most common use case when using the seek* methods is to work on the > currently assigned partitions. This behavior is supported by passing an empty > list, but this is not very intuitive. > Adding an overloaded method for all seek* methods without parameters that has > the same behavior; using the currently assigned partitions, would improve the > API and user experience. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
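To make the ergonomics concrete: with the current API, "seek on whatever is currently assigned" looks like the first two calls below (consumer is an already-constructed KafkaConsumer); the parameterless form is only the proposal, not an existing method:

{code:java}
// Current API: an empty collection (or the explicit assignment) means
// "all currently assigned partitions" -- the behaviour the ticket finds unintuitive.
consumer.seekToBeginning(java.util.Collections.emptyList());
consumer.seekToBeginning(consumer.assignment());   // equivalent, but more explicit

// Proposed convenience overload (hypothetical, would require a KIP):
// consumer.seekToBeginning();
{code}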
[jira] [Commented] (KAFKA-4974) System test failure in 0.8.2.2 upgrade tests
[ https://issues.apache.org/jira/browse/KAFKA-4974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16387269#comment-16387269 ] Ewen Cheslack-Postava commented on KAFKA-4974: -- [~edenhill] Are we still seeing similar issues? This implies there was already another service that was still running on the node with the JMX port bound (or that the port couldn't be allocated, e.g. because of X_WAIT states). We should make this more robust if possible, but if we can't reproduce consistently it may be hard to fix & test. > System test failure in 0.8.2.2 upgrade tests > > > Key: KAFKA-4974 > URL: https://issues.apache.org/jira/browse/KAFKA-4974 > Project: Kafka > Issue Type: Bug > Components: system tests >Reporter: Magnus Edenhill >Priority: Major > > The 0.10.2 system test failed in one of the upgrade tests from 0.8.2.2: > http://testing.confluent.io/confluent-kafka-0-10-2-system-test-results/?prefix=2017-03-21--001.1490092219--apache--0.10.2--4a019bd/TestUpgrade/test_upgrade/from_kafka_version=0.8.2.2.to_message_format_version=None.compression_types=.snappy.new_consumer=False/ > {noformat} > [INFO - 2017-03-21 07:35:48,802 - runner_client - log - lineno:221]: > RunnerClient: > kafkatest.tests.core.upgrade_test.TestUpgrade.test_upgrade.from_kafka_version=0.8.2.2.to_message_format_version=None.compression_types=.snappy.new_consumer=False: > FAIL: Kafka server didn't finish startup > Traceback (most recent call last): > File > "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py", > line 123, in run > data = self.run_test() > File > "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py", > line 176, in run_test > return self.test_context.function(self.test) > File > "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py", > line 321, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/tests/kafkatest/tests/core/upgrade_test.py", > line 125, in test_upgrade > self.run_produce_consume_validate(core_test_action=lambda: > self.perform_upgrade(from_kafka_version, > File > "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/tests/kafkatest/tests/produce_consume_validate.py", > line 114, in run_produce_consume_validate > core_test_action(*args) > File > "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/tests/kafkatest/tests/core/upgrade_test.py", > line 126, in > to_message_format_version)) > File > "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/tests/kafkatest/tests/core/upgrade_test.py", > line 52, in perform_upgrade > self.kafka.start_node(node) > File > "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/tests/kafkatest/services/kafka/kafka.py", > line 222, in start_node > monitor.wait_until("Kafka Server.*started", timeout_sec=30, > backoff_sec=.25, err_msg="Kafka server didn't finish startup") > File > "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/cluster/remoteaccount.py", > line 642, in wait_until > return wait_until(lambda: self.acct.ssh("tail -c +%d %s | grep '%s'" % > (self.offset+1, self.log, pattern), allow_fail=True) == 0, **kwargs) > File > 
"/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/utils/util.py", > line 36, in wait_until > {noformat} > Logs: > {noformat} > ==> ./KafkaService-0-140646398705744/worker9/server-start-stdout-stderr.log > <== > [2017-03-21 07:35:18,250] DEBUG Leaving process event > (org.I0Itec.zkclient.ZkClient) > [2017-03-21 07:35:18,250] INFO EventThread shut down > (org.apache.zookeeper.ClientCnxn) > [2017-03-21 07:35:18,250] INFO [Kafka Server 2], shut down completed > (kafka.server.KafkaServer) > Error: Exception thrown by the agent : java.rmi.server.ExportException: Port > already in use: 9192; nested exception is: > java.net.BindException: Address already in use > {noformat} > That's from starting the upgraded broker, which seems to indicate that > 0.8.2.2 was not properly shut down or has its RMI port in the close-wait > state. > Since there probably isn't much to do about 0.8.2.2 the test should probably > be hardened to either select a random port, or wait for lingering port to > become available (can use netstat for that). > This earlier failrue from the same 0.8.2.2 invoca
[jira] [Commented] (KAFKA-5613) Deprecate JmxTool?
[ https://issues.apache.org/jira/browse/KAFKA-5613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16387227#comment-16387227 ] Ewen Cheslack-Postava commented on KAFKA-5613: -- [~guozhang] Existing users is why I was suggesting deprecation rather than outright removal. I would expect at least a very healthy 2yr minimum migration path. The point is that there seem to be much better tools for collecting JMX metrics if you're actually in production. JmxTool seems to mainly be solving the problem of not requiring additional dependencies, but because the tool seems to have been included just for performance testing initially, I don't think the interface, UX, compatibility, etc were ever actually thought through. In fact, it's really unclear to me whether it ever really was public API, but due to its age and lack of public API definitions for awhile, it seems like it may have implicitly gained that status. But the cost is that we have to maintain that tool, improve it with new features (which are usually better implemented in other tools), and deal with some of the deficiencies of JMX in the system tests. Long term, do we think maintaining this tool is a good use of the Kafka project's time, or should we just encourage people to use a properly dedicated project like jmxtrans (instead of logging jmx metrics to a log file and then using something else to ship them to your monitoring tool...)? If the latter, then we should deprecate and present users an alternative asap so we don't spend time maintaining and improving the tool unnecessarily. > Deprecate JmxTool? > -- > > Key: KAFKA-5613 > URL: https://issues.apache.org/jira/browse/KAFKA-5613 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.0 >Reporter: Ewen Cheslack-Postava >Priority: Major > > According to git-blame, JmxTool has been around since October 2011. We use it > in system tests, but we are thinking it might be best to replace it: > https://issues.apache.org/jira/browse/KAFKA-5612 > When making modifications for system tests, we've had to take into account > compatibility because this tool is technically included in our distribution > and, perhaps unintentionally, a public utility. > We know that "real" tools for JMX, like jmxtrans, are more commonly used, but > we don't know who might be using JmxTool simply because it ships with Kafka. > That said, it also isn't documented in the Kafka documentation, so you > probably have to dig around to find it. > Hopefully we can deprecate this and eventually move it either to a jar that > is only used for system tests, or even better just remove it entirely. To do > any of this, we'd probably need to do at least a cursory survey of the > community to get a feel for usage level. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
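For context on what JmxTool actually does: it is essentially a thin poller over the standard JMX remote API. The sketch below reads one broker metric the same way, assuming the broker JVM was started with JMX enabled on port 9999; the MBean and attribute names are common Kafka broker metrics, shown only as an example:

{code:java}
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class JmxPollExample {
    public static void main(String[] args) throws Exception {
        // Assumes the broker was started with JMX_PORT=9999
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbsc = connector.getMBeanServerConnection();
            ObjectName name = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec");
            Object rate = mbsc.getAttribute(name, "OneMinuteRate");
            System.out.println("MessagesInPerSec (1-min rate): " + rate);
        }
    }
}
{code}

Dedicated agents such as jmxtrans do the same polling but also handle scheduling, batching and shipping to a monitoring backend, which is the argument for pointing users there instead of maintaining JmxTool.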
[jira] [Commented] (KAFKA-5612) Replace JmxTool with a MetricsReporter in system tests
[ https://issues.apache.org/jira/browse/KAFKA-5612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16387219#comment-16387219 ] Ewen Cheslack-Postava commented on KAFKA-5612: -- First pass at this was implemented in [https://github.com/apache/kafka/pull/4072.] This adds the implementation and migrates at least one test, but doesn't complete the migration, nor removes JmxTool as an option from our system tests. I would argue the issues with JmxTool have been so numerous that we should try to remove the jmx.py file entirely in order to discourage further use, regardless of whether JmxTool continues to exist in the source tree. > Replace JmxTool with a MetricsReporter in system tests > -- > > Key: KAFKA-5612 > URL: https://issues.apache.org/jira/browse/KAFKA-5612 > Project: Kafka > Issue Type: Bug > Components: system tests >Affects Versions: 0.11.0.0 >Reporter: Ewen Cheslack-Postava >Priority: Major > Original Estimate: 120h > Remaining Estimate: 120h > > I marked this as affecting 0.11.0.0, but it affects all earlier versions as > well, at least as far back as 0.10.1. > The discussion in https://github.com/apache/kafka/pull/3547 probably gives > the clearest explanation, but the basic issue is that ever since JmxMixin was > introduced to the system tests, we've faced race condition issues because the > second process that performs the monitoring has various timing issues with > the process it is monitoring. It can be both too fast and too slow, and the > exact conditions it needs to wait for may not even be externally visible > (e.g. that all metrics have been registered). > An alternative solution would be to introduce a MetricsReporter > implementation that accomplishes the same thing, but just requires overriding > some configs for the service that is utilizing JmxMixin. In particular, the > reporter could output data to a simple file, ideally would not require all > metrics that are reported to be available up front (i.e., no CSV format that > requires a fixed header that cannot be changed), and wouldn't have any timing > constraints (e.g., could at least guarantee that metrics are reported once at > the beginning and end of the program). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
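A rough sketch of the reporter idea described above, i.e. a MetricsReporter that appends metric values to a plain file at startup and on close. The class name and the metrics.file.path config key are invented for this example; the actual system-test reporter lives in the linked PR and may differ:

{code:java}
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;

public class FileMetricsReporter implements MetricsReporter {
    private final Map<MetricName, KafkaMetric> metrics = new ConcurrentHashMap<>();
    private String path = "/tmp/metrics.txt";

    @Override
    public void configure(Map<String, ?> configs) {
        Object p = configs.get("metrics.file.path");   // invented config key for this sketch
        if (p != null) path = p.toString();
    }

    @Override
    public void init(List<KafkaMetric> initial) {
        for (KafkaMetric m : initial) metrics.put(m.metricName(), m);
        dump("init");                                  // one report at the beginning...
    }

    @Override
    public void metricChange(KafkaMetric metric) {
        metrics.put(metric.metricName(), metric);      // metrics may register after startup
    }

    @Override
    public void metricRemoval(KafkaMetric metric) {
        metrics.remove(metric.metricName());
    }

    @Override
    public void close() {
        dump("close");                                 // ...and one guaranteed at the end
    }

    private void dump(String phase) {
        try (PrintWriter out = new PrintWriter(new FileWriter(path, true))) {
            for (KafkaMetric m : metrics.values())
                out.printf("%s %s:%s=%s%n", phase, m.metricName().group(), m.metricName().name(), m.value());
        } catch (IOException e) {
            // best effort -- a metrics reporter should never take down the service
        }
    }
}
{code}

It would be enabled via the standard metric.reporters config of the service under test.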
[jira] [Comment Edited] (KAFKA-6303) Potential lack of synchronization in NioEchoServer#AcceptorThread
[ https://issues.apache.org/jira/browse/KAFKA-6303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16333251#comment-16333251 ]

Ted Yu edited comment on KAFKA-6303 at 3/6/18 3:04 AM:
-------------------------------------------------------

lgtm

was (Author: yuzhih...@gmail.com):
+1

> Potential lack of synchronization in NioEchoServer#AcceptorThread
> -----------------------------------------------------------------
>
>                 Key: KAFKA-6303
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6303
>             Project: Kafka
>          Issue Type: Bug
>          Components: network
>            Reporter: Ted Yu
>            Assignee: siva santhalingam
>            Priority: Minor
>
> In the run() method:
> {code}
> SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
> socketChannel.configureBlocking(false);
> newChannels.add(socketChannel);
> {code}
> Modification to newChannels should be protected by synchronized block.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
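Concretely, the suggestion amounts to guarding the list mutation with the same lock the reader side uses. A minimal sketch of the run() fragment quoted above (the rest of NioEchoServer is elided, and this is only the shape of the suggested change, not a reviewed patch):

{code:java}
SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
socketChannel.configureBlocking(false);
synchronized (newChannels) {
    // newChannels is drained by another thread, so the add must not race with it
    newChannels.add(socketChannel);
}
{code}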
[jira] [Created] (KAFKA-6613) The controller shouldn't stop partition reassignment after an exception is being thrown
chandra kasiraju created KAFKA-6613:
---------------------------------------

             Summary: The controller shouldn't stop partition reassignment after an exception is being thrown
                 Key: KAFKA-6613
                 URL: https://issues.apache.org/jira/browse/KAFKA-6613
             Project: Kafka
          Issue Type: Bug
          Components: admin, config, controller, core
    Affects Versions: 0.11.0.2
            Reporter: chandra kasiraju
             Fix For: 0.11.0.2

I issued a partition reassignment command. It created the following entry in ZooKeeper, but the entry never gets deleted because the partition reassignment hangs and the Kafka logs show exceptions. After that, no matter how many hours pass, the movement of partitions to other brokers never happens.

*Path in Zookeeper*

{code}
get /admin/reassign_partitions
{"version":1,"partitions":[{"topic":"__consumer_offsets","partition":44,"replicas":[1003,1001,1004,1002]},{"topic":"683ad5e0-3775-4adc-ab55-82fda0761ba9_newTopic9","partition":0,"replicas":[1003,1004,1001,1002]},{"topic":"683ad5e0-3775-4adc-ab55-82fda0761ba9_newTopic1","partition":0,"replicas":[1003,1004,1001,1002]},{"topic":"__CruiseControlMetrics","partition":0,"replicas":[1002,1001,1004,1003]},{"topic":"b1c39c85-aee5-4ea0-90a1-9fc7eedc635b_topic","partition":0,"replicas":[1003,1004,1001,1002]},{"topic":"88ec4bd5-e149-4c98-8e8e-952e86ba5fae_topic","partition":4,"replicas":[1002,1004,1003,1001]},{"topic":"c8c56723-73a5-4a37-93bf-b8ecaf766429_topic","partition":4,"replicas":[1002,1003,1004,1001]},{"topic":"683ad5e0-3775-4adc-ab55-82fda0761ba9_newTopic9","partition":4,"replicas":[1002,1004,1003,1001]},{"topic":"b1c39c85-aee5-4ea0-90a1-9fc7eedc635b_topic","partition":4,"replicas":[1003,1001,1004,1002]},{"topic":"9db0cad2-69f8-4e85-b663-cd3987bd90fe_topic","partition":0,"replicas":[1003,1001,1004]},{"topic":"683ad5e0-3775-4adc-ab55-82fda0761ba9_topic","partition":1,"replicas":[1003,1004,1001,1002]}]}
cZxid = 0x552f8
ctime = Tue Mar 06 01:27:54 UTC 2018
mZxid = 0x55359
mtime = Tue Mar 06 01:28:06 UTC 2018
pZxid = 0x552f8
cversion = 0
dataVersion = 13
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 1114
numChildren = 0
{code}

*Exception*

{code}
ERROR [KafkaApi-1002] Error when handling request {replica_id=1005,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,topics=[{topic=__consumer_offsets,partitions=[{partition=41,fetch_offset=0,log_start_offset=0,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
kafka.common.NotAssignedReplicaException: Leader 1002 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1001,1002,1004 for partition __consumer_offsets-41.
        at kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:274)
        at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:1092)
        at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:1089)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:1089)
        at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:623)
        at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:606)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:98)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
        at java.lang.Thread.run(Thread.java:745)
{code}

I was expecting it to recover from that exception, move the partitions to the other nodes, and finally remove the entry in /admin/reassign_partitions after the move has happened.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (KAFKA-6611) Re-write simple benchmark in system tests with JMXTool
[ https://issues.apache.org/jira/browse/KAFKA-6611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16387130#comment-16387130 ] ASF GitHub Bot commented on KAFKA-6611: --- guozhangwang opened a new pull request #4650: KAFKA-6611: PART I, Use JMXTool in SimpleBenchmark URL: https://github.com/apache/kafka/pull/4650 1. Use JmxMixin for SimpleBenchmark (will remove the self reporting in follow-up PR). 2. Make report interval customizable. 3. Incorporates two other improves to JMXTool: https://github.com/apache/kafka/pull/1241 and https://github.com/apache/kafka/pull/2950 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Re-write simple benchmark in system tests with JMXTool > -- > > Key: KAFKA-6611 > URL: https://issues.apache.org/jira/browse/KAFKA-6611 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > > The current SimpleBenchmark is recording the num.records actively in order to > compute throughput and latency, which introduces extra cost plus is less > accurate for benchmarking purposes; instead, it's better to use JmxMixin with > SimpleBenchmark to record metrics. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-3978) Cannot truncate to a negative offset (-1) exception at broker startup
[ https://issues.apache.org/jira/browse/KAFKA-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16387125#comment-16387125 ] Jun Rao commented on KAFKA-3978: Agreed. For 1) we could add an assertion. For 2), if we detect that we are truncating to -1, we could log a warn and just truncate the whole local log. > Cannot truncate to a negative offset (-1) exception at broker startup > - > > Key: KAFKA-3978 > URL: https://issues.apache.org/jira/browse/KAFKA-3978 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.0.0 > Environment: 3.13.0-87-generic >Reporter: Juho Mäkinen >Priority: Critical > Labels: reliability, startup > > During broker startup sequence the broker server.log has this exception. > Problem persists after multiple restarts and also on another broker in the > cluster. > {code} > INFO [Socket Server on Broker 1002], Started 1 acceptor threads > (kafka.network.SocketServer) > INFO [Socket Server on Broker 1002], Started 1 acceptor threads > (kafka.network.SocketServer) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [GroupCoordinator 1002]: Starting up. > (kafka.coordinator.GroupCoordinator) > INFO [GroupCoordinator 1002]: Starting up. > (kafka.coordinator.GroupCoordinator) > INFO [GroupCoordinator 1002]: Startup complete. > (kafka.coordinator.GroupCoordinator) > INFO [GroupCoordinator 1002]: Startup complete. > (kafka.coordinator.GroupCoordinator) > INFO [Group Metadata Manager on Broker 1002]: Removed 0 expired offsets in 9 > milliseconds. (kafka.coordinator.GroupMetadataManager) > INFO [Group Metadata Manager on Broker 1002]: Removed 0 expired offsets in 9 > milliseconds. (kafka.coordinator.GroupMetadataManager) > INFO [ThrottledRequestReaper-Produce], Starting > (kafka.server.ClientQuotaManager$ThrottledRequestReaper) > INFO [ThrottledRequestReaper-Produce], Starting > (kafka.server.ClientQuotaManager$ThrottledRequestReaper) > INFO [ThrottledRequestReaper-Fetch], Starting > (kafka.server.ClientQuotaManager$ThrottledRequestReaper) > INFO [ThrottledRequestReaper-Fetch], Starting > (kafka.server.ClientQuotaManager$ThrottledRequestReaper) > INFO Will not load MX4J, mx4j-tools.jar is not in the classpath > (kafka.utils.Mx4jLoader$) > INFO Will not load MX4J, mx4j-tools.jar is not in the classpath > (kafka.utils.Mx4jLoader$) > INFO Creating /brokers/ids/1002 (is it secure? false) > (kafka.utils.ZKCheckedEphemeral) > INFO Creating /brokers/ids/1002 (is it secure? 
false) > (kafka.utils.ZKCheckedEphemeral) > INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) > INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) > INFO Registered broker 1002 at path /brokers/ids/1002 with addresses: > PLAINTEXT -> EndPoint(172.16.2.22,9092,PLAINTEXT) (kafka.utils.ZkUtils) > INFO Registered broker 1002 at path /brokers/ids/1002 with addresses: > PLAINTEXT -> EndPoint(172.16.2.22,9092,PLAINTEXT) (kafka.utils.ZkUtils) > INFO Kafka version : 0.10.0.0 (org.apache.kafka.common.utils.AppInfoParser) > INFO Kafka commitId : b8642491e78c5a13 > (org.apache.kafka.common.utils.AppInfoParser) > INFO [Kafka Server 1002], started (kafka.server.KafkaServer) > INFO [Kafka Server 1002], started (kafka.server.KafkaServer) > Error when handling request > {controller_id=1004,controller_epoch=1,partition_states=[..REALLY LONG OUTPUT > SNIPPED AWAY..], > live_leaders=[{id=1004,host=172.16.6.187,port=9092},{id=1003,host=172.16.2.21,port=9092}]} > (kafka.server.KafkaApis) > ERROR java.lang.IllegalArgumentException: Cannot truncate to a negative > offset (-1). > at kafka.log.Log.truncateTo(Log.scala:731) > at > kafka.log.LogManager$$anonfun$truncateTo$2.apply(LogManager.scala:288) > at > kafka.log.LogManager$$anonfun$truncateTo$2.apply(LogManager.scala:280) > at > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) > at
[jira] [Commented] (KAFKA-6610) initial high watermark -1, used for truncation, cause "Cannot truncate to a negative offset"
[ https://issues.apache.org/jira/browse/KAFKA-6610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16387110#comment-16387110 ] huxihx commented on KAFKA-6610: --- Could you turn on the trace and find if there are any traces like 'Reading %d bytes from offset %d in log %s of length %d bytes'. I suspect that the leader high watermark was set to -1 that caused the follower HW was set to -1 as well. > initial high watermark -1, used for truncation, cause "Cannot truncate to a > negative offset" > > > Key: KAFKA-6610 > URL: https://issues.apache.org/jira/browse/KAFKA-6610 > Project: Kafka > Issue Type: Bug > Components: log, replication >Affects Versions: 0.11.0.2 >Reporter: dongyan li >Priority: Major > > Hello, > I got issue that cause the Kafka broker not in-sync with topics. When I check > the log, found one of the topic has "-1" highwatermark, then used as the > offset for truncating and later caused exception: > {quote}3/5/2018 10:32:26 AM[2018-03-05 16:32:26,576] ERROR > [ReplicaFetcherThread-0-1]: Error due to (kafka.server.ReplicaFetcherThread) > 3/5/2018 10:32:26 AMjava.lang.IllegalArgumentException: Cannot truncate to a > negative offset (-1). > 3/5/2018 10:32:26 AM at kafka.log.Log.truncateTo(Log.scala:1377) > 3/5/2018 10:32:26 AM at > kafka.log.LogManager.$anonfun$truncateTo$2(LogManager.scala:330) > 3/5/2018 10:32:26 AM at > kafka.log.LogManager.$anonfun$truncateTo$2$adapted(LogManager.scala:321) > 3/5/2018 10:32:26 AM at > scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789) > 3/5/2018 10:32:26 AM at > scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:138) > 3/5/2018 10:32:26 AM at > scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:236) > 3/5/2018 10:32:26 AM at > scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:229) > 3/5/2018 10:32:26 AM at > scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) > 3/5/2018 10:32:26 AM at > scala.collection.mutable.HashMap.foreach(HashMap.scala:138) > 3/5/2018 10:32:26 AM at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788) > 3/5/2018 10:32:26 AM at kafka.log.LogManager.truncateTo(LogManager.scala:321) > 3/5/2018 10:32:26 AM at > kafka.server.ReplicaFetcherThread.maybeTruncate(ReplicaFetcherThread.scala:279) > 3/5/2018 10:32:26 AM at > kafka.server.AbstractFetcherThread.$anonfun$maybeTruncate$2(AbstractFetcherThread.scala:133) > 3/5/2018 10:32:26 AM at > scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) > 3/5/2018 10:32:26 AM at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213) > 3/5/2018 10:32:26 AM at > kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:130) > 3/5/2018 10:32:26 AM at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102) > 3/5/2018 10:32:26 AM at > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64) > {quote} > I deleted the log directory of that topic but topic partition got re-created > after restart, with same error after. I'm wondering how the `highwatermark` > get set to -1. I failed to trace the code flow from the `Replica` > constructor. Thank you. > > Dongyan -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-3978) Cannot truncate to a negative offset (-1) exception at broker startup
[ https://issues.apache.org/jira/browse/KAFKA-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16387076#comment-16387076 ] Jason Gustafson commented on KAFKA-3978: [~ijuma] [~junrao] We're still not sure the cause of this, but it should be easy to either 1) add an assertion when writing the checkpoint that offsets are not negative, and 2) detect the problem at startup and just use the log start offset. Since this problem has persisted for some time, shall we go ahead and do either of these? > Cannot truncate to a negative offset (-1) exception at broker startup > - > > Key: KAFKA-3978 > URL: https://issues.apache.org/jira/browse/KAFKA-3978 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.0.0 > Environment: 3.13.0-87-generic >Reporter: Juho Mäkinen >Priority: Critical > Labels: reliability, startup > > During broker startup sequence the broker server.log has this exception. > Problem persists after multiple restarts and also on another broker in the > cluster. > {code} > INFO [Socket Server on Broker 1002], Started 1 acceptor threads > (kafka.network.SocketServer) > INFO [Socket Server on Broker 1002], Started 1 acceptor threads > (kafka.network.SocketServer) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [GroupCoordinator 1002]: Starting up. > (kafka.coordinator.GroupCoordinator) > INFO [GroupCoordinator 1002]: Starting up. > (kafka.coordinator.GroupCoordinator) > INFO [GroupCoordinator 1002]: Startup complete. > (kafka.coordinator.GroupCoordinator) > INFO [GroupCoordinator 1002]: Startup complete. > (kafka.coordinator.GroupCoordinator) > INFO [Group Metadata Manager on Broker 1002]: Removed 0 expired offsets in 9 > milliseconds. (kafka.coordinator.GroupMetadataManager) > INFO [Group Metadata Manager on Broker 1002]: Removed 0 expired offsets in 9 > milliseconds. (kafka.coordinator.GroupMetadataManager) > INFO [ThrottledRequestReaper-Produce], Starting > (kafka.server.ClientQuotaManager$ThrottledRequestReaper) > INFO [ThrottledRequestReaper-Produce], Starting > (kafka.server.ClientQuotaManager$ThrottledRequestReaper) > INFO [ThrottledRequestReaper-Fetch], Starting > (kafka.server.ClientQuotaManager$ThrottledRequestReaper) > INFO [ThrottledRequestReaper-Fetch], Starting > (kafka.server.ClientQuotaManager$ThrottledRequestReaper) > INFO Will not load MX4J, mx4j-tools.jar is not in the classpath > (kafka.utils.Mx4jLoader$) > INFO Will not load MX4J, mx4j-tools.jar is not in the classpath > (kafka.utils.Mx4jLoader$) > INFO Creating /brokers/ids/1002 (is it secure? false) > (kafka.utils.ZKCheckedEphemeral) > INFO Creating /brokers/ids/1002 (is it secure? 
false) > (kafka.utils.ZKCheckedEphemeral) > INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) > INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) > INFO Registered broker 1002 at path /brokers/ids/1002 with addresses: > PLAINTEXT -> EndPoint(172.16.2.22,9092,PLAINTEXT) (kafka.utils.ZkUtils) > INFO Registered broker 1002 at path /brokers/ids/1002 with addresses: > PLAINTEXT -> EndPoint(172.16.2.22,9092,PLAINTEXT) (kafka.utils.ZkUtils) > INFO Kafka version : 0.10.0.0 (org.apache.kafka.common.utils.AppInfoParser) > INFO Kafka commitId : b8642491e78c5a13 > (org.apache.kafka.common.utils.AppInfoParser) > INFO [Kafka Server 1002], started (kafka.server.KafkaServer) > INFO [Kafka Server 1002], started (kafka.server.KafkaServer) > Error when handling request > {controller_id=1004,controller_epoch=1,partition_states=[..REALLY LONG OUTPUT > SNIPPED AWAY..], > live_leaders=[{id=1004,host=172.16.6.187,port=9092},{id=1003,host=172.16.2.21,port=9092}]} > (kafka.server.KafkaApis) > ERROR java.lang.IllegalArgumentException: Cannot truncate to a negative > offset (-1). > at kafka.log.Log.truncateTo(Log.scala:731) > at > kafka.log.LogManager$$anonfun$truncateTo$2.apply(LogManager.scala:288) > at
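To make the two proposals concrete, here is a minimal Java sketch of the safeguards discussed above: an assertion at checkpoint-write time, plus a startup-time fallback to the log start offset. The class, method, and map shapes are illustrative stand-ins, not the actual LogManager or checkpoint-file code.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: partitions are keyed by plain Strings here,
// whereas the broker uses TopicPartition.
public class CheckpointSanity {

    // (1) Fail fast if a negative offset is about to be written to a checkpoint file.
    static void assertNonNegative(Map<String, Long> checkpointOffsets) {
        for (Map.Entry<String, Long> entry : checkpointOffsets.entrySet()) {
            if (entry.getValue() < 0) {
                throw new IllegalStateException("Attempt to checkpoint negative offset "
                        + entry.getValue() + " for " + entry.getKey());
            }
        }
    }

    // (2) On startup, repair an already-corrupted checkpoint by falling back
    // to the log start offset instead of later truncating to -1.
    static Map<String, Long> sanitizeAtStartup(Map<String, Long> checkpointOffsets,
                                               Map<String, Long> logStartOffsets) {
        Map<String, Long> repaired = new HashMap<>();
        for (Map.Entry<String, Long> entry : checkpointOffsets.entrySet()) {
            long offset = entry.getValue();
            long fallback = logStartOffsets.getOrDefault(entry.getKey(), 0L);
            repaired.put(entry.getKey(), offset < 0 ? fallback : offset);
        }
        return repaired;
    }
}
{code}

Either guard would turn a stray -1 in the checkpoint into an immediate, diagnosable failure or a recoverable fallback, rather than the IllegalArgumentException seen at truncation time.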
[jira] [Resolved] (KAFKA-6610) initial high watermark -1, used for truncation, cause "Cannot truncate to a negative offset"
[ https://issues.apache.org/jira/browse/KAFKA-6610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-6610. Resolution: Duplicate Thanks for the report. This looks like KAFKA-3978. We are still not sure the cause of this, but if you have any additional details, please add them to that JIRA. > initial high watermark -1, used for truncation, cause "Cannot truncate to a > negative offset" > > > Key: KAFKA-6610 > URL: https://issues.apache.org/jira/browse/KAFKA-6610 > Project: Kafka > Issue Type: Bug > Components: log, replication >Affects Versions: 0.11.0.2 >Reporter: dongyan li >Priority: Major > > Hello, > I got issue that cause the Kafka broker not in-sync with topics. When I check > the log, found one of the topic has "-1" highwatermark, then used as the > offset for truncating and later caused exception: > {quote}3/5/2018 10:32:26 AM[2018-03-05 16:32:26,576] ERROR > [ReplicaFetcherThread-0-1]: Error due to (kafka.server.ReplicaFetcherThread) > 3/5/2018 10:32:26 AMjava.lang.IllegalArgumentException: Cannot truncate to a > negative offset (-1). > 3/5/2018 10:32:26 AM at kafka.log.Log.truncateTo(Log.scala:1377) > 3/5/2018 10:32:26 AM at > kafka.log.LogManager.$anonfun$truncateTo$2(LogManager.scala:330) > 3/5/2018 10:32:26 AM at > kafka.log.LogManager.$anonfun$truncateTo$2$adapted(LogManager.scala:321) > 3/5/2018 10:32:26 AM at > scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789) > 3/5/2018 10:32:26 AM at > scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:138) > 3/5/2018 10:32:26 AM at > scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:236) > 3/5/2018 10:32:26 AM at > scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:229) > 3/5/2018 10:32:26 AM at > scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) > 3/5/2018 10:32:26 AM at > scala.collection.mutable.HashMap.foreach(HashMap.scala:138) > 3/5/2018 10:32:26 AM at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788) > 3/5/2018 10:32:26 AM at kafka.log.LogManager.truncateTo(LogManager.scala:321) > 3/5/2018 10:32:26 AM at > kafka.server.ReplicaFetcherThread.maybeTruncate(ReplicaFetcherThread.scala:279) > 3/5/2018 10:32:26 AM at > kafka.server.AbstractFetcherThread.$anonfun$maybeTruncate$2(AbstractFetcherThread.scala:133) > 3/5/2018 10:32:26 AM at > scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) > 3/5/2018 10:32:26 AM at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213) > 3/5/2018 10:32:26 AM at > kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:130) > 3/5/2018 10:32:26 AM at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102) > 3/5/2018 10:32:26 AM at > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64) > {quote} > I deleted the log directory of that topic but topic partition got re-created > after restart, with same error after. I'm wondering how the `highwatermark` > get set to -1. I failed to trace the code flow from the `Replica` > constructor. Thank you. > > Dongyan -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6612) Added logic to prevent increasing partition counts during topic deletion
[ https://issues.apache.org/jira/browse/KAFKA-6612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16386999#comment-16386999 ] ASF GitHub Bot commented on KAFKA-6612: --- gitlw opened a new pull request #4649: KAFKA-6612: Added logic to prevent increasing partition counts during topic deletion URL: https://github.com/apache/kafka/pull/4649 This patch adds logic in handling the PartitionModifications event, so that if new partitions are increased when topic deletion is still in progress, the controller will restore the data of the path /brokers/topics/ to remove the added partitions. Testing done: Added a new test method to cover the bug However "gradle testAll" is failing since "gradle :streams:compileJava" is failing because of changes unrelated to this patch. Therefore "gradle testAll" should be triggered again when the compilation errors are resolved. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Added logic to prevent increasing partition counts during topic deletion > > > Key: KAFKA-6612 > URL: https://issues.apache.org/jira/browse/KAFKA-6612 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Major > > Problem: trying to increase the partition count of a topic while the topic > deletion is in progress can cause the topic to be never deleted. > In the current code base, if a topic deletion is still in progress and the > partition count is increased, > the new partition and its replica assignment be created on zookeeper as data > of the path /brokers/topics/. > Upon detecting the change, the controller sees the topic is being deleted, > and therefore ignores the partition change. Therefore the zk path > /brokers/topics//partitions/ will NOT be created. > If a controller switch happens next, the added partition will be detected by > the new controller and stored in the > controllerContext.partitionReplicaAssignment. The new controller then tries > to delete the topic by first transitioning its replicas to OfflineReplica. > However the transition to OfflineReplica state will NOT succeed since there > is no leader for the partition. Since the only state change path for a > replica to be successfully deleted is OfflineReplica -> > ReplicaDeletionStarted -> ReplicaDeletionSuccessful, not being able to enter > the OfflineReplica state means the replica can never be successfully deleted. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
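For illustration, a rough Java sketch of the guard the patch describes: when a partition increase is observed for a topic that is queued for deletion, the controller writes the previously known assignment back to the topic znode so the added partitions never become visible to a future controller. The ZkWriter interface and the assignment maps below are stand-ins, not the real controller context or ZooKeeper client.

{code:java}
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PartitionModificationsGuard {

    // Stand-in for the ZooKeeper write path used by the controller.
    interface ZkWriter {
        void setTopicAssignment(String topic, Map<Integer, List<Integer>> replicaAssignment);
    }

    private final ZkWriter zk;
    private final Set<String> topicsQueuedForDeletion;

    PartitionModificationsGuard(ZkWriter zk, Set<String> topicsQueuedForDeletion) {
        this.zk = zk;
        this.topicsQueuedForDeletion = topicsQueuedForDeletion;
    }

    void onPartitionModifications(String topic,
                                  Map<Integer, List<Integer>> existingAssignment,
                                  Map<Integer, List<Integer>> newAssignment) {
        if (topicsQueuedForDeletion.contains(topic)
                && newAssignment.size() > existingAssignment.size()) {
            // Restore the znode data so the partition increase is effectively rolled back.
            zk.setTopicAssignment(topic, existingAssignment);
            return;
        }
        // Otherwise, process the partition modification normally (omitted).
    }
}
{code}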
[jira] [Assigned] (KAFKA-6612) Added logic to prevent increasing partition counts during topic deletion
[ https://issues.apache.org/jira/browse/KAFKA-6612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang reassigned KAFKA-6612: - Assignee: Lucas Wang > Added logic to prevent increasing partition counts during topic deletion > > > Key: KAFKA-6612 > URL: https://issues.apache.org/jira/browse/KAFKA-6612 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Major > > Problem: trying to increase the partition count of a topic while the topic > deletion is in progress can cause the topic to be never deleted. > In the current code base, if a topic deletion is still in progress and the > partition count is increased, > the new partition and its replica assignment be created on zookeeper as data > of the path /brokers/topics/. > Upon detecting the change, the controller sees the topic is being deleted, > and therefore ignores the partition change. Therefore the zk path > /brokers/topics//partitions/ will NOT be created. > If a controller switch happens next, the added partition will be detected by > the new controller and stored in the > controllerContext.partitionReplicaAssignment. The new controller then tries > to delete the topic by first transitioning its replicas to OfflineReplica. > However the transition to OfflineReplica state will NOT succeed since there > is no leader for the partition. Since the only state change path for a > replica to be successfully deleted is OfflineReplica -> > ReplicaDeletionStarted -> ReplicaDeletionSuccessful, not being able to enter > the OfflineReplica state means the replica can never be successfully deleted. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6612) Added logic to prevent increasing partition counts during topic deletion
Lucas Wang created KAFKA-6612: - Summary: Added logic to prevent increasing partition counts during topic deletion Key: KAFKA-6612 URL: https://issues.apache.org/jira/browse/KAFKA-6612 Project: Kafka Issue Type: Improvement Reporter: Lucas Wang Problem: trying to increase the partition count of a topic while topic deletion is in progress can cause the topic to never be deleted. In the current code base, if a topic deletion is still in progress and the partition count is increased, the new partition and its replica assignment will be created on ZooKeeper as data of the path /brokers/topics/. Upon detecting the change, the controller sees that the topic is being deleted, and therefore ignores the partition change. Therefore the zk path /brokers/topics//partitions/ will NOT be created. If a controller switch happens next, the added partition will be detected by the new controller and stored in the controllerContext.partitionReplicaAssignment. The new controller then tries to delete the topic by first transitioning its replicas to OfflineReplica. However, the transition to the OfflineReplica state will NOT succeed since there is no leader for the partition. Since the only state change path for a replica to be successfully deleted is OfflineReplica -> ReplicaDeletionStarted -> ReplicaDeletionSuccessful, not being able to enter the OfflineReplica state means the replica can never be successfully deleted. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6611) Re-write simple benchmark in system tests with JMXTool
Guozhang Wang created KAFKA-6611: Summary: Re-write simple benchmark in system tests with JMXTool Key: KAFKA-6611 URL: https://issues.apache.org/jira/browse/KAFKA-6611 Project: Kafka Issue Type: Improvement Components: streams Reporter: Guozhang Wang Assignee: Guozhang Wang The current SimpleBenchmark actively counts num.records in order to compute throughput and latency, which adds overhead and is less accurate for benchmarking purposes; instead, it's better to use JmxMixin with SimpleBenchmark to record metrics. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
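As a sketch of the JMX-based approach, the benchmark harness can read rate metrics from the running Streams instance instead of counting records in the application itself. The JMX port, client id, MBean object name, and attribute below are placeholders; the exact metric names depend on the Streams version being benchmarked.

{code:java}
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class JmxMetricReader {
    public static void main(String[] args) throws Exception {
        // Assumes the benchmarked JVM was started with a remote JMX port, e.g. 9999.
        JMXServiceURL url =
                new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            // Placeholder object name; check the actual MBeans exposed by the client.
            ObjectName metric = new ObjectName(
                    "kafka.streams:type=stream-metrics,client-id=simple-benchmark-client");
            Object processRate = connection.getAttribute(metric, "process-rate");
            System.out.println("process-rate = " + processRate);
        } finally {
            connector.close();
        }
    }
}
{code}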
[jira] [Commented] (KAFKA-5613) Deprecate JmxTool?
[ https://issues.apache.org/jira/browse/KAFKA-5613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16386951#comment-16386951 ] Guozhang Wang commented on KAFKA-5613: -- I agree with KAFKA-5612 that for system tests we should consider using a different tool that is free of race conditions; however, deprecating JmxTool may not be a good idea, since AFAIK lots of users rely on it for monitoring, and its popularity may even be comparable to other open-source ecosystem tools like Kafka Manager and Kafka Monitor. Another use case that I can think of is using JmxTool for some benchmarking. > Deprecate JmxTool? > -- > > Key: KAFKA-5613 > URL: https://issues.apache.org/jira/browse/KAFKA-5613 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.0 >Reporter: Ewen Cheslack-Postava >Priority: Major > > According to git-blame, JmxTool has been around since October 2011. We use it > in system tests, but we are thinking it might be best to replace it: > https://issues.apache.org/jira/browse/KAFKA-5612 > When making modifications for system tests, we've had to take into account > compatibility because this tool is technically included in our distribution > and, perhaps unintentionally, a public utility. > We know that "real" tools for JMX, like jmxtrans, are more commonly used, but > we don't know who might be using JmxTool simply because it ships with Kafka. > That said, it also isn't documented in the Kafka documentation, so you > probably have to dig around to find it. > Hopefully we can deprecate this and eventually move it either to a jar that > is only used for system tests, or even better just remove it entirely. To do > any of this, we'd probably need to do at least a cursory survey of the > community to get a feel for usage level. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-6049) Kafka Streams: Add Cogroup in the DSL
[ https://issues.apache.org/jira/browse/KAFKA-6049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu reassigned KAFKA-6049: - Assignee: Richard Yu > Kafka Streams: Add Cogroup in the DSL > - > > Key: KAFKA-6049 > URL: https://issues.apache.org/jira/browse/KAFKA-6049 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Guozhang Wang >Assignee: Richard Yu >Priority: Major > Labels: api, kip, user-experience > > When multiple streams aggregate together to form a single larger object (e.g. > a shopping website may have a cart stream, a wish list stream, and a > purchases stream. Together they make up a Customer), it is very difficult to > accommodate this in the Kafka-Streams DSL: it generally requires you to group > and aggregate all of the streams to KTables then make multiple outer join > calls to end up with a KTable with your desired object. This will create a > state store for each stream and a long chain of ValueJoiners that each new > record must go through to get to the final object. > Creating a cogroup method where you use a single state store will: > * Reduce the number of gets from state stores. With the multiple joins when a > new value comes into any of the streams a chain reaction happens where the > join processor keep calling ValueGetters until we have accessed all state > stores. > * Slight performance increase. As described above all ValueGetters are called > also causing all ValueJoiners to be called forcing a recalculation of the > current joined value of all other streams, impacting performance. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
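For context, a condensed Java sketch of the join-chain pattern the ticket describes, using String values and made-up topic names for brevity: each input stream is aggregated into its own KTable (one state store apiece), and the results are stitched together with outer joins, so every update has to walk the whole ValueJoiner chain. This shows only the status quo in the DSL, not the proposed cogroup API.

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KTable;

public class MultiJoinExample {
    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // One aggregation (and one state store) per input stream.
        KTable<String, String> carts = builder.<String, String>stream("carts")
                .groupByKey()
                .reduce((oldValue, newValue) -> newValue);
        KTable<String, String> wishLists = builder.<String, String>stream("wish-lists")
                .groupByKey()
                .reduce((oldValue, newValue) -> newValue);
        KTable<String, String> purchases = builder.<String, String>stream("purchases")
                .groupByKey()
                .reduce((oldValue, newValue) -> newValue);

        // Every update re-runs this ValueJoiner chain to rebuild the "customer" view.
        KTable<String, String> customers = carts
                .outerJoin(wishLists, (cart, wishList) -> cart + "|" + wishList)
                .outerJoin(purchases, (partial, purchase) -> partial + "|" + purchase);

        customers.toStream().to("customers");
        return builder.build();
    }
}
{code}

A cogroup operator would let the three streams share a single store and a single aggregation step instead of the three stores and two joins above.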
[jira] [Issue Comment Deleted] (KAFKA-6610) initial high watermark -1, used for truncation, cause "Cannot truncate to a negative offset"
[ https://issues.apache.org/jira/browse/KAFKA-6610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dongyan li updated KAFKA-6610: -- Comment: was deleted (was: Timeline: {quote}3/5/2018 11:28:23 AM[2018-03-05 17:28:23,084] INFO Replica loaded for TOPICNAME-0 with initial high watermark 0 (kafka.cluster.Replica) 3/5/2018 11:28:23 AM[2018-03-05 17:28:23,086] INFO Replica loaded for partition TOPICNAME-0 with initial high watermark -1 (kafka.cluster.Replica) 3/5/2018 11:28:23 AM[2018-03-05 17:28:23,086] INFO Replica loaded for partition TOPICNAME-0 with initial high watermark 0 (kafka.cluster.Replica){quote} Later: {quote}3/5/2018 11:28:23 AM[2018-03-05 17:28:23,305] WARN [ReplicaFetcherThread-0-1]: Based on follower's leader epoch, leader replied with an unknown offset in cmdty_ifl_inbound_fix-0. High watermark -1 will be used for truncation. (kafka.server.ReplicaFetcherThread){quote} Then, error: (timestamp is not right, but the content is the same) {quote}3/5/2018 10:32:26 AMjava.lang.IllegalArgumentException: Cannot truncate to a negative offset (-1). 3/5/2018 10:32:26 AM at kafka.log.Log.truncateTo(Log.scala:1377) 3/5/2018 10:32:26 AM at kafka.log.LogManager.$anonfun$truncateTo$2(LogManager.scala:330) 3/5/2018 10:32:26 AM at kafka.log.LogManager.$anonfun$truncateTo$2$adapted(LogManager.scala:321) 3/5/2018 10:32:26 AM at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789) 3/5/2018 10:32:26 AM at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:138) 3/5/2018 10:32:26 AM at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:236) 3/5/2018 10:32:26 AM at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:229) 3/5/2018 10:32:26 AM at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) 3/5/2018 10:32:26 AM at scala.collection.mutable.HashMap.foreach(HashMap.scala:138) 3/5/2018 10:32:26 AM at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788) 3/5/2018 10:32:26 AM at kafka.log.LogManager.truncateTo(LogManager.scala:321) 3/5/2018 10:32:26 AM at kafka.server.ReplicaFetcherThread.maybeTruncate(ReplicaFetcherThread.scala:279) 3/5/2018 10:32:26 AM at kafka.server.AbstractFetcherThread.$anonfun$maybeTruncate$2(AbstractFetcherThread.scala:133) 3/5/2018 10:32:26 AM at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) 3/5/2018 10:32:26 AM at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213) 3/5/2018 10:32:26 AM at kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:130) 3/5/2018 10:32:26 AM at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102) 3/5/2018 10:32:26 AM at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64){quote} ) > initial high watermark -1, used for truncation, cause "Cannot truncate to a > negative offset" > > > Key: KAFKA-6610 > URL: https://issues.apache.org/jira/browse/KAFKA-6610 > Project: Kafka > Issue Type: Bug > Components: log, replication >Affects Versions: 0.11.0.2 >Reporter: dongyan li >Priority: Major > > Hello, > I got issue that cause the Kafka broker not in-sync with topics. When I check > the log, found one of the topic has "-1" highwatermark, then used as the > offset for truncating and later caused exception: > {quote}3/5/2018 10:32:26 AM[2018-03-05 16:32:26,576] ERROR > [ReplicaFetcherThread-0-1]: Error due to (kafka.server.ReplicaFetcherThread) > 3/5/2018 10:32:26 AMjava.lang.IllegalArgumentException: Cannot truncate to a > negative offset (-1). 
> 3/5/2018 10:32:26 AM at kafka.log.Log.truncateTo(Log.scala:1377) > 3/5/2018 10:32:26 AM at > kafka.log.LogManager.$anonfun$truncateTo$2(LogManager.scala:330) > 3/5/2018 10:32:26 AM at > kafka.log.LogManager.$anonfun$truncateTo$2$adapted(LogManager.scala:321) > 3/5/2018 10:32:26 AM at > scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789) > 3/5/2018 10:32:26 AM at > scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:138) > 3/5/2018 10:32:26 AM at > scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:236) > 3/5/2018 10:32:26 AM at > scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:229) > 3/5/2018 10:32:26 AM at > scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) > 3/5/2018 10:32:26 AM at > scala.collection.mutable.HashMap.foreach(HashMap.scala:138) > 3/5/2018 10:32:26 AM at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788) > 3/5/2018 10:32:26 AM at kafka.log.LogManager.truncateTo(LogManager.scala:321) > 3/5/2018 10:32:26 AM at > kafka.server.ReplicaFetcherThread.maybeTruncate(ReplicaFetcherThread.scala:279) > 3/5/2018 10:32:2
[jira] [Commented] (KAFKA-3806) Adjust default values of log.retention.hours and offsets.retention.minutes
[ https://issues.apache.org/jira/browse/KAFKA-3806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16386623#comment-16386623 ] ASF GitHub Bot commented on KAFKA-3806: --- ewencp opened a new pull request #4648: KAFKA-3806: Increase offsets retention default to 7 days (KIP-186) URL: https://github.com/apache/kafka/pull/4648 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Adjust default values of log.retention.hours and offsets.retention.minutes > -- > > Key: KAFKA-3806 > URL: https://issues.apache.org/jira/browse/KAFKA-3806 > Project: Kafka > Issue Type: Improvement > Components: config >Affects Versions: 0.9.0.1, 0.10.0.0 >Reporter: Michal Turek >Priority: Minor > Fix For: 1.2.0 > > > Combination of default values of log.retention.hours (168 hours = 7 days) and > offsets.retention.minutes (1440 minutes = 1 day) may be dangerous in special > cases. Offset retention should be always greater than log retention. > We have observed the following scenario and issue: > - Producing of data to a topic was disabled two days ago by producer update, > topic wasn't deleted. > - Consumer consumed all data and properly committed offsets to Kafka. > - Consumer made no more offset commits for that topic because there was no > more incoming data and there was nothing to confirm. (We have auto-commit > disabled, I'm not sure how behaves enabled auto-commit.) > - After one day: Kafka cleared too old offsets according to > offsets.retention.minutes. > - After two days: Long-term running consumer was restarted after update, it > didn't find any committed offsets for that topic since they were deleted by > offsets.retention.minutes so it started consuming from the beginning. > - The messages were still in Kafka due to larger log.retention.hours, about 5 > days of messages were read again. > Known workaround to solve this issue: > - Explicitly configure log.retention.hours and offsets.retention.minutes, > don't use defaults. > Proposals: > - Prolong default value of offsets.retention.minutes to be at least twice > larger than log.retention.hours. > - Check these values during Kafka startup and log a warning if > offsets.retention.minutes is smaller than log.retention.hours. > - Add a note to migration guide about differences between storing of offsets > in ZooKeeper and Kafka (http://kafka.apache.org/documentation.html#upgrade). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
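Until the defaults change, the workaround from the description is to set both values explicitly so that committed offsets outlive the log itself. A server.properties example using the 7-day offsets retention that KIP-186 proposes as the new default (values are examples; adjust to your own retention needs):

{code}
# 7 days of log retention (the existing default)
log.retention.hours=168
# 7 days of offsets retention; the pre-KIP-186 default is 1440 (1 day)
offsets.retention.minutes=10080
{code}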
[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16386548#comment-16386548 ] ASF GitHub Bot commented on KAFKA-6054: --- mjsax closed pull request #4630: KAFKA-6054: Code cleanup to prepare the actual fix for an upgrade path URL: https://github.com/apache/kafka/pull/4630 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 6b3626101bd..47becfc239b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -226,11 +226,11 @@ public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde"; private static final String DEFAULT_KEY_SERDE_CLASS_DOC = " Default serializer / deserializer class for key that implements the org.apache.kafka.common.serialization.Serde interface."; -/** {@code default timestamp.extractor} */ +/** {@code default.timestamp.extractor} */ public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "default.timestamp.extractor"; private static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC = "Default timestamp extractor class that implements the org.apache.kafka.streams.processor.TimestampExtractor interface."; -/** {@code default value.serde} */ +/** {@code default.value.serde} */ public static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG = "default.value.serde"; private static final String DEFAULT_VALUE_SERDE_CLASS_DOC = "Default serializer / deserializer class for value that implements the org.apache.kafka.common.serialization.Serde interface."; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 9aa0e94c8c1..71a84b2ca73 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -66,7 +66,8 @@ public final TaskId taskId; public final TopicPartition partition; -AssignedPartition(final TaskId taskId, final TopicPartition partition) { +AssignedPartition(final TaskId taskId, + final TopicPartition partition) { this.taskId = taskId; this.partition = partition; } @@ -77,11 +78,11 @@ public int compareTo(final AssignedPartition that) { } @Override -public boolean equals(Object o) { +public boolean equals(final Object o) { if (!(o instanceof AssignedPartition)) { return false; } -AssignedPartition other = (AssignedPartition) o; +final AssignedPartition other = (AssignedPartition) o; return compareTo(other) == 0; } @@ -104,8 +105,9 @@ public int hashCode() { final String host = getHost(endPoint); final Integer port = getPort(endPoint); -if (host == null || port == null) +if (host == null || port == null) { throw new ConfigException(String.format("Error parsing host address %s. 
Expected format host:port.", endPoint)); +} hostInfo = new HostInfo(host, port); } else { @@ -119,10 +121,11 @@ public int hashCode() { state = new ClientState(); } -void addConsumer(final String consumerMemberId, final SubscriptionInfo info) { +void addConsumer(final String consumerMemberId, + final SubscriptionInfo info) { consumers.add(consumerMemberId); -state.addPreviousActiveTasks(info.prevTasks); -state.addPreviousStandbyTasks(info.standbyTasks); +state.addPreviousActiveTasks(info.prevTasks()); +state.addPreviousStandbyTasks(info.standbyTasks()); state.incrementCapacity(); } @@ -157,8 +160,9 @@ public String toString() { private static final Comparator PARTITION_COMPARATOR = new Comparator() { @Override -public int compare(TopicPartition p1, TopicPartition p2) { -int result = p1.topic().compareTo(p2.topic()); +public int compare(final TopicPartition p1, + final TopicPartition p2) { +final int result = p1.topic().compareTo(p2.to
[jira] [Updated] (KAFKA-6610) initial high watermark -1, used for truncation, cause "Cannot truncate to a negative offset"
[ https://issues.apache.org/jira/browse/KAFKA-6610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dongyan li updated KAFKA-6610: -- Description: Hello, I got issue that cause the Kafka broker not in-sync with topics. When I check the log, found one of the topic has "-1" highwatermark, then used as the offset for truncating and later caused exception: {quote}3/5/2018 10:32:26 AM[2018-03-05 16:32:26,576] ERROR [ReplicaFetcherThread-0-1]: Error due to (kafka.server.ReplicaFetcherThread) 3/5/2018 10:32:26 AMjava.lang.IllegalArgumentException: Cannot truncate to a negative offset (-1). 3/5/2018 10:32:26 AM at kafka.log.Log.truncateTo(Log.scala:1377) 3/5/2018 10:32:26 AM at kafka.log.LogManager.$anonfun$truncateTo$2(LogManager.scala:330) 3/5/2018 10:32:26 AM at kafka.log.LogManager.$anonfun$truncateTo$2$adapted(LogManager.scala:321) 3/5/2018 10:32:26 AM at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789) 3/5/2018 10:32:26 AM at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:138) 3/5/2018 10:32:26 AM at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:236) 3/5/2018 10:32:26 AM at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:229) 3/5/2018 10:32:26 AM at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) 3/5/2018 10:32:26 AM at scala.collection.mutable.HashMap.foreach(HashMap.scala:138) 3/5/2018 10:32:26 AM at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788) 3/5/2018 10:32:26 AM at kafka.log.LogManager.truncateTo(LogManager.scala:321) 3/5/2018 10:32:26 AM at kafka.server.ReplicaFetcherThread.maybeTruncate(ReplicaFetcherThread.scala:279) 3/5/2018 10:32:26 AM at kafka.server.AbstractFetcherThread.$anonfun$maybeTruncate$2(AbstractFetcherThread.scala:133) 3/5/2018 10:32:26 AM at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) 3/5/2018 10:32:26 AM at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213) 3/5/2018 10:32:26 AM at kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:130) 3/5/2018 10:32:26 AM at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102) 3/5/2018 10:32:26 AM at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64) {quote} I deleted the log directory of that topic but topic partition got re-created after restart, with same error after. I'm wondering how the `highwatermark` get set to -1. I failed to trace the code flow from the `Replica` constructor. Thank you. Dongyan was: Hello, I got issue that cause the Kafka broker not in-sync with topics. When I check the log, found one of the topic has "-1" highwatermark, then used as the offset for truncating and later caused exception: {quote}3/5/2018 10:32:26 AM[2018-03-05 16:32:26,576] ERROR [ReplicaFetcherThread-0-1]: Error due to (kafka.server.ReplicaFetcherThread) 3/5/2018 10:32:26 AMjava.lang.IllegalArgumentException: Cannot truncate to a negative offset (-1). 
3/5/2018 10:32:26 AM at kafka.log.Log.truncateTo(Log.scala:1377) 3/5/2018 10:32:26 AM at kafka.log.LogManager.$anonfun$truncateTo$2(LogManager.scala:330) 3/5/2018 10:32:26 AM at kafka.log.LogManager.$anonfun$truncateTo$2$adapted(LogManager.scala:321) 3/5/2018 10:32:26 AM at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789) 3/5/2018 10:32:26 AM at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:138) 3/5/2018 10:32:26 AM at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:236) 3/5/2018 10:32:26 AM at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:229) 3/5/2018 10:32:26 AM at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) 3/5/2018 10:32:26 AM at scala.collection.mutable.HashMap.foreach(HashMap.scala:138) 3/5/2018 10:32:26 AM at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788) 3/5/2018 10:32:26 AM at kafka.log.LogManager.truncateTo(LogManager.scala:321) 3/5/2018 10:32:26 AM at kafka.server.ReplicaFetcherThread.maybeTruncate(ReplicaFetcherThread.scala:279) 3/5/2018 10:32:26 AM at kafka.server.AbstractFetcherThread.$anonfun$maybeTruncate$2(AbstractFetcherThread.scala:133) 3/5/2018 10:32:26 AM at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) 3/5/2018 10:32:26 AM at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213) 3/5/2018 10:32:26 AM at kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:130) 3/5/2018 10:32:26 AM at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102) 3/5/2018 10:32:26 AM at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64) {quote} I deleted the log directory of that topic but topic partition got re-created after restart, with same error after. Any help with how can I got out of this error? Thank you. Dongyan > ini
[jira] [Updated] (KAFKA-6049) Kafka Streams: Add Cogroup in the DSL
[ https://issues.apache.org/jira/browse/KAFKA-6049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6049: --- Labels: api kip user-experience (was: api needs-kip user-experience) > Kafka Streams: Add Cogroup in the DSL > - > > Key: KAFKA-6049 > URL: https://issues.apache.org/jira/browse/KAFKA-6049 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Guozhang Wang >Priority: Major > Labels: api, kip, user-experience > > When multiple streams aggregate together to form a single larger object (e.g. > a shopping website may have a cart stream, a wish list stream, and a > purchases stream. Together they make up a Customer), it is very difficult to > accommodate this in the Kafka-Streams DSL: it generally requires you to group > and aggregate all of the streams to KTables then make multiple outer join > calls to end up with a KTable with your desired object. This will create a > state store for each stream and a long chain of ValueJoiners that each new > record must go through to get to the final object. > Creating a cogroup method where you use a single state store will: > * Reduce the number of gets from state stores. With the multiple joins when a > new value comes into any of the streams a chain reaction happens where the > join processor keep calling ValueGetters until we have accessed all state > stores. > * Slight performance increase. As described above all ValueGetters are called > also causing all ValueJoiners to be called forcing a recalculation of the > current joined value of all other streams, impacting performance. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6610) initial high watermark -1, used for truncation, cause "Cannot truncate to a negative offset"
[ https://issues.apache.org/jira/browse/KAFKA-6610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16386448#comment-16386448 ] dongyan li commented on KAFKA-6610: --- Timeline: {quote}3/5/2018 11:28:23 AM[2018-03-05 17:28:23,084] INFO Replica loaded for TOPICNAME-0 with initial high watermark 0 (kafka.cluster.Replica) 3/5/2018 11:28:23 AM[2018-03-05 17:28:23,086] INFO Replica loaded for partition TOPICNAME-0 with initial high watermark -1 (kafka.cluster.Replica) 3/5/2018 11:28:23 AM[2018-03-05 17:28:23,086] INFO Replica loaded for partition TOPICNAME-0 with initial high watermark 0 (kafka.cluster.Replica){quote} Later: {quote}3/5/2018 11:28:23 AM[2018-03-05 17:28:23,305] WARN [ReplicaFetcherThread-0-1]: Based on follower's leader epoch, leader replied with an unknown offset in cmdty_ifl_inbound_fix-0. High watermark -1 will be used for truncation. (kafka.server.ReplicaFetcherThread){quote} Then, error: (timestamp is not right, but the content is the same) {quote}3/5/2018 10:32:26 AMjava.lang.IllegalArgumentException: Cannot truncate to a negative offset (-1). 3/5/2018 10:32:26 AM at kafka.log.Log.truncateTo(Log.scala:1377) 3/5/2018 10:32:26 AM at kafka.log.LogManager.$anonfun$truncateTo$2(LogManager.scala:330) 3/5/2018 10:32:26 AM at kafka.log.LogManager.$anonfun$truncateTo$2$adapted(LogManager.scala:321) 3/5/2018 10:32:26 AM at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789) 3/5/2018 10:32:26 AM at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:138) 3/5/2018 10:32:26 AM at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:236) 3/5/2018 10:32:26 AM at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:229) 3/5/2018 10:32:26 AM at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) 3/5/2018 10:32:26 AM at scala.collection.mutable.HashMap.foreach(HashMap.scala:138) 3/5/2018 10:32:26 AM at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788) 3/5/2018 10:32:26 AM at kafka.log.LogManager.truncateTo(LogManager.scala:321) 3/5/2018 10:32:26 AM at kafka.server.ReplicaFetcherThread.maybeTruncate(ReplicaFetcherThread.scala:279) 3/5/2018 10:32:26 AM at kafka.server.AbstractFetcherThread.$anonfun$maybeTruncate$2(AbstractFetcherThread.scala:133) 3/5/2018 10:32:26 AM at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) 3/5/2018 10:32:26 AM at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213) 3/5/2018 10:32:26 AM at kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:130) 3/5/2018 10:32:26 AM at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102) 3/5/2018 10:32:26 AM at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64){quote} > initial high watermark -1, used for truncation, cause "Cannot truncate to a > negative offset" > > > Key: KAFKA-6610 > URL: https://issues.apache.org/jira/browse/KAFKA-6610 > Project: Kafka > Issue Type: Bug > Components: log, replication >Affects Versions: 0.11.0.2 >Reporter: dongyan li >Priority: Major > > Hello, > I got issue that cause the Kafka broker not in-sync with topics. 
When I check > the log, found one of the topic has "-1" highwatermark, then used as the > offset for truncating and later caused exception: > {quote}3/5/2018 10:32:26 AM[2018-03-05 16:32:26,576] ERROR > [ReplicaFetcherThread-0-1]: Error due to (kafka.server.ReplicaFetcherThread) > 3/5/2018 10:32:26 AMjava.lang.IllegalArgumentException: Cannot truncate to a > negative offset (-1). > 3/5/2018 10:32:26 AM at kafka.log.Log.truncateTo(Log.scala:1377) > 3/5/2018 10:32:26 AM at > kafka.log.LogManager.$anonfun$truncateTo$2(LogManager.scala:330) > 3/5/2018 10:32:26 AM at > kafka.log.LogManager.$anonfun$truncateTo$2$adapted(LogManager.scala:321) > 3/5/2018 10:32:26 AM at > scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789) > 3/5/2018 10:32:26 AM at > scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:138) > 3/5/2018 10:32:26 AM at > scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:236) > 3/5/2018 10:32:26 AM at > scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:229) > 3/5/2018 10:32:26 AM at > scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) > 3/5/2018 10:32:26 AM at > scala.collection.mutable.HashMap.foreach(HashMap.scala:138) > 3/5/2018 10:32:26 AM at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788) > 3/5/2018 10:32:26 AM at kafka.log.LogManager.truncateTo(LogManager.scala:321) > 3/5/2018 10:32:26 AM at > kafka.server.ReplicaFetcherThread.maybeTruncate(ReplicaFetcherThread.scala:279) > 3/5/2
[jira] [Created] (KAFKA-6610) initial high watermark -1, used for truncation, cause "Cannot truncate to a negative offset"
dongyan li created KAFKA-6610: - Summary: initial high watermark -1, used for truncation, cause "Cannot truncate to a negative offset" Key: KAFKA-6610 URL: https://issues.apache.org/jira/browse/KAFKA-6610 Project: Kafka Issue Type: Bug Components: log, replication Affects Versions: 0.11.0.2 Reporter: dongyan li Hello, I got issue that cause the Kafka broker not in-sync with topics. When I check the log, found one of the topic has "-1" highwatermark, then used as the offset for truncating and later caused exception: {quote}3/5/2018 10:32:26 AM[2018-03-05 16:32:26,576] ERROR [ReplicaFetcherThread-0-1]: Error due to (kafka.server.ReplicaFetcherThread) 3/5/2018 10:32:26 AMjava.lang.IllegalArgumentException: Cannot truncate to a negative offset (-1). 3/5/2018 10:32:26 AM at kafka.log.Log.truncateTo(Log.scala:1377) 3/5/2018 10:32:26 AM at kafka.log.LogManager.$anonfun$truncateTo$2(LogManager.scala:330) 3/5/2018 10:32:26 AM at kafka.log.LogManager.$anonfun$truncateTo$2$adapted(LogManager.scala:321) 3/5/2018 10:32:26 AM at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789) 3/5/2018 10:32:26 AM at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:138) 3/5/2018 10:32:26 AM at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:236) 3/5/2018 10:32:26 AM at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:229) 3/5/2018 10:32:26 AM at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) 3/5/2018 10:32:26 AM at scala.collection.mutable.HashMap.foreach(HashMap.scala:138) 3/5/2018 10:32:26 AM at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788) 3/5/2018 10:32:26 AM at kafka.log.LogManager.truncateTo(LogManager.scala:321) 3/5/2018 10:32:26 AM at kafka.server.ReplicaFetcherThread.maybeTruncate(ReplicaFetcherThread.scala:279) 3/5/2018 10:32:26 AM at kafka.server.AbstractFetcherThread.$anonfun$maybeTruncate$2(AbstractFetcherThread.scala:133) 3/5/2018 10:32:26 AM at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) 3/5/2018 10:32:26 AM at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213) 3/5/2018 10:32:26 AM at kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:130) 3/5/2018 10:32:26 AM at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102) 3/5/2018 10:32:26 AM at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64) {quote} I deleted the log directory of that topic but topic partition got re-created after restart, with same error after. Any help with how can I got out of this error? Thank you. Dongyan -- This message was sent by Atlassian JIRA (v7.6.3#76005)
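Related to the fix discussed in KAFKA-3978, a minimal Java sketch of a follower-side guard: never hand a negative offset to truncation, and fall back to the log start offset instead. The Log interface below is an illustrative stand-in, not the broker's kafka.log.Log.

{code:java}
public class TruncationGuard {

    // Stand-in for the pieces of the log the guard needs.
    interface Log {
        long logStartOffset();
        void truncateTo(long offset);
    }

    static void maybeTruncate(Log log, long highWatermark) {
        // With this guard, a -1 high watermark never reaches truncateTo().
        long truncationOffset = highWatermark >= 0 ? highWatermark : log.logStartOffset();
        log.truncateTo(truncationOffset);
    }
}
{code}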
[jira] [Commented] (KAFKA-4514) Add Codec for ZStandard Compression
[ https://issues.apache.org/jira/browse/KAFKA-4514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16386327#comment-16386327 ] Bert Sanders commented on KAFKA-4514: - Cloudflare got great results with this codec for Kafka: [https://blog.cloudflare.com/squeezing-the-firehose/] > Add Codec for ZStandard Compression > --- > > Key: KAFKA-4514 > URL: https://issues.apache.org/jira/browse/KAFKA-4514 > Project: Kafka > Issue Type: Improvement > Components: compression >Reporter: Thomas Graves >Assignee: Lee Dongjin >Priority: Major > > ZStandard: https://github.com/facebook/zstd and > http://facebook.github.io/zstd/ has been in use for a while now. v1.0 was > recently released. Hadoop > (https://issues.apache.org/jira/browse/HADOOP-13578) and others are adopting > it. > We have done some initial trials and seen good results. Zstd seems to give > great results => Gzip level Compression with Lz4 level CPU. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
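For anyone evaluating the codec from the JVM side, a small round trip with the zstd-jni binding (assumed dependency com.github.luben:zstd-jni; the payload and compression level here are arbitrary):

{code:java}
import com.github.luben.zstd.Zstd;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class ZstdRoundTrip {
    public static void main(String[] args) {
        byte[] input = "some repetitive kafka payload kafka payload kafka payload"
                .getBytes(StandardCharsets.UTF_8);

        byte[] compressed = Zstd.compress(input, 3);             // compression level 3
        byte[] restored = Zstd.decompress(compressed, input.length);

        System.out.printf("original=%d bytes, compressed=%d bytes, round trip ok=%b%n",
                input.length, compressed.length, Arrays.equals(input, restored));
    }
}
{code}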
[jira] [Commented] (KAFKA-6589) Extract Heartbeat thread from Abstract Coordinator for testability purposes
[ https://issues.apache.org/jira/browse/KAFKA-6589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16386174#comment-16386174 ] ASF GitHub Bot commented on KAFKA-6589: --- smurakozi opened a new pull request #4647: [WIP] KAFKA-6589 Extract Heartbeat thread from AbstractCoordinator URL: https://github.com/apache/kafka/pull/4647 AbstractCoordinator interacts with the thread via a HeartbeatThreadManager. The existing behavior was extracted to an abstract class + some glue code in the AbstractCoordinator. Testing was done using the existing ConsumerCoordinatorTest and WorkerCoordinatorTest. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Extract Heartbeat thread from Abstract Coordinator for testability purposes > --- > > Key: KAFKA-6589 > URL: https://issues.apache.org/jira/browse/KAFKA-6589 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Guozhang Wang >Assignee: Sandor Murakozi >Priority: Minor > > Today we do not have an easy way to instrument the heartbeat thread in our > unit test, e.g. to inject a GC or make it crash etc since it is a private > class inside AbstractCoordinator. > It is better to extract this class out and also enabling AbstractCoordinator > to take a heartbeat-thread interface that can be mocked for test utils. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
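A compact Java sketch of the testability idea behind the PR: the coordinator depends on a small heartbeat-manager interface rather than a private inner thread class, so unit tests can inject a scripted or failing implementation. The interface and method names below are illustrative only and do not mirror the actual AbstractCoordinator API.

{code:java}
public class HeartbeatManagerSketch {

    // What the coordinator needs from the heartbeat machinery.
    interface HeartbeatThreadManager {
        void start();
        void enable();   // resume heartbeating, e.g. after a rebalance completes
        void disable();  // pause heartbeating, e.g. while not part of a group
        void close();
    }

    static class Coordinator {
        private final HeartbeatThreadManager heartbeat;

        Coordinator(HeartbeatThreadManager heartbeat) {
            this.heartbeat = heartbeat;
        }

        void onJoinComplete() {
            heartbeat.enable();
        }

        void onLeaveGroup() {
            heartbeat.disable();
        }
    }

    public static void main(String[] args) {
        // A unit test can pass a no-op or scripted implementation to simulate
        // pauses, GC, or crashes without touching a real thread.
        Coordinator coordinator = new Coordinator(new HeartbeatThreadManager() {
            public void start() { }
            public void enable() { System.out.println("heartbeats enabled"); }
            public void disable() { System.out.println("heartbeats disabled"); }
            public void close() { }
        });
        coordinator.onJoinComplete();
        coordinator.onLeaveGroup();
    }
}
{code}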
[jira] [Commented] (KAFKA-2610) Metrics for SSL handshake
[ https://issues.apache.org/jira/browse/KAFKA-2610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16386115#comment-16386115 ] Andy Feller commented on KAFKA-2610: Will bump thread as this is beyond nice to have as I was involved in a 3 week endeavor with hundreds of consumers and producers whose SSL certificates expired in waves caused a Kafka cluster to effectively fall over and there was no way from Kafka side to easily identify issue and run down offenders. It eventually required enabling debug level logging for the following classes to determine the issue with the following stack trace: System information: * Apache Kafka 2.10-0.10.0.1 * Ubuntu 14 LTS Log4j changes: * org.apache.kafka.common.network.SslTransportLayer * org.apache.kafka.common.network.Selector Kafka debug stack trace to identify expired certificates: {noformat} [2018-02-12 15:21:29,675] DEBUG Connection with ###.##.##.##/###.##.##.## disconnected (org.apache.kafka.common.network.Selector) javax.net.ssl.SSLHandshakeException: General SSLEngine problem at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1529) at sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535) at sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1214) at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1186) at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469) at org.apache.kafka.common.network.SslTransportLayer.handshakeWrap(SslTransportLayer.java:378) at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:243) at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:62) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:318) at org.apache.kafka.common.network.Selector.poll(Selector.java:283) at kafka.network.Processor.poll(SocketServer.scala:472) at kafka.network.Processor.run(SocketServer.scala:412) at java.lang.Thread.run(Thread.java:748) Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem at sun.security.ssl.Alerts.getSSLException(Alerts.java:192) at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1728) at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:330) at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:322) at sun.security.ssl.ServerHandshaker.clientCertificate(ServerHandshaker.java:1979) at sun.security.ssl.ServerHandshaker.processMessage(ServerHandshaker.java:237) at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1052) at sun.security.ssl.Handshaker$1.run(Handshaker.java:992) at sun.security.ssl.Handshaker$1.run(Handshaker.java:989) at java.security.AccessController.doPrivileged(Native Method) at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1467) at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:336) at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:414) at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:270) ... 
6 more Caused by: sun.security.validator.ValidatorException: PKIX path validation failed: java.security.cert.CertPathValidatorException: validity check failed at sun.security.validator.PKIXValidator.doValidate(PKIXValidator.java:362) at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:270) at sun.security.validator.Validator.validate(Validator.java:260) at sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:324) at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:279) at sun.security.ssl.X509TrustManagerImpl.checkClientTrusted(X509TrustManagerImpl.java:130) at sun.security.ssl.ServerHandshaker.clientCertificate(ServerHandshaker.java:1966) ... 15 more Caused by: java.security.cert.CertPathValidatorException: validity check failed at sun.security.provider.certpath.PKIXMasterCertPathValidator.validate(PKIXMasterCertPathValidator.java:135) at sun.security.provider.certpath.PKIXCertPathValidator.validate(PKIXCertPathValidator.java:223) at sun.security.provider.certpath.PKIXCertPathValidator.validate(PKIXCertPathValidator.java:140) at sun.security.provider.certpath.PKIXCertPathValidator.engineValidate(PKIXCertPathValidator.java:79) at java.security.cert.CertPathValidator.validate(CertPathValidator.java:292) at sun.security.validator.PKIXValidator.doValidate(PKIXValidator.java:357) ... 21 more Caused by: java.security.cert.CertificateExpiredException: NotAfter: Sun Jan 28 06:08:00 EST 2018 at sun.security.x509.Certifi
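In the absence of a handshake-failure metric, a quick offline check with standard JDK APIs can at least flag expired certificates in a keystore or truststore before they take a cluster down. The keystore path and password below are placeholders.

{code:java}
import java.io.FileInputStream;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateExpiredException;
import java.security.cert.CertificateNotYetValidException;
import java.security.cert.X509Certificate;
import java.util.Enumeration;

public class CertExpiryCheck {
    public static void main(String[] args) throws Exception {
        KeyStore keyStore = KeyStore.getInstance("JKS");
        try (FileInputStream in = new FileInputStream("/etc/kafka/ssl/kafka.keystore.jks")) {
            keyStore.load(in, "changeit".toCharArray());
        }

        Enumeration<String> aliases = keyStore.aliases();
        while (aliases.hasMoreElements()) {
            String alias = aliases.nextElement();
            Certificate cert = keyStore.getCertificate(alias);
            if (cert instanceof X509Certificate) {
                X509Certificate x509 = (X509Certificate) cert;
                try {
                    x509.checkValidity(); // throws if expired or not yet valid
                    System.out.printf("%s OK, expires %s%n", alias, x509.getNotAfter());
                } catch (CertificateExpiredException | CertificateNotYetValidException e) {
                    System.out.printf("%s INVALID: %s%n", alias, e.getMessage());
                }
            }
        }
    }
}
{code}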
[jira] [Commented] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed
[ https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16386016#comment-16386016 ] Simon S commented on KAFKA-6188: Are you sure that this is Windows-specific? I ran into this exact issue running Kafka 1.0.0 on CentOS Core 7.3.1611. > Broker fails with FATAL Shutdown - log dirs have failed > --- > > Key: KAFKA-6188 > URL: https://issues.apache.org/jira/browse/KAFKA-6188 > Project: Kafka > Issue Type: Bug > Components: clients, log >Affects Versions: 1.0.0 > Environment: Windows 10 >Reporter: Valentina Baljak >Priority: Blocker > Labels: windows > > Just started with version 1.0.0 after a 4-5 months of using 0.10.2.1. The > test environment is very simple, with only one producer and one consumer. > Initially, everything started fine, stand alone tests worked as expected. > However, running my code, Kafka clients fail after approximately 10 minutes. > Kafka won't start after that and it fails with the same error. > Deleting logs helps to start again, and the same problem occurs. > Here is the error traceback: > [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 > ms. (kafka.log.LogManager) > [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of > 9223372036854775807 ms. (kafka.log.LogManager) > [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. > (kafka.network.Acceptor) > [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor > threads (kafka.network.SocketServer) > [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting > (kafka.server.ReplicaManager$LogDirFailureHandler) > [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving > replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs > (kafka.server.ReplicaManager) > [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Partitions are > offline due to failure on log directory C:\Kafka\kafka_2.12-1.0.0\kafka-logs > (kafka.server.ReplicaManager) > [2017-11-08 08:21:57,860] INFO [ReplicaFetcherManager on broker 0] Removed > fetcher for partitions (kafka.server.ReplicaFetcherManager) > [2017-11-08 08:21:57,892] INFO [ReplicaManager broker=0] Broker 0 stopped > fetcher for partitions because they are in the failed log dir > C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.server.ReplicaManager) > [2017-11-08 08:21:57,892] INFO Stopping serving logs in dir > C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.log.LogManager) > [2017-11-08 08:21:57,892] FATAL Shutdown broker because all log dirs in > C:\Kafka\kafka_2.12-1.0.0\kafka-logs have failed (kafka.log.LogManager) -- This message was sent by Atlassian JIRA (v7.6.3#76005)