[jira] [Created] (KAFKA-15817) Avoid reconnecting to the same IP address if multiple addresses are available
Bob Barrett created KAFKA-15817: --- Summary: Avoid reconnecting to the same IP address if multiple addresses are available Key: KAFKA-15817 URL: https://issues.apache.org/jira/browse/KAFKA-15817 Project: Kafka Issue Type: Bug Affects Versions: 3.5.1, 3.6.0, 3.4.1, 3.3.2 Reporter: Bob Barrett In https://issues.apache.org/jira/browse/KAFKA-12193, we changed the DNS resolution behavior for clients to re-resolve DNS after disconnecting from a broker, rather than wait until we iterated over all addresses from a given resolution. This is useful when the IP addresses have changed between the connection and disconnection. However, with the behavior change, this does mean that clients could potentially reconnect immediately to the same IP they just disconnected from, if the IPs have not changed. In cases where the disconnection happened because that IP was unhealthy (such as a case where a load balancer has instances in multiple availability zones and one zone is unhealthy, or a case where an intermediate component in the network path is going through a rolling restart), this will delay the client successfully reconnecting. To address this, clients should remember the IP they just disconnected from and skip that IP when reconnecting, as long as the address resolved to multiple addresses. -- This message was sent by Atlassian Jira (v8.20.10#820010)
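The proposal above amounts to a small filter over the freshly resolved address list. What follows is only a minimal sketch of that idea, not the actual NetworkClient change: `ReconnectAddressFilter` and `candidates` are hypothetical names, and addresses are modelled generically so the example runs without a DNS lookup.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of the Kafka client API.
final class ReconnectAddressFilter {
    private ReconnectAddressFilter() { }

    /**
     * Returns the addresses to try on reconnect. The address we just
     * disconnected from is skipped only if at least one other address remains.
     */
    static <T> List<T> candidates(List<T> resolved, T lastDisconnected) {
        if (lastDisconnected == null || resolved.size() < 2) {
            // Nothing to skip, or skipping would leave no address to try.
            return new ArrayList<>(resolved);
        }
        List<T> filtered = new ArrayList<>();
        for (T address : resolved) {
            if (!address.equals(lastDisconnected)) {
                filtered.add(address);
            }
        }
        if (filtered.isEmpty()) {
            // Every resolved entry was the same address; fall back to the full list.
            return new ArrayList<>(resolved);
        }
        return filtered;
    }
}
```

With a single resolved address the filter is a no-op, which matches the "as long as the address resolved to multiple addresses" condition in the description.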
[jira] [Created] (KAFKA-12193) Re-resolve IPs when a client is disconnected
Bob Barrett created KAFKA-12193: --- Summary: Re-resolve IPs when a client is disconnected Key: KAFKA-12193 URL: https://issues.apache.org/jira/browse/KAFKA-12193 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 2.7.0 Reporter: Bob Barrett If `client.dns.lookup` is set to `use_all_dns_ips` or `resolve_canonical_bootstrap_servers_only`, the NetworkClient can store multiple IPs for each node, and currently it tries each IP in the list when connecting before re-resolving the IPs. This is useful when first establishing a connection because it ensures that the client exhausts all possible IPs. However, in the case where the IPs changed after a connection was already established, this would cause a reconnecting client to try several invalid IPs before re-resolving and trying a valid one. Instead, we should re-resolve DNS when a client disconnects from an established connection, rather than assuming that all previously resolved IPs are still valid.
[jira] [Created] (KAFKA-10860) JmxTool fails with NPE when object-name contains a wildcard
Bob Barrett created KAFKA-10860: --- Summary: JmxTool fails with NPE when object-name contains a wildcard Key: KAFKA-10860 URL: https://issues.apache.org/jira/browse/KAFKA-10860 Project: Kafka Issue Type: Bug Reporter: Bob Barrett

When running JmxTool with a wildcard in the object name, the tool fails with a NullPointerException:

{code:java}
bin/kafka-run-class kafka.tools.JmxTool --jmx-url service:jmx:rmi:///jndi/rmi://localhost:/jmxrmi --object-name kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=*
Trying to connect to JMX url: service:jmx:rmi:///jndi/rmi://localhost:/jmxrmi.
Exception in thread "main" java.lang.NullPointerException
    at kafka.tools.JmxTool$.main(JmxTool.scala:194)
    at kafka.tools.JmxTool.main(JmxTool.scala)
{code}

It seems that we never populate the `names` variable when the object name includes a pattern:

{code:java}
var names: Iterable[ObjectName] = null
def namesSet = Option(names).toSet.flatten
def foundAllObjects = queries.toSet == namesSet
val waitTimeoutMs = 1
if (!hasPatternQueries) {
  val start = System.currentTimeMillis
  do {
    if (names != null) {
      System.err.println("Could not find all object names, retrying")
      Thread.sleep(100)
    }
    names = queries.flatMap((name: ObjectName) => mbsc.queryNames(name, null).asScala)
  } while (wait && System.currentTimeMillis - start < waitTimeoutMs && !foundAllObjects)
}

if (wait && !foundAllObjects) {
  val missing = (queries.toSet - namesSet).mkString(", ")
  System.err.println(s"Could not find all requested object names after $waitTimeoutMs ms. Missing $missing")
  System.err.println("Exiting.")
  sys.exit(1)
}

val numExpectedAttributes: Map[ObjectName, Int] =
  if (!attributesWhitelistExists)
    names.map{name: ObjectName =>
      val mbean = mbsc.getMBeanInfo(name)
      (name, mbsc.getAttributes(name, mbean.getAttributes.map(_.getName)).size)}.toMap
  else {
    if (!hasPatternQueries)
      names.map{name: ObjectName =>
        val mbean = mbsc.getMBeanInfo(name)
        val attributes = mbsc.getAttributes(name, mbean.getAttributes.map(_.getName))
        val expectedAttributes = attributes.asScala.asInstanceOf[mutable.Buffer[Attribute]]
          .filter(attr => attributesWhitelist.get.contains(attr.getName))
        (name, expectedAttributes.size)}.toMap.filter(_._2 > 0)
    else
      queries.map((_, attributesWhitelist.get.length)).toMap
  }
{code}

We need to add logic to query the object names that match the pattern when a pattern is part of the input.
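One way to populate the names for pattern queries is to pass each query, literal or pattern, through `MBeanServerConnection.queryNames`, which accepts both. The snippet below is a rough Java sketch of that idea rather than the JmxTool fix itself; `PatternNameResolver`, `resolve` and `resolveLocal` are hypothetical names, and `resolveLocal` queries this JVM's own platform MBean server purely so the example can run without a broker.

```java
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper for illustration; not taken from JmxTool.
final class PatternNameResolver {
    private PatternNameResolver() { }

    /** Expands every query (literal name or pattern) into the concrete names registered on the server. */
    static Set<ObjectName> resolve(MBeanServerConnection connection, List<ObjectName> queries)
            throws IOException {
        Set<ObjectName> names = new LinkedHashSet<>();
        for (ObjectName query : queries) {
            // A null QueryExp applies no additional filter beyond the name or pattern.
            names.addAll(connection.queryNames(query, null));
        }
        return names;
    }

    /** Convenience wrapper that runs the queries against the local platform MBean server. */
    static Set<ObjectName> resolveLocal(String... patterns) {
        try {
            List<ObjectName> queries = new ArrayList<>();
            for (String pattern : patterns) {
                queries.add(new ObjectName(pattern));
            }
            return resolve(ManagementFactory.getPlatformMBeanServer(), queries);
        } catch (MalformedObjectNameException | IOException e) {
            throw new IllegalArgumentException("Could not resolve object names", e);
        }
    }
}
```

A caller can still use `ObjectName.isPattern()` to decide whether to wait for an exact match, since a pattern has no fixed set of expected names.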
[jira] [Resolved] (KAFKA-10282) Log metrics are removed if a log is deleted and re-created quickly enough
[ https://issues.apache.org/jira/browse/KAFKA-10282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bob Barrett resolved KAFKA-10282.
Resolution: Fixed

> Log metrics are removed if a log is deleted and re-created quickly enough
>
> Key: KAFKA-10282
> URL: https://issues.apache.org/jira/browse/KAFKA-10282
> Project: Kafka
> Issue Type: Bug
> Components: log
> Affects Versions: 2.6.0
> Reporter: Bob Barrett
> Assignee: Bob Barrett
> Priority: Major
> Fix For: 2.7.0, 2.6.1
>
> When we delete a local log, we mark it for asynchronous deletion by renaming
> it with a `.delete` extension, and then wait `LogConfig.FileDeleteDelayMs`
> milliseconds before actually deleting the files on disk. We don't remove the
> Log metrics from the metrics registry until the actual deletion takes place.
> If we recreate a log of the same topic partition (for example, if we reassign
> the partition away from the broker and quickly reassign it back), the metrics
> are registered when the new log is created, but then unregistered when the
> async deletion of the original log takes place. This leaves us with a
> partition that is not reporting any Log metrics (size, offsets, number of
> segments, etc).
> To fix this, the LogManager should check when creating new logs to see if a
> log for the same topic partition is marked for deletion, and if so, signal to
> that log not to unregister its metrics when it is deleted.
[jira] [Created] (KAFKA-10320) Log metrics for future logs never have the is-future tag removed
Bob Barrett created KAFKA-10320: --- Summary: Log metrics for future logs never have the is-future tag removed Key: KAFKA-10320 URL: https://issues.apache.org/jira/browse/KAFKA-10320 Project: Kafka Issue Type: Bug Reporter: Bob Barrett -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10282) Log metrics are removed if a log is deleted and re-created quickly enough
Bob Barrett created KAFKA-10282: --- Summary: Log metrics are removed if a log is deleted and re-created quickly enough Key: KAFKA-10282 URL: https://issues.apache.org/jira/browse/KAFKA-10282 Project: Kafka Issue Type: Bug Components: log Affects Versions: 2.6.0 Reporter: Bob Barrett Assignee: Bob Barrett Fix For: 2.7.0, 2.6.1 When we delete a local log, we mark it for asynchronous deletion by renaming it with a `.delete` extension, and then wait `LogConfig.FileDeleteDelayMs` milliseconds before actually deleting the files on disk. We don't remove the Log metrics from the metrics registry until the actual deletion takes place. If we recreate a log of the same topic partition (for example, if we reassign the partition away from the broker and quickly reassign it back), the metrics are registered when the new log is created, but then unregistered when the async deletion of the original log takes place. This leaves us with a partition that is not reporting any Log metrics (size, offsets, number of segments, etc). To fix this, the LogManager should check when creating new logs to see if a log for the same topic partition is marked for deletion, and if so, signal to that log not to unregister its metrics when it is deleted. -- This message was sent by Atlassian Jira (v8.3.4#803005)
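The proposed fix can be pictured with a toy model of the log manager. This is a sketch under stated assumptions, not the broker code: `LogManagerSketch`, its nested `Log` class and the `unregisterMetricsOnDelete` flag are hypothetical names, and the metrics registry is reduced to a set of partition names.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model for illustration; names and structure are assumptions, not Kafka's LogManager.
final class LogManagerSketch {
    static final class Log {
        final String partition;
        boolean unregisterMetricsOnDelete = true;

        Log(String partition) {
            this.partition = partition;
        }
    }

    private final Set<String> metricsRegistry = new HashSet<>();
    private final Map<String, Log> markedForDeletion = new HashMap<>();

    Log createLog(String partition) {
        Log pending = markedForDeletion.get(partition);
        if (pending != null) {
            // A newer log now owns the metrics, so the old log must leave them registered.
            pending.unregisterMetricsOnDelete = false;
        }
        metricsRegistry.add(partition);
        return new Log(partition);
    }

    void markForAsyncDelete(Log log) {
        markedForDeletion.put(log.partition, log);
    }

    void completeAsyncDelete(Log log) {
        markedForDeletion.remove(log.partition, log);
        if (log.unregisterMetricsOnDelete) {
            metricsRegistry.remove(log.partition);
        }
    }

    boolean hasMetrics(String partition) {
        return metricsRegistry.contains(partition);
    }
}
```

In this model, deleting the old log after the partition has been re-created leaves the metrics in place, while deleting a log that was never replaced still unregisters them.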
[jira] [Created] (KAFKA-10197) Elect preferred leader when completing a partition reassignment
Bob Barrett created KAFKA-10197: --- Summary: Elect preferred leader when completing a partition reassignment Key: KAFKA-10197 URL: https://issues.apache.org/jira/browse/KAFKA-10197 Project: Kafka Issue Type: Improvement Reporter: Bob Barrett

Currently, when completing a partition reassignment, we elect a leader from the new replica assignment without requiring that the leader be the new preferred leader. Instead, we just choose any in-sync replica:

{code:java}
private def leaderForReassign(partition: TopicPartition,
                              leaderAndIsr: LeaderAndIsr,
                              controllerContext: ControllerContext): ElectionResult = {
  val targetReplicas = controllerContext.partitionFullReplicaAssignment(partition).targetReplicaAssignment.replicas
  val liveReplicas = targetReplicas.filter(replica => controllerContext.isReplicaOnline(replica, partition))
  val isr = leaderAndIsr.isr
  val leaderOpt = PartitionLeaderElectionAlgorithms.reassignPartitionLeaderElection(targetReplicas, isr, liveReplicas.toSet)
  val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderAndIsr.newLeader(leader, isUnclean = false))
  ElectionResult(partition, newLeaderAndIsrOpt, targetReplicas)
}
{code}

If auto preferred leader election is enabled, the preferred leader will eventually be elected. However, it would make sense to choose the preferred leader after the reassignment without waiting for the automatic trigger.
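One possible reading of the improvement, sketched in Java with plain collections instead of the controller types: try the preferred leader (by Kafka convention the first replica in the assignment) first, and only fall back to another eligible replica when it is not live and in sync. `PreferredLeaderSketch` and `chooseLeader` are hypothetical names, and the fallback order is an assumption of this sketch rather than something the issue specifies.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

// Illustrative only; the real election lives in the controller and uses different types.
final class PreferredLeaderSketch {
    private PreferredLeaderSketch() { }

    static Optional<Integer> chooseLeader(List<Integer> targetReplicas,
                                          Set<Integer> isr,
                                          Set<Integer> liveReplicas,
                                          int currentLeader) {
        Predicate<Integer> eligible = r -> isr.contains(r) && liveReplicas.contains(r);

        // 1. The preferred leader is the first replica of the target assignment.
        if (!targetReplicas.isEmpty() && eligible.test(targetReplicas.get(0))) {
            return Optional.of(targetReplicas.get(0));
        }
        // 2. Otherwise keep the current leader if it is still part of the assignment.
        if (targetReplicas.contains(currentLeader) && eligible.test(currentLeader)) {
            return Optional.of(currentLeader);
        }
        // 3. Otherwise take the first eligible replica, if any.
        return targetReplicas.stream().filter(eligible).findFirst();
    }
}
```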
[jira] [Created] (KAFKA-10059) KafkaAdminClient returns null OffsetAndMetadata value when there is no committed offset for a partition
Bob Barrett created KAFKA-10059: --- Summary: KafkaAdminClient returns null OffsetAndMetadata value when there is no committed offset for a partition Key: KAFKA-10059 URL: https://issues.apache.org/jira/browse/KAFKA-10059 Project: Kafka Issue Type: Improvement Reporter: Bob Barrett When listing consumer group offsets through the admin client, the map that we return has a null `OffsetAndMetadata` value if the partition has no committed offset. It would be better to return a non-null value that indicates the lack of an offset, such as `OffsetAndMetadata(-1, Optional.empty(), metadata)`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9811) TransactionMetadata state and pendingState are non-volatile and read outside the metadata lock
Bob Barrett created KAFKA-9811: -- Summary: TransactionMetadata state and pendingState are non-volatile and read outside the metadata lock Key: KAFKA-9811 URL: https://issues.apache.org/jira/browse/KAFKA-9811 Project: Kafka Issue Type: Bug Reporter: Bob Barrett

As an example, in TransactionStateManager.timedOutTransactions(), we read the state and pendingState without acquiring the lock for each metadata object:

{code:java}
inReadLock(stateLock) {
  transactionMetadataCache.flatMap { case (_, entry) =>
    entry.metadataPerTransactionalId.filter { case (_, txnMetadata) =>
      if (txnMetadata.pendingTransitionInProgress) {
        false
      } else {
        txnMetadata.state match {
          case Ongoing =>
            txnMetadata.txnStartTimestamp + txnMetadata.txnTimeoutMs < now
          case _ => false
        }
      }
    }.map { case (txnId, txnMetadata) =>
      TransactionalIdAndProducerIdEpoch(txnId, txnMetadata.producerId, txnMetadata.producerEpoch)
    }
  }
}
{code}

The start timestamp is volatile, so it can be safely read, but we also read the pendingState and state, which are not.
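The issue does not prescribe a fix; one possible remedy is to perform the timeout check while holding the per-object lock, so the non-volatile fields are read with proper visibility. The Java sketch below illustrates that option only. `TxnMetadataSketch` and its members are hypothetical and greatly simplified compared with `TransactionMetadata`.

```java
import java.util.concurrent.locks.ReentrantLock;

// Simplified stand-in for illustration; not the real TransactionMetadata class.
final class TxnMetadataSketch {
    enum State { EMPTY, ONGOING, PREPARE_ABORT }

    private final ReentrantLock lock = new ReentrantLock();
    private final long txnTimeoutMs;
    private State state = State.EMPTY;   // guarded by lock
    private State pendingState = null;   // guarded by lock
    private volatile long txnStartTimestamp = -1L;

    TxnMetadataSketch(long txnTimeoutMs) {
        this.txnTimeoutMs = txnTimeoutMs;
    }

    void begin(long now) {
        lock.lock();
        try {
            state = State.ONGOING;
            txnStartTimestamp = now;
        } finally {
            lock.unlock();
        }
    }

    void prepareTransition(State target) {
        lock.lock();
        try {
            pendingState = target;
        } finally {
            lock.unlock();
        }
    }

    /** Reads state and pendingState under the same lock that writers hold. */
    boolean isTimedOut(long now) {
        lock.lock();
        try {
            return pendingState == null
                && state == State.ONGOING
                && txnStartTimestamp + txnTimeoutMs < now;
        } finally {
            lock.unlock();
        }
    }
}
```

Declaring the two fields `volatile` would be the other obvious option; which one fits better depends on whether the check needs a consistent view of both fields at once.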
[jira] [Created] (KAFKA-9788) Sensor name collision for group and transaction coordinator load metrics
Bob Barrett created KAFKA-9788: -- Summary: Sensor name collision for group and transaction coordinator load metrics Key: KAFKA-9788 URL: https://issues.apache.org/jira/browse/KAFKA-9788 Project: Kafka Issue Type: Bug Reporter: Bob Barrett Assignee: Bob Barrett Both the group coordinator and the transaction coordinator create a Sensor object on startup to track the time it takes to load partitions, and both name the Sensor "PartitionLoadTime": [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala#L98] [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L92] However, Sensor names are meant to be unique. This name collision means that there is actually only one underlying "PartitionLoadTime" Sensor per broker, which is marked for each partition loaded by either coordinator. As a result, the metrics for group and transaction partition loading are identical, and both are based on the combination of the two data sets. These should be renamed to allow distinguishing between the two coordinator types.
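The effect of the collision can be reproduced with a toy get-or-create registry. This is a stand-in written for illustration, not the Kafka `Metrics` class; `SensorRegistrySketch` is a hypothetical name, and the two distinct sensor names in the usage note are examples rather than the names chosen in the fix.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy registry with get-or-create semantics keyed by sensor name.
final class SensorRegistrySketch {
    static final class Sensor {
        final String name;
        final List<Long> samples = new ArrayList<>();

        Sensor(String name) {
            this.name = name;
        }

        void record(long value) {
            samples.add(value);
        }
    }

    private final Map<String, Sensor> sensors = new HashMap<>();

    /** Returns the existing sensor for this name, creating it only on first use. */
    Sensor sensor(String name) {
        return sensors.computeIfAbsent(name, Sensor::new);
    }
}
```

Two callers that both ask for "PartitionLoadTime" receive the same object, so their samples are mixed; asking for, say, "GroupPartitionLoadTime" and "TransactionPartitionLoadTime" yields two independent sensors.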
[jira] [Created] (KAFKA-9750) Flaky test kafka.server.ReplicaManagerTest.testFencedErrorCausedByBecomeLeader
Bob Barrett created KAFKA-9750: -- Summary: Flaky test kafka.server.ReplicaManagerTest.testFencedErrorCausedByBecomeLeader Key: KAFKA-9750 URL: https://issues.apache.org/jira/browse/KAFKA-9750 Project: Kafka Issue Type: Bug Components: core Reporter: Bob Barrett

When running tests locally, I've seen that 1-2% of the time, testFencedErrorCausedByBecomeLeader fails with

{code:java}
org.scalatest.exceptions.TestFailedException: the partition=test-topic-0 should be removed from pending state
    at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
    at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
    at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
    at org.scalatest.Assertions.fail(Assertions.scala:1091)
    at org.scalatest.Assertions.fail$(Assertions.scala:1087)
    at org.scalatest.Assertions$.fail(Assertions.scala:1389)
    at kafka.server.ReplicaManagerTest.testFencedErrorCausedByBecomeLeader(ReplicaManagerTest.scala:248)
    at jdk.internal.reflect.GeneratedMethodAccessor25.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:40)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
{code}
[jira] [Created] (KAFKA-9749) TransactionMarkerRequestCompletionHandler should treat storage exceptions as retriable
Bob Barrett created KAFKA-9749: -- Summary: TransactionMarkerRequestCompletionHandler should treat storage exceptions as retriable Key: KAFKA-9749 URL: https://issues.apache.org/jira/browse/KAFKA-9749 Project: Kafka Issue Type: Bug Reporter: Bob Barrett If `TransactionMarkerRequestCompletionHandler` handles a `KafkaStorageException`, it throws an IllegalStateException rather than retrying. This leaves the corresponding transactional ID in state PendingAbort, where it gets stuck, because any further EndTxn or InitProducerId call will fail with a CONCURRENT_TRANSACTIONS error. We should retry these errors when writing transaction markers. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9276) Unclear warning due to empty throttled fetch response
Bob Barrett created KAFKA-9276: -- Summary: Unclear warning due to empty throttled fetch response Key: KAFKA-9276 URL: https://issues.apache.org/jira/browse/KAFKA-9276 Project: Kafka Issue Type: Bug Components: core Affects Versions: 2.3.0 Reporter: Bob Barrett With a version 1.0.0 consumer talking to a 2.3.0 broker, the following WARN-level statement is logged when a request is throttled: {code:java} Ignoring fetch response containing partitions [] since it does not match the requested partitions [topic-29, topic-25, topic-21, topic-17, topic-33]{code} It appears that the 1.0.0 consumer expects fetch data for all partitions, but the throttled response is empty, causing an unclear warning message. This may be a regression from KIP-219, which changed the throttling behavior to return the empty fetch response. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9105) Truncate producer state when incrementing log start offset
Bob Barrett created KAFKA-9105: -- Summary: Truncate producer state when incrementing log start offset Key: KAFKA-9105 URL: https://issues.apache.org/jira/browse/KAFKA-9105 Project: Kafka Issue Type: Bug Affects Versions: 2.4.0 Reporter: Bob Barrett Assignee: Bob Barrett In github.com/apache/kafka/commit/c49775b, we removed the ProducerStateManager.truncateHead method as part of the change to retain producer state for longer. This removed some needed producer state management (such as removing unreplicated transactions) when incrementing the log start offset. We need to add this functionality back in. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9038) Allow creating partitions while partition reassignment is in progress
Bob Barrett created KAFKA-9038: -- Summary: Allow creating partitions while partition reassignment is in progress Key: KAFKA-9038 URL: https://issues.apache.org/jira/browse/KAFKA-9038 Project: Kafka Issue Type: Improvement Reporter: Bob Barrett If a user attempts to create partitions for a topic while a partition reassignment is in progress, we block the request even if the topic for which partitions are being created is not involved in the reassignment. This is an unnecessarily strict requirement; we should allow partition creation for topics that are not involved in reassignments. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-8805) Bump producer epoch following recoverable errors
Bob Barrett created KAFKA-8805: -- Summary: Bump producer epoch following recoverable errors Key: KAFKA-8805 URL: https://issues.apache.org/jira/browse/KAFKA-8805 Project: Kafka Issue Type: Improvement Components: producer Affects Versions: 2.3.0 Reporter: Bob Barrett Assignee: Bob Barrett As part of KIP-360, the producer needs to call the new InitProducerId API after receiving UNKNOWN_PRODUCER_ID and INVALID_PRODUCER_MAPPING errors, which will allow the producers to bump their epoch and continue processing unless a new producer has already initialized a new producer ID. The broker change that this depends on is https://issues.apache.org/jira/browse/KAFKA-8710. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (KAFKA-8710) InitProducerId changes for KIP-360
Bob Barrett created KAFKA-8710: -- Summary: InitProducerId changes for KIP-360 Key: KAFKA-8710 URL: https://issues.apache.org/jira/browse/KAFKA-8710 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 2.3.0 Reporter: Bob Barrett Assignee: Bob Barrett As part of KIP-360, InitProducerId needs to accept two additional parameters, the current producerId and the current producerEpoch, and it needs to allow producers to safely re-initialize a producer ID and continue processing as long as no other producer with the same transactional ID has started up. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (KAFKA-8614) Rename the `responses` field of IncrementalAlterConfigsResponse to match AlterConfigs
Bob Barrett created KAFKA-8614: -- Summary: Rename the `responses` field of IncrementalAlterConfigsResponse to match AlterConfigs Key: KAFKA-8614 URL: https://issues.apache.org/jira/browse/KAFKA-8614 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 2.3.0 Reporter: Bob Barrett

IncrementalAlterConfigsResponse and AlterConfigsResponse have an identical structure for per-resource error codes, but in AlterConfigsResponse it is named `Resources` while in IncrementalAlterConfigsResponse it is named `responses`.

AlterConfigsResponse:

{code}
{ "name": "Resources", "type": "[]AlterConfigsResourceResponse", "versions": "0+",
  "about": "The responses for each resource.", "fields": [
  { "name": "ErrorCode", "type": "int16", "versions": "0+",
    "about": "The resource error code." },
  { "name": "ErrorMessage", "type": "string", "nullableVersions": "0+", "versions": "0+",
    "about": "The resource error message, or null if there was no error." },
  { "name": "ResourceType", "type": "int8", "versions": "0+",
    "about": "The resource type." },
  { "name": "ResourceName", "type": "string", "versions": "0+",
    "about": "The resource name." }
{code}

IncrementalAlterConfigsResponse:

{code}
{ "name": "responses", "type": "[]AlterConfigsResourceResult", "versions": "0+",
  "about": "The responses for each resource.", "fields": [
  { "name": "ErrorCode", "type": "int16", "versions": "0+",
    "about": "The resource error code." },
  { "name": "ErrorMessage", "type": "string", "nullableVersions": "0+", "versions": "0+",
    "about": "The resource error message, or null if there was no error." },
  { "name": "ResourceType", "type": "int8", "versions": "0+",
    "about": "The resource type." },
  { "name": "ResourceName", "type": "string", "versions": "0+",
    "about": "The resource name." }
]}
{code}

We should change the field in IncrementalAlterConfigsResponse to be consistent with AlterConfigsResponse.
[jira] [Created] (KAFKA-7607) NetworkClientUtils.sendAndReceive can take a long time to return during shutdown
Bob Barrett created KAFKA-7607: -- Summary: NetworkClientUtils.sendAndReceive can take a long time to return during shutdown Key: KAFKA-7607 URL: https://issues.apache.org/jira/browse/KAFKA-7607 Project: Kafka Issue Type: Bug Affects Versions: 2.0.0 Reporter: Bob Barrett If a RequestSendThread is shut down while waiting on the underlying Selector to return from a select() call, the Selector will swallow the interrupt, wake up and return immediately. NetworkClientUtils.sendAndReceive will then potentially re-poll the client, forcing the thread shutdown to wait for the request to complete. We should check the thread interrupt status before re-polling the client to prevent this delay. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
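The suggested check can be sketched as a polling loop that looks at the thread's interrupt flag before each poll. This is a simplified illustration, not `NetworkClientUtils.sendAndReceive`: `InterruptAwarePollSketch`, `Result` and `pollUntilComplete` are hypothetical names, and a bounded number of polls stands in for the real timeout handling.

```java
import java.util.function.BooleanSupplier;

// Illustrative polling loop; the real client API and timeout handling are not modelled here.
final class InterruptAwarePollSketch {
    enum Result { COMPLETED, INTERRUPTED, TIMED_OUT }

    private InterruptAwarePollSketch() { }

    /**
     * Calls pollOnce until it reports completion, giving up after maxPolls attempts
     * or as soon as the current thread is found to be interrupted.
     */
    static Result pollUntilComplete(BooleanSupplier pollOnce, int maxPolls) {
        for (int i = 0; i < maxPolls; i++) {
            // isInterrupted() does not clear the flag, so the caller can still observe it.
            if (Thread.currentThread().isInterrupted()) {
                return Result.INTERRUPTED;
            }
            if (pollOnce.getAsBoolean()) {
                return Result.COMPLETED;
            }
        }
        return Result.TIMED_OUT;
    }
}
```

Because the flag is left set, a shutdown routine further up the stack can still react to the interrupt after the loop returns.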
[jira] [Created] (KAFKA-7401) Broker fails to start when recovering a segment from before the log start offset
Bob Barrett created KAFKA-7401: -- Summary: Broker fails to start when recovering a segment from before the log start offset Key: KAFKA-7401 URL: https://issues.apache.org/jira/browse/KAFKA-7401 Project: Kafka Issue Type: Bug Components: log Affects Versions: 2.0.0, 1.1.1, 1.1.0 Reporter: Bob Barrett

If a segment needs to be recovered (for example, because of a missing index file or uncompleted swap operation) and its base offset is less than the log start offset, the broker will crash with the following error:

{code}
Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.IllegalArgumentException: inconsistent range
    at java.util.concurrent.ConcurrentSkipListMap$SubMap.<init>(Unknown Source)
    at java.util.concurrent.ConcurrentSkipListMap.subMap(Unknown Source)
    at java.util.concurrent.ConcurrentSkipListMap.subMap(Unknown Source)
    at kafka.log.Log$$anonfun$12.apply(Log.scala:1579)
    at kafka.log.Log$$anonfun$12.apply(Log.scala:1578)
    at scala.Option.map(Option.scala:146)
    at kafka.log.Log.logSegments(Log.scala:1578)
    at kafka.log.Log.kafka$log$Log$$recoverSegment(Log.scala:358)
    at kafka.log.Log$$anonfun$completeSwapOperations$1.apply(Log.scala:389)
    at kafka.log.Log$$anonfun$completeSwapOperations$1.apply(Log.scala:380)
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
    at kafka.log.Log.completeSwapOperations(Log.scala:380)
    at kafka.log.Log.loadSegments(Log.scala:408)
    at kafka.log.Log.<init>(Log.scala:216)
    at kafka.log.Log$.apply(Log.scala:1765)
    at kafka.log.LogManager.kafka$log$LogManager$$loadLog(LogManager.scala:260)
    at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$11$$anonfun$apply$15$$anonfun$apply$2.apply$mcV$sp(LogManager.scala:340)
    at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
{code}

Since these segments are outside the log range, we should delete them, or at least not block broker startup because of them.
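The "inconsistent range" message in the trace comes from `ConcurrentSkipListMap.subMap`, which rejects a range whose lower bound is greater than its upper bound. The sketch below shows a guard around that call; `SegmentRangeSketch` and `segmentsInRange` are hypothetical names, the map of base offsets to segment labels is a stand-in for the real segment map, and which bounds `Log.logSegments` actually passes is not shown in the report.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Illustration of guarding a range lookup; not the actual Log.logSegments implementation.
final class SegmentRangeSketch {
    private SegmentRangeSketch() { }

    /** Returns the segments with base offset in [from, to), or an empty map for an inverted range. */
    static ConcurrentNavigableMap<Long, String> segmentsInRange(
            ConcurrentSkipListMap<Long, String> segments, long from, long to) {
        if (from > to) {
            // subMap would throw IllegalArgumentException("inconsistent range") here.
            return new ConcurrentSkipListMap<>();
        }
        return segments.subMap(from, true, to, false);
    }
}
```

Returning an empty view for an inverted range is only one option; the description also suggests deleting such segments outright.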
[jira] [Created] (KAFKA-7400) Compacted topic segments that precede the log start offset are not cleaned up
Bob Barrett created KAFKA-7400: -- Summary: Compacted topic segments that precede the log start offset are not cleaned up Key: KAFKA-7400 URL: https://issues.apache.org/jira/browse/KAFKA-7400 Project: Kafka Issue Type: Bug Components: log Affects Versions: 2.0.0, 1.1.1, 1.1.0 Reporter: Bob Barrett LogManager.cleanupLogs currently checks if a topic is compacted, and skips any deletion if it is. This means that if the log start offset increases, log segments that precede the start offset will never be deleted. The log cleanup logic should be improved to delete these segments even for compacted topics. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
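As a rough sketch of the improved cleanup, a segment can be treated as deletable once the next segment's base offset is at or below the log start offset, regardless of the topic's cleanup policy. `LogStartOffsetCleanupSketch` and `deletableBaseOffsets` are hypothetical names, segments are represented only by their base offsets, and never deleting the last segment is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;

// Illustrative selection logic; not the LogManager.cleanupLogs implementation.
final class LogStartOffsetCleanupSketch {
    private LogStartOffsetCleanupSketch() { }

    /** Returns base offsets of segments that lie entirely before logStartOffset. */
    static List<Long> deletableBaseOffsets(NavigableSet<Long> baseOffsets, long logStartOffset) {
        List<Long> deletable = new ArrayList<>();
        for (Long base : baseOffsets) {
            Long next = baseOffsets.higher(base);
            // A segment ends where the next one begins; the last segment has no successor and is kept.
            if (next != null && next <= logStartOffset) {
                deletable.add(base);
            }
        }
        return deletable;
    }
}
```

For base offsets 0, 100 and 200 with a log start offset of 150, only the segment starting at 0 qualifies, because the segment starting at 100 still contains offsets at or above 150.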