[GitHub] [kafka] lihaosky commented on pull request #11829: [2/N][emit final] add processor metadata to be committed with offset
lihaosky commented on pull request #11829: URL: https://github.com/apache/kafka/pull/11829#issuecomment-1084100040 Thanks @mjsax ! Created Jira: https://issues.apache.org/jira/browse/KAFKA-13785 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13785) Support emit final result for windowed aggregation
Hao Li created KAFKA-13785: -- Summary: Support emit final result for windowed aggregation Key: KAFKA-13785 URL: https://issues.apache.org/jira/browse/KAFKA-13785 Project: Kafka Issue Type: Improvement Reporter: Hao Li For KIP-825: https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Assigned] (KAFKA-13785) Support emit final result for windowed aggregation
[ https://issues.apache.org/jira/browse/KAFKA-13785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li reassigned KAFKA-13785: -- Assignee: Hao Li > Support emit final result for windowed aggregation > -- > > Key: KAFKA-13785 > URL: https://issues.apache.org/jira/browse/KAFKA-13785 > Project: Kafka > Issue Type: Improvement >Reporter: Hao Li >Assignee: Hao Li >Priority: Major > > For KIP-825: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced
[GitHub] [kafka] yufeiyan1220 commented on a change in pull request #11953: KAFKA-13772: Partitions are not correctly re-partitioned when the fetcher thread pool is resized
yufeiyan1220 commented on a change in pull request #11953: URL: https://github.com/apache/kafka/pull/11953#discussion_r839195366 ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -743,6 +743,18 @@ abstract class AbstractFetcherThread(name: String, } finally partitionMapLock.unlock() } + def removeAllPartitions(): Map[TopicPartition, PartitionFetchState] = { +partitionMapLock.lockInterruptibly() Review comment: Getting all partitions and then using the result to remove them releases the lock between the two calls, which might lead to inconsistency when this fetcher adds a partition right between the two calls made by `AbstractFetcherManager`. So I combined the two steps into one new method. Another point is that we don't need to filter or fetch the state one by one; we just iterate and return a copy of the whole `partitionStates.partitionStateMap`. I think this is the more efficient implementation.
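The point about doing the read and the removal under one lock acquisition can be sketched as follows. This is a minimal illustration, not Kafka's code: `PartitionRegistry`, its `String` keys, and its `Long` values are hypothetical stand-ins for the fetcher thread's partition state, and the sketch uses `lock()` where the PR uses `lockInterruptibly()` only to keep checked exceptions out of the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for a fetcher thread's partition bookkeeping (not Kafka's class).
final class PartitionRegistry {
    private final ReentrantLock lock = new ReentrantLock();
    private final Map<String, Long> states = new LinkedHashMap<>();

    // Registers a partition with its fetch offset.
    void add(String partition, long fetchOffset) {
        lock.lock();
        try {
            states.put(partition, fetchOffset);
        } finally {
            lock.unlock();
        }
    }

    // Copies and clears the state while holding the lock once, so no add()
    // can slip in between "read everything" and "remove everything".
    Map<String, Long> removeAll() {
        lock.lock();
        try {
            Map<String, Long> snapshot = new LinkedHashMap<>(states);
            states.clear();
            return snapshot;
        } finally {
            lock.unlock();
        }
    }

    // Number of partitions currently registered.
    int size() {
        lock.lock();
        try {
            return states.size();
        } finally {
            lock.unlock();
        }
    }
}
```

With two separate calls (read the partition set, then remove that set), a partition added in between would stay on the old thread; the single method closes that window.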
[GitHub] [kafka] mjsax commented on a change in pull request #11945: KAFKA-13769: Explicitly route FK join results to correct partitions
mjsax commented on a change in pull request #11945: URL: https://github.com/apache/kafka/pull/11945#discussion_r839183920 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java ## @@ -60,6 +63,42 @@ public void setIfUnset(final SerdeGetter getter) { } } +@Override +public void configure(final Map configs, final boolean isKey) { +this.upgradeFromV0 = upgradeFromV0(configs); +} + +private static boolean upgradeFromV0(final Map configs) { +final Object upgradeFrom = configs.get(StreamsConfig.UPGRADE_FROM_CONFIG); +if (!(upgradeFrom instanceof String)) { Review comment: As above. ## File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ## @@ -636,7 +684,10 @@ "Accepted values are \"" + UPGRADE_FROM_0100 + "\", \"" + UPGRADE_FROM_0101 + "\", \"" + UPGRADE_FROM_0102 + "\", \"" + UPGRADE_FROM_0110 + "\", \"" + UPGRADE_FROM_10 + "\", \"" + UPGRADE_FROM_11 + "\", \"" + UPGRADE_FROM_20 + "\", \"" + UPGRADE_FROM_21 + "\", \"" + -UPGRADE_FROM_22 + "\", \"" + UPGRADE_FROM_23 + "\" (for upgrading from the corresponding old version)."; Review comment: > When upgrading from 2.4 to a newer version it is not required to specify this config. Seems this needs an update? ## File path: docs/streams/upgrade-guide.html ## @@ -34,9 +34,9 @@ Upgrade Guide and API Changes -Upgrading from any older version to {{fullDotVersion}} is possible: if upgrading from 2.3 or below, you will need to do two rolling bounces, where during the first rolling bounce phase you set the config upgrade.from="older version" -(possible values are "0.10.0" - "2.3") and during the second you remove it. This is required to safely upgrade to the new cooperative rebalancing protocol of the embedded consumer. 
Note that you will remain using the old eager -rebalancing protocol if you skip or delay the second rolling bounce, but you can safely switch over to cooperative at any time once the entire group is on 2.4+ by removing the config value and bouncing. For more details please refer to +Upgrading from any older version to {{fullDotVersion}} is possible: if upgrading from 3.3 or below, you will need to do two rolling bounces, where during the first rolling bounce phase you set the config upgrade.from="older version" Review comment: > upgrading from 3.3 Should be `3.2` -- the fix should go into 3.3, right? ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java ## @@ -92,6 +103,32 @@ public void setIfUnset(final SerdeGetter getter) { return buf.array(); } +private byte[] serializeV1(final String topic, final SubscriptionResponseWrapper data) { +final byte[] serializedData = data.getForeignValue() == null ? null : serializer.serialize(topic, data.getForeignValue()); +final int serializedDataLength = serializedData == null ? 0 : serializedData.length; +final long[] originalHash = data.getOriginalValueHash(); +final int hashLength = originalHash == null ? 0 : 2 * Long.BYTES; +final int primaryPartitionLength = Integer.BYTES; +final int dataLength = 1 + hashLength + serializedDataLength + primaryPartitionLength; + +final ByteBuffer buf = ByteBuffer.allocate(dataLength); + +if (originalHash != null) { +buf.put(data.getVersion()); +} else { +buf.put((byte) (data.getVersion() | (byte) 0x80)); +} +buf.putInt(data.getPrimaryPartition()); Review comment: > This means that we can't just put data.getVersion() into the buffer, but have to hardcode it to 0 Yes, but this seems ok? 
I.e., we would have a `serializeV0Internal(final short version)` that is called as (something like): ``` public byte[] serializeV0() { return serializeV0Internal(0).getBytes(); } public byte[] serializeV1() { final ByteStream out = serializeV0Internal(0); // add v1 stuff to `out` return out.getBytes(); } ``` > Making this logic generic enough to handle v0, upgrade-from, and v1 will be cumbersome. Why? I understand that putting variable length at the end could be simpler. I guess I am just used to how it's usually done in Kafka. And personally, when I read code, it's easier to have a mental model of the different versions in the bytes if older versions are always a prefix of newer versions. ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java ## @@ -62,6
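The "older versions are always a prefix of newer versions" idea from the comment above could look roughly like this. It is a simplified sketch under stated assumptions: `PrefixVersionedCodec` and its field layout (one version byte, one 8-byte hash, and a 4-byte primary partition appended in v1) are hypothetical and much simpler than the real serde, and writing `1` as the version byte for v1 is this sketch's choice, not something the review settles.

```java
import java.nio.ByteBuffer;

// Hypothetical codec: the v1 bytes start with the same fields as v0, then append new ones.
final class PrefixVersionedCodec {

    // Writes the fields shared by all versions: [1-byte version][8-byte hash].
    // extraCapacity reserves room for fields that a newer version appends.
    private static ByteBuffer writeV0Fields(byte version, long hash, int extraCapacity) {
        ByteBuffer buf = ByteBuffer.allocate(1 + Long.BYTES + extraCapacity);
        buf.put(version);
        buf.putLong(hash);
        return buf;
    }

    static byte[] serializeV0(long hash) {
        return writeV0Fields((byte) 0, hash, 0).array();
    }

    // v1 reuses the v0 writer and appends [4-byte primary partition] at the end.
    static byte[] serializeV1(long hash, int primaryPartition) {
        ByteBuffer buf = writeV0Fields((byte) 1, hash, Integer.BYTES);
        buf.putInt(primaryPartition);
        return buf.array();
    }

    // The hash sits at the same offset in both versions, so one reader serves both.
    static long readHash(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong(1);
    }
}
```

Because the shared fields keep their offsets, code that only needs the older fields does not have to branch on the version.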
[GitHub] [kafka] dengziming commented on pull request #11935: MINOR: Remove some unused code
dengziming commented on pull request #11935: URL: https://github.com/apache/kafka/pull/11935#issuecomment-1084076340 Hello @showuon , PTAL, Thanks.
[GitHub] [kafka] mjsax commented on a change in pull request #11829: [2/N][emit final] add processor metadata to be committed with offset
mjsax commented on a change in pull request #11829: URL: https://github.com/apache/kafka/pull/11829#discussion_r839166179 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -537,6 +542,7 @@ public void closeDirty() { public void updateInputPartitions(final Set topicPartitions, final Map> allTopologyNodesToSourceTopics) { super.updateInputPartitions(topicPartitions, allTopologyNodesToSourceTopics); partitionGroup.updatePartitions(topicPartitions, recordQueueCreator::createQueue); +processorContext.getProcessorMetadata().setNeedsCommit(true); Review comment: I agree it might be a rare case. If we think it's not worth it and the risk of losing the metadata is tiny (and it's complicated to force a commit right away), I am fine with not closing this race condition.
[GitHub] [kafka] yufeiyan1220 commented on a change in pull request #11953: KAFKA-13772: Partitions are not correctly re-partitioned when the fetcher thread pool is resized
yufeiyan1220 commented on a change in pull request #11953: URL: https://github.com/apache/kafka/pull/11953#discussion_r839161947 ## File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala ## @@ -62,19 +62,22 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri def resizeThreadPool(newSize: Int): Unit = { def migratePartitions(newSize: Int): Unit = { + val allRemovedPartitionsMap = mutable.Map[TopicPartition, InitialFetchState]() fetcherThreadMap.forKeyValue { (id, thread) => -val partitionStates = removeFetcherForPartitions(thread.partitions) +val partitionStates = thread.removeAllPartitions() Review comment: I think no NPE is thrown unless `partitionStates.partitionStateMap` contains a null value. Holding `partitionMapLock` makes sure no other thread changes `partitionStates` during this process. The original version needs to filter out null values because `partitionStates.stateValue` might return null when the partition is not included in `partitionStates.partitionStateMap`. I am not sure; maybe I should add the filter logic as well, or just feed all partitions to `removePartitions`.
[GitHub] [kafka] yufeiyan1220 commented on a change in pull request #11953: KAFKA-13772: Partitions are not correctly re-partitioned when the fetcher thread pool is resized
yufeiyan1220 commented on a change in pull request #11953: URL: https://github.com/apache/kafka/pull/11953#discussion_r839164671 ## File path: core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala ## @@ -210,4 +218,115 @@ class AbstractFetcherManagerTest { verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds) verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds) } + + @Test + def testExpandThreadPool(): Unit = { +testResizeThreadPool(10, 50) + } + + @Test + def testShrinkThreadPool(): Unit = { +testResizeThreadPool(50, 10) + } + + private def testResizeThreadPool(currentFetcherSize: Int, newFetcherSize: Int, brokerNum: Int = 6): Unit = { +val fetchingTopicPartitions = makeTopicPartition(10, 100) +val failedTopicPartitions = makeTopicPartition(2, 5, "topic_failed") +val fetcherManager = new AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", "fetcher-manager", currentFetcherSize) { + override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = { +new TestResizeFetcherThread(sourceBroker, failedPartitions) + } +} +try { + fetcherManager.addFetcherForPartitions(fetchingTopicPartitions.map { tp => +val brokerId = getBrokerId(tp, brokerNum) +val brokerEndPoint = new BrokerEndPoint(brokerId, s"kafka-host-$brokerId", 9092) +tp -> InitialFetchState(None, brokerEndPoint, 0, 0) + }.toMap) + + // Mark some of these partitions failed within resizing scope + fetchingTopicPartitions.take(20).foreach(fetcherManager.addFailedPartition) + // Mark failed partitions out of resizing scope + failedTopicPartitions.foreach(fetcherManager.addFailedPartition) + + fetcherManager.resizeThreadPool(newFetcherSize) + + val ownedPartitions = mutable.Set.empty[TopicPartition] + fetcherManager.fetcherThreadMap.forKeyValue { (brokerIdAndFetcherId, fetcherThread) => +val fetcherId = brokerIdAndFetcherId.fetcherId +val brokerId = brokerIdAndFetcherId.brokerId + +fetcherThread.partitions.foreach { tp => + ownedPartitions += 
tp + assertEquals(fetcherManager.getFetcherId(tp), fetcherId) + assertEquals(getBrokerId(tp, brokerNum), brokerId) +} + } + // Verify that all partitions are owned by the fetcher threads. + assertEquals(fetchingTopicPartitions, ownedPartitions) + + val failedPartitionsAfterResize = fetcherManager.failedPartitions.failedPartitions() + // Verify that failed partitions within resizing scope are removed, otherwise retained Review comment: Some partitions are marked as failed and removed in `markPartitionFailed`, and I think the resizing process should not clear them but retain them as failed after resizing finishes. This simulates the state of failed partitions before and after resizing.
[GitHub] [kafka] yufeiyan1220 commented on a change in pull request #11953: KAFKA-13772: Partitions are not correctly re-partitioned when the fetcher thread pool is resized
yufeiyan1220 commented on a change in pull request #11953: URL: https://github.com/apache/kafka/pull/11953#discussion_r839161947 ## File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala ## @@ -62,19 +62,22 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri def resizeThreadPool(newSize: Int): Unit = { def migratePartitions(newSize: Int): Unit = { + val allRemovedPartitionsMap = mutable.Map[TopicPartition, InitialFetchState]() fetcherThreadMap.forKeyValue { (id, thread) => -val partitionStates = removeFetcherForPartitions(thread.partitions) +val partitionStates = thread.removeAllPartitions() Review comment: I think no NPE is thrown unless `partitionStates.partitionStateMap` contains a null value. I think holding `partitionMapLock` makes sure no other thread changes `partitionStates` during this process. The original version needs to filter out null values because `partitionStates.stateValue` might return null when the partition is not included in `partitionStates.partitionStateMap`.
[GitHub] [kafka] showuon commented on a change in pull request #11953: KAFKA-13772: Partitions are not correctly re-partitioned when the fetcher thread pool is resized
showuon commented on a change in pull request #11953: URL: https://github.com/apache/kafka/pull/11953#discussion_r839140933 ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -743,6 +743,18 @@ abstract class AbstractFetcherThread(name: String, } finally partitionMapLock.unlock() } + def removeAllPartitions(): Map[TopicPartition, PartitionFetchState] = { +partitionMapLock.lockInterruptibly() Review comment: Maybe we can just feed all partitions into the `removePartitions` method? ## File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala ## @@ -62,19 +62,22 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri def resizeThreadPool(newSize: Int): Unit = { def migratePartitions(newSize: Int): Unit = { + val allRemovedPartitionsMap = mutable.Map[TopicPartition, InitialFetchState]() fetcherThreadMap.forKeyValue { (id, thread) => -val partitionStates = removeFetcherForPartitions(thread.partitions) +val partitionStates = thread.removeAllPartitions() Review comment: Originally, we called `removeFetcherForPartitions`, which filters out the `InitialFetchState == null` partitions [here](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L742). I'm not sure if this is something we should care about, but I think we'd better keep the filter logic to avoid an NPE being thrown. 
## File path: core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala ## @@ -210,4 +218,115 @@ class AbstractFetcherManagerTest { verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds) verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds) } + + @Test + def testExpandThreadPool(): Unit = { +testResizeThreadPool(10, 50) + } + + @Test + def testShrinkThreadPool(): Unit = { +testResizeThreadPool(50, 10) + } + + private def testResizeThreadPool(currentFetcherSize: Int, newFetcherSize: Int, brokerNum: Int = 6): Unit = { +val fetchingTopicPartitions = makeTopicPartition(10, 100) +val failedTopicPartitions = makeTopicPartition(2, 5, "topic_failed") +val fetcherManager = new AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", "fetcher-manager", currentFetcherSize) { + override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = { +new TestResizeFetcherThread(sourceBroker, failedPartitions) + } +} +try { + fetcherManager.addFetcherForPartitions(fetchingTopicPartitions.map { tp => +val brokerId = getBrokerId(tp, brokerNum) +val brokerEndPoint = new BrokerEndPoint(brokerId, s"kafka-host-$brokerId", 9092) +tp -> InitialFetchState(None, brokerEndPoint, 0, 0) + }.toMap) + + // Mark some of these partitions failed within resizing scope + fetchingTopicPartitions.take(20).foreach(fetcherManager.addFailedPartition) + // Mark failed partitions out of resizing scope + failedTopicPartitions.foreach(fetcherManager.addFailedPartition) + + fetcherManager.resizeThreadPool(newFetcherSize) + + val ownedPartitions = mutable.Set.empty[TopicPartition] + fetcherManager.fetcherThreadMap.forKeyValue { (brokerIdAndFetcherId, fetcherThread) => +val fetcherId = brokerIdAndFetcherId.fetcherId +val brokerId = brokerIdAndFetcherId.brokerId + +fetcherThread.partitions.foreach { tp => + ownedPartitions += tp + assertEquals(fetcherManager.getFetcherId(tp), fetcherId) + assertEquals(getBrokerId(tp, brokerNum), brokerId) +} + } + // 
Verify that all partitions are owned by the fetcher threads. + assertEquals(fetchingTopicPartitions, ownedPartitions) + + val failedPartitionsAfterResize = fetcherManager.failedPartitions.failedPartitions() + // Verify that failed partitions within resizing scope are removed, otherwise retained Review comment: Could you explain what `otherwise retained` means? I can understand that the failed partitions within resizing scope should be removed, since we've verified it, but I don't understand the latter part. Thanks.
[GitHub] [kafka] RivenSun2 commented on pull request #11919: MINOR: Unify the log output of JaasContext.defaultContext
RivenSun2 commented on pull request #11919: URL: https://github.com/apache/kafka/pull/11919#issuecomment-1084017410 Hi @dajac @ijuma please help review the PR. Thanks.
[GitHub] [kafka] RivenSun2 commented on pull request #11911: KAFKA-13463: Make pause behavior consistent between cooperative and eager protocols
RivenSun2 commented on pull request #11911: URL: https://github.com/apache/kafka/pull/11911#issuecomment-1084016980
[GitHub] [kafka] RivenSun2 commented on pull request #11947: MINOR: Improve the description of principal under different mechanisms of sasl
RivenSun2 commented on pull request #11947: URL: https://github.com/apache/kafka/pull/11947#issuecomment-1084016362
[jira] [Commented] (KAFKA-13771) Support to explicitly delete delegationTokens that have expired but have not been automatically cleaned up
[ https://issues.apache.org/jira/browse/KAFKA-13771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17515035#comment-17515035 ] RivenSun commented on KAFKA-13771: -- Hi [~ijuma] [~rsivaram] Could you give some suggestions for this issue? Thanks. > Support to explicitly delete delegationTokens that have expired but have not > been automatically cleaned up > -- > > Key: KAFKA-13771 > URL: https://issues.apache.org/jira/browse/KAFKA-13771 > Project: Kafka > Issue Type: Improvement > Components: security >Reporter: RivenSun >Priority: Major > > Quoting the official documentation > {quote} > Tokens can also be cancelled explicitly. If a token is not renewed by the > token’s expiration time or if token is beyond the max life time, it will be > deleted from all broker caches as well as from zookeeper. > {quote} > 1. The first point above means that after the `AdminClient` initiates the > EXPIRE_DELEGATION_TOKEN request, in the DelegationTokenManager.expireToken() > method on the KafkaServer side, if the user passes in expireLifeTimeMs less > than 0, KafkaServer will delete the corresponding delegationToken directly. > 2. There is a thread named "delete-expired-tokens" on the KafkaServer side, > which is responsible for regularly cleaning up expired tokens. The execution > interval is `delegation.token.expiry.check.interval.ms`, and the default > value is one hour. > But a careful analysis of the code logic in DelegationTokenManager.expireToken() > shows that *Kafka currently does not let users delete an expired delegationToken that > they no longer use or renew. A user who tries to do this receives a > DelegationTokenExpiredException.* > In the worst case, an expired delegationToken can still be used normally > within {*}an hour{*}, even though the broker can shorten this window by lowering the > (delegation.token.expiry.check.interval.ms) configuration. 
> The solution is simple: adjust the order of the `if` branches in > DelegationTokenManager.expireToken(). > {code:java} > if (!allowedToRenew(principal, tokenInfo)) { > expireResponseCallback(Errors.DELEGATION_TOKEN_OWNER_MISMATCH, -1) > } else if (expireLifeTimeMs < 0) { //expire immediately > removeToken(tokenInfo.tokenId) > info(s"Token expired for token: ${tokenInfo.tokenId} for owner: > ${tokenInfo.owner}") > expireResponseCallback(Errors.NONE, now) > } else if (tokenInfo.maxTimestamp < now || tokenInfo.expiryTimestamp < now) { > expireResponseCallback(Errors.DELEGATION_TOKEN_EXPIRED, -1) > } else { > //set expiry time stamp > .. > } {code}
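To see why the branch order matters, here is a small Java sketch of the proposed ordering. It is an illustration only: `ExpireOrderSketch`, the `Outcome` enum, and the flattened parameters are hypothetical and merely mirror the names in the snippet above; the broker code itself is Scala and replies through callbacks.

```java
// Hypothetical model of the proposed branch order in expireToken().
final class ExpireOrderSketch {
    enum Outcome { OWNER_MISMATCH, REMOVED, TOKEN_EXPIRED, EXPIRY_UPDATED }

    static Outcome expire(boolean allowedToRenew, long expireLifeTimeMs,
                          long maxTimestamp, long expiryTimestamp, long now) {
        if (!allowedToRenew) {
            return Outcome.OWNER_MISMATCH;
        } else if (expireLifeTimeMs < 0) {
            // Checked before the expiry test, so an already expired token
            // can still be removed explicitly.
            return Outcome.REMOVED;
        } else if (maxTimestamp < now || expiryTimestamp < now) {
            return Outcome.TOKEN_EXPIRED;
        } else {
            return Outcome.EXPIRY_UPDATED;
        }
    }
}
```

For example, with `now = 1000` and a token whose `expiryTimestamp` is 500, a request with `expireLifeTimeMs = -1` yields `REMOVED`, while a request with `expireLifeTimeMs = 10` still yields `TOKEN_EXPIRED`.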
[jira] [Commented] (KAFKA-13757) Improve the annotations of all related methods of DelegationToken in the Admin class
[ https://issues.apache.org/jira/browse/KAFKA-13757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17515034#comment-17515034 ] RivenSun commented on KAFKA-13757: -- Hi [~dajac] [~rsivaram] Could you give some suggestions for this issue? Thanks. > Improve the annotations of all related methods of DelegationToken in the > Admin class > > > Key: KAFKA-13757 > URL: https://issues.apache.org/jira/browse/KAFKA-13757 > Project: Kafka > Issue Type: Improvement > Components: admin >Reporter: RivenSun >Priority: Major > > DelegationToken is a great and lightweight feature, but users get confused when they actually > use it. > From the existing official documentation, method comments, and parameter comments, > users cannot tell what the server's processing logic is or > what the returned fields mean after they call an > XXXDelegationToken(...) method. > After reading the source code, I briefly sorted out the server-side processing logic of > the XXXDelegationToken(...) methods. > 1. createDelegationToken: > {code:java} > // 1. if the request is sent on PLAINTEXT/1-way SSL channels or delegation token > authenticated channels, > // throw UnsupportedByAuthenticationException > // 2. if the delegation token feature is disabled, throw > DelegationTokenDisabledException > // 3. if the renewers principal type is not KafkaPrincipal.USER_TYPE, throw > InvalidPrincipalTypeException > // 4. if the request was not completed within the given timeoutMs(), throw > TimeoutException > //processing logic: > // maxLifeTime = `maxLifeTimeMs` <= 0 ? > brokerConfig.delegationTokenMaxLifeMs : Math.min(`maxLifeTimeMs`, > brokerConfig.delegationTokenMaxLifeMs) > // maxLifeTimestamp = currentTimeMillis + maxLifeTime > // expiryTimestamp = Math.min(maxLifeTimestamp, currentTimeMillis + > brokerConfig.delegationTokenExpiryTimeMs) > // update tokenInfo and return createTokenResult {code} > 2. renewDelegationToken > {code:java} > // 1. 
if the request is sent on PLAINTEXT/1-way SSL channels or delegation token > authenticated channels, > // throw UnsupportedByAuthenticationException > // 2. if the delegation token feature is disabled, throw > DelegationTokenDisabledException > // 3. if the authenticated user is not owner/renewer of the token, throw > DelegationTokenOwnerMismatchException > // 4. if the delegation token is expired, throw > DelegationTokenExpiredException > // 5. if the delegation token is not found on server, throw > DelegationTokenNotFoundException > // 6. if the request was not completed within the given timeoutMs(), throw > TimeoutException > //processing logic: > //renewLifeTime = `renewTimePeriodMs` < 0 ? > brokerConfig.delegationTokenExpiryTimeMs : `renewTimePeriodMs` > //renewTimestamp = currentTimeMillis + renewLifeTime > //expiryTimestamp = Math.min(tokenInfo.maxTimestamp, renewTimestamp) > //update tokenInfo.expiryTimestamp > //return expiryTimestamp {code} > 3. expireDelegationToken > {code:java} > // 1. if the request is sent on PLAINTEXT/1-way SSL channels or delegation token > authenticated channels, > // throw UnsupportedByAuthenticationException > // 2. if the delegation token feature is disabled, throw > DelegationTokenDisabledException > // 3. if the authenticated user is not owner/renewer of the token, throw > DelegationTokenOwnerMismatchException > // 4. if the delegation token is expired, throw > DelegationTokenExpiredException > // 5. if the delegation token is not found on server, throw > DelegationTokenNotFoundException > // 6. if the request was not completed within the given timeoutMs(), throw > TimeoutException > //processing logic: > //if `expiryTimePeriodMs` < 0, delete tokenInfo immediately, return > currentTimeMillis. 
> //otherwise update tokenInfo expiryTimestamp: > // expiryTimestamp = Math.min(tokenInfo.maxTimestamp, > currentTimeMillis + `expiryTimePeriodMs`) > // update tokenInfo.expiryTimestamp > // return expiryTimestamp > // > //Note: Tokens can be cancelled explicitly. If a token is not renewed by > the token’s expiration time or if token is > //beyond the max life time, it will also be deleted from all broker > caches as well as from zookeeper. {code} > 4. describeDelegationToken > {code:java} > // 1. if the request is sent on PLAINTEXT/1-way SSL channels or delegation token > authenticated channels, > // throw UnsupportedByAuthenticationException > // 2. if the delegation token feature is disabled, throw > DelegationTokenDisabledException > // 3. if the request was not completed within the given timeoutMs(), throw > TimeoutException > //processing logic: > //if `owners` is EmptyList(note:
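The timestamp arithmetic described in the create and renew steps above can be worked through with a small Java sketch. It only restates the formulas quoted in the issue: `TokenTimestamps` and its parameter names are hypothetical, and the broker settings are passed in as plain numbers rather than read from any real configuration object.

```java
// Hypothetical helper restating the timestamp formulas quoted in the issue.
final class TokenTimestamps {

    // maxLifeTimestamp = now + (requested <= 0 ? brokerMax : min(requested, brokerMax))
    static long maxLifeTimestamp(long now, long requestedMaxLifeMs, long brokerMaxLifeMs) {
        long maxLife = requestedMaxLifeMs <= 0
                ? brokerMaxLifeMs
                : Math.min(requestedMaxLifeMs, brokerMaxLifeMs);
        return now + maxLife;
    }

    // Initial expiry on create: capped by the token's max-life timestamp.
    static long initialExpiry(long now, long maxLifeTimestamp, long brokerExpiryMs) {
        return Math.min(maxLifeTimestamp, now + brokerExpiryMs);
    }

    // Expiry after renew: a negative period falls back to the broker default,
    // and the result never exceeds the token's max-life timestamp.
    static long renewedExpiry(long now, long renewPeriodMs, long brokerExpiryMs, long tokenMaxTimestamp) {
        long renewLife = renewPeriodMs < 0 ? brokerExpiryMs : renewPeriodMs;
        return Math.min(tokenMaxTimestamp, now + renewLife);
    }
}
```

As a worked example with illustrative numbers: at `now = 1000`, a requested max life of 0, a broker max life of 604800000 ms, and a broker expiry time of 86400000 ms, the max-life timestamp is 604801000 and the initial expiry is 86401000. Renewing a token whose max timestamp is 50000 with a period of -1 caps the new expiry at 50000.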
[jira] [Commented] (KAFKA-13751) On the broker side, OAUTHBEARER is not compatible with other SASL mechanisms
[ https://issues.apache.org/jira/browse/KAFKA-13751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17515032#comment-17515032 ]

RivenSun commented on KAFKA-13751:
--
Hi [~dajac] [~ijuma]
Could you give some suggestions for this issue?
Thanks.

> On the broker side, OAUTHBEARER is not compatible with other SASL mechanisms
> 
>
> Key: KAFKA-13751
> URL: https://issues.apache.org/jira/browse/KAFKA-13751
> Project: Kafka
> Issue Type: Bug
> Components: security
> Affects Versions: 3.0.1
> Reporter: RivenSun
> Priority: Critical
>
> h1. Phenomenon:
> SASL/OAUTHBEARER, whether implemented by default or customized by the user, is not compatible with other SASL mechanisms.
> h3. 
> case1:
> kafka_server_jaas_oauth.conf
> {code:java}
> KafkaServer {
>     org.apache.kafka.common.security.plain.PlainLoginModule required
>     username="admin"
>     password="admin"
>     user_admin="admin"
>     user_alice="alice";
>     org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
>     org.apache.kafka.common.security.scram.ScramLoginModule required
>     username="admin"
>     password="admin_scram";
> }; {code}
> server.properties
> {code:java}
> advertised.listeners=SASL_PLAINTEXT://publicIp:8779,SASL_SSL://publicIp:8889,OAUTH://publicIp:8669
> listener.security.protocol.map=INTERNAL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,OAUTH:SASL_SSL
> sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER{code}
> Error when starting kafka:
> server.log
> {code:java}
> [2022-03-16 13:18:42,658] ERROR [KafkaServer id=1] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
> org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: Must supply exactly 1 non-null JAAS mechanism configuration (size was 3)
>         at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:172)
>         at kafka.network.Processor.<init>(SocketServer.scala:724)
>         at kafka.network.SocketServer.newProcessor(SocketServer.scala:367)
>         at kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:252)
>         at kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:251)
>         at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:214)
>         at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:211)
>         at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>         at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>         at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:211)
>         at kafka.network.SocketServer.startup(SocketServer.scala:122)
>         at kafka.server.KafkaServer.startup(KafkaServer.scala:266)
>         at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
>         at kafka.Kafka$.main(Kafka.scala:82)
>         at kafka.Kafka.main(Kafka.scala)
> Caused by: java.lang.IllegalArgumentException: Must supply exactly 1 non-null JAAS mechanism configuration (size was 3)
>         at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler.configure(OAuthBearerUnsecuredValidatorCallbackHandler.java:117)
>         at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:139)
>         ... 17 more
> [2022-03-16 13:18:42,662] INFO [KafkaServer id=1] shutting down (kafka.server.KafkaServer)
> [2022-03-16 13:18:42,664] INFO [SocketServer brokerId=1] Stopping socket server request processors (kafka.network.SocketServer) {code}
> The default implementation class of oauthbearer's `sasl.server.callback.handler.class` is OAuthBearerUnsecuredValidatorCallbackHandler.
> The OAuthBearerUnsecuredValidatorCallbackHandler#configure(...) method validates its jaasConfigEntries parameter.
> My point is that {*}the verification logic here is completely reasonable{*}, but the jaasConfigEntries passed in from the upper layer should not contain the AppConfigurationEntry of other loginModules. The same check, with the same message *"Must supply exactly 1 non-null JAAS mechanism configuration"*, appears in several other places in the code.
> The root cause is elaborated later.
> By the way, at present, KafkaServer allows {*}the same LoginModule to be configured multiple times in kafkaJaasConfigFile{*}, which also leads to the phenomenon of case1.
> kafka_server_jaas_oauth.conf eg:
>
[GitHub] [kafka] showuon commented on pull request #11959: MINOR: log warning when topology override for cache size is non-zero
showuon commented on pull request #11959: URL: https://github.com/apache/kafka/pull/11959#issuecomment-1084012434 Thanks for fixing the tests, @ableegoldman ! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon edited a comment on pull request #11936: MINOR: GetOffsetShell should ignore partitions without offsets
showuon edited a comment on pull request #11936: URL: https://github.com/apache/kafka/pull/11936#issuecomment-1084010770 Cherry-pick back to 3.2. Thanks. cc @cadonna -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #11936: MINOR: GetOffsetShell should ignore partitions without offsets
showuon commented on pull request #11936: URL: https://github.com/apache/kafka/pull/11936#issuecomment-1084010770 Cherry-pick back to 3.2. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon merged pull request #11936: MINOR: GetOffsetShell should ignore partitions without offsets
showuon merged pull request #11936: URL: https://github.com/apache/kafka/pull/11936 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #11936: MINOR: GetOffsetShell should ignore partitions without offsets
showuon commented on pull request #11936: URL: https://github.com/apache/kafka/pull/11936#issuecomment-1084009325 All tests passed. Merging it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #11968: add toString method
showuon commented on pull request #11968: URL: https://github.com/apache/kafka/pull/11968#issuecomment-1084007839 Thanks for @dajac 's reminder. Yes, this was already fixed in the latest client. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yun-yun commented on pull request #11963: KAFKA-13777: Fix FetchResponse#responseData: Assignment of lazy-initialized members should be the last step with double-checked locking
yun-yun commented on pull request #11963: URL: https://github.com/apache/kafka/pull/11963#issuecomment-1083992053 > @yun-yun , thanks for the contribution! @showuon It's my pleasure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #11972: MINOR: Fix doc variable typos in `TopicConfig`
showuon commented on pull request #11972: URL: https://github.com/apache/kafka/pull/11972#issuecomment-1083981413 @fxbing , thanks for the contribution! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon merged pull request #11972: MINOR: Fix doc variable typos in `TopicConfig`
showuon merged pull request #11972: URL: https://github.com/apache/kafka/pull/11972 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-13777) Fix FetchResponse#responseData: Assignment of lazy-initialized members should be the last step with double-checked locking
[ https://issues.apache.org/jira/browse/KAFKA-13777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen resolved KAFKA-13777.
---
Resolution: Fixed

> Fix FetchResponse#responseData: Assignment of lazy-initialized members should be the last step with double-checked locking
> --
>
> Key: KAFKA-13777
> URL: https://issues.apache.org/jira/browse/KAFKA-13777
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 3.0.1
> Reporter: YunKui Lu
> Priority: Minor
> Fix For: 3.3.0
>
>
> Assignment of lazy-initialized members should be the last step with double-checked locking.
> Current code:
>
> {code:java}
> public LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseData(Map<Uuid, String> topicNames, short version) {
>     if (responseData == null) {
>         synchronized (this) {
>             if (responseData == null) {
>                 responseData = new LinkedHashMap<>();
>                 data.responses().forEach(topicResponse -> {
>                     String name;
>                     if (version < 13) {
>                         name = topicResponse.topic();
>                     } else {
>                         name = topicNames.get(topicResponse.topicId());
>                     }
>                     if (name != null) {
>                         topicResponse.partitions().forEach(partition ->
>                             responseData.put(new TopicPartition(name, partition.partitionIndex()), partition));
>                     }
>                 });
>             }
>         }
>     }
>     return responseData;
> } {code}
> Proposed change:
>
> {code:java}
> public LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseData(Map<Uuid, String> topicNames, short version) {
>     if (responseData == null) {
>         synchronized (this) {
>             if (responseData == null) {
>                 // *** change 1 ***
>                 final LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseDataTmp = new LinkedHashMap<>();
>                 data.responses().forEach(topicResponse -> {
>                     String name;
>                     if (version < 13) {
>                         name = topicResponse.topic();
>                     } else {
>                         name = topicNames.get(topicResponse.topicId());
>                     }
>                     if (name != null) {
>                         topicResponse.partitions().forEach(partition ->
>                             // *** change 2 ***
>                             responseDataTmp.put(new TopicPartition(name, partition.partitionIndex()), partition));
>                     }
>                 });
>                 // *** change 3 ***
>                 responseData = responseDataTmp;
>             }
>         }
>     }
>     return responseData;
> } {code}
>

-- This message was sent by Atlassian Jira (v8.20.1#820001)
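The pattern the ticket asks for can be shown outside of Kafka. The following is a minimal sketch, not the FetchResponse code: the class `LazyResponseView`, its `source` map, and the `String`/`Integer` types are hypothetical stand-ins, and the sketch assumes the cached field is declared `volatile`. The point it illustrates is that the map is filled through a local variable and assigned to the shared field only as the last step, so a thread that skips the lock on the first null check can never observe a half-populated map.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a response object with a lazily built view.
final class LazyResponseView {
    // Assumption of this sketch: the field is volatile, so the write below
    // safely publishes the fully built map to threads that skip the lock.
    private volatile Map<String, Integer> cached;
    private final Map<String, Integer> source;

    LazyResponseView(Map<String, Integer> source) {
        this.source = source;
    }

    Map<String, Integer> view() {
        Map<String, Integer> result = cached;      // first, unlocked check
        if (result == null) {
            synchronized (this) {
                result = cached;                   // second check under the lock
                if (result == null) {
                    // Build into a local map that no other thread can see yet.
                    Map<String, Integer> tmp = new LinkedHashMap<>();
                    source.forEach((key, value) -> tmp.put(key, value));
                    // Publishing is the last step, after the map is complete.
                    cached = tmp;
                    result = tmp;
                }
            }
        }
        return result;
    }
}
```

Calling `view()` twice on the same instance returns the same map object; the second call takes only the unlocked fast path.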
[jira] [Updated] (KAFKA-13777) Fix FetchResponse#responseData: Assignment of lazy-initialized members should be the last step with double-checked locking
[ https://issues.apache.org/jira/browse/KAFKA-13777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-13777: -- Fix Version/s: 3.3.0 > Fix FetchResponse#responseData: Assignment of lazy-initialized members should > be the last step with double-checked locking > -- > > Key: KAFKA-13777 > URL: https://issues.apache.org/jira/browse/KAFKA-13777 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.0.1 >Reporter: YunKui Lu >Priority: Minor > Fix For: 3.3.0 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] showuon merged pull request #11963: KAFKA-13777: Fix FetchResponse#responseData: Assignment of lazy-initialized members should be the last step with double-checked locking
showuon merged pull request #11963: URL: https://github.com/apache/kafka/pull/11963 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #11963: KAFKA-13777: Fix FetchResponse#responseData: Assignment of lazy-initialized members should be the last step with double-checked locking
showuon commented on pull request #11963: URL: https://github.com/apache/kafka/pull/11963#issuecomment-1083965192 @yun-yun , thanks for the contribution! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #11963: KAFKA-13777: Fix FetchResponse#responseData: Assignment of lazy-initialized members should be the last step with double-checked locking
showuon commented on pull request #11963: URL: https://github.com/apache/kafka/pull/11963#issuecomment-1083964754 Failed tests are unrelated and also failed in trunk build. ``` Build / PowerPC / org.apache.kafka.clients.consumer.KafkaConsumerTest.testReturnRecordsDuringRebalance() Build / JDK 11 and Scala 2.13 / kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed() ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bozhao12 commented on pull request #11965: KAFKA-13778: Fetch from follower should never run the preferred read replica selection
bozhao12 commented on pull request #11965: URL: https://github.com/apache/kafka/pull/11965#issuecomment-1083910260 @dajac @ijuma One new unit test updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a change in pull request #11971: KAFKA-13783; Remove reason prefixing in JoinGroupRequest and LeaveGroupRequest
jeffkbkim commented on a change in pull request #11971: URL: https://github.com/apache/kafka/pull/11971#discussion_r839055468 ## File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ## @@ -2819,6 +2851,39 @@ public void testEnforceRebalanceTriggersRebalanceOnNextPoll() { assertEquals(countingRebalanceListener.revokedCount, 1); } +@Test +public void testEnforceRebalanceReason() { +Time time = new MockTime(1L); +ConsumerMetadata metadata = createMetadata(subscription); +MockClient client = new MockClient(time, metadata); +KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); +MockRebalanceListener countingRebalanceListener = new MockRebalanceListener(); +initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1), Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1))); + +consumer.subscribe(Arrays.asList(topic, topic2), countingRebalanceListener); +Node node = metadata.fetch().nodes().get(0); +prepareRebalance(client, node, assignor, Arrays.asList(tp0, t2p0), null); + +// a first rebalance to get the assignment, we need two poll calls since we need two round trips to finish join / sync-group +consumer.poll(Duration.ZERO); +consumer.poll(Duration.ZERO); Review comment: the test passes without the second poll. the first poll finishes the sync ``` INFO Successfully synced group in generation ``` before the second poll is triggered. 
The second poll notifies the assignor and gets committed offsets which i don't think is necessary in this test ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -3723,27 +3723,34 @@ private Integer nodeFor(ConfigResource resource) { List membersToRemove = new ArrayList<>(); for (final MemberDescription member : members) { +MemberIdentity memberIdentity = new MemberIdentity() +.setReason(reason); + if (member.groupInstanceId().isPresent()) { -membersToRemove.add(new MemberIdentity().setGroupInstanceId(member.groupInstanceId().get())); + memberIdentity.setGroupInstanceId(member.groupInstanceId().get()); } else { -membersToRemove.add(new MemberIdentity().setMemberId(member.consumerId())); +memberIdentity.setMemberId(member.consumerId()); } + +membersToRemove.add(memberIdentity); } return membersToRemove; } @Override public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(String groupId, RemoveMembersFromConsumerGroupOptions options) { +String reason = options.reason() == null || options.reason().isEmpty() ? +DEFAULT_LEAVE_GROUP_REASON : options.reason(); + List members; if (options.removeAll()) { -members = getMembersFromGroup(groupId); +members = getMembersFromGroup(groupId, reason); } else { -members = options.members().stream().map(MemberToRemove::toMemberIdentity).collect(Collectors.toList()); +members = options.members().stream() +.map(m -> m.toMemberIdentity().setReason(reason)) +.collect(Collectors.toList()); Review comment: should this have been done as part of KIP-800? ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -3723,27 +3723,34 @@ private Integer nodeFor(ConfigResource resource) { List membersToRemove = new ArrayList<>(); for (final MemberDescription member : members) { +MemberIdentity memberIdentity = new MemberIdentity() +.setReason(reason); Review comment: nit: does `.setReason()` have to be in its own line? 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #11959: MINOR: log warning when topology override for cache size is non-zero
ableegoldman commented on pull request #11959: URL: https://github.com/apache/kafka/pull/11959#issuecomment-1083757310 Merged to trunk -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman merged pull request #11959: MINOR: log warning when topology override for cache size is non-zero
ableegoldman merged pull request #11959: URL: https://github.com/apache/kafka/pull/11959 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kkonstantine commented on pull request #11908: KAFKA-13748: Do not include file stream connectors in Connect's CLASSPATH and plugin.path by default
kkonstantine commented on pull request #11908: URL: https://github.com/apache/kafka/pull/11908#issuecomment-1083753832 Thanks both. Merged to `trunk` and cherry-picked to `3.2, 3.1, 3.0`. cc @cadonna @tombentley -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman resolved KAFKA-13152.

Resolution: Fixed

> Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
> ---
>
> Key: KAFKA-13152
> URL: https://issues.apache.org/jira/browse/KAFKA-13152
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Sagar Rao
> Priority: Major
> Labels: needs-kip
> Fix For: 3.3.0
>
>
> The current config "buffered.records.per.partition" controls the maximum number of records to bookkeep per partition; once it is exceeded, we pause fetching from that partition. However, this config has two issues:
> * It's a per-partition config, so the total memory consumed depends on the dynamic number of partitions assigned.
> * Record size can vary from case to case.
> Hence it's hard to bound the memory usage of this buffering. We should consider deprecating that config in favor of a global one, e.g. "input.buffer.max.bytes", which controls how many bytes in total are allowed to be buffered. This is doable since we buffer the raw records in <byte[], byte[]>.

-- This message was sent by Atlassian Jira (v8.20.1#820001)
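To make the difference between a per-partition record count and a global byte bound concrete, here is a minimal sketch. It is not the Kafka Streams implementation: the class `InputBufferBudget` and all of its methods are hypothetical, and the only idea taken from the ticket is that raw records are byte arrays, so their sizes can be summed into one total that is compared against a single limit regardless of how many partitions are assigned.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical accounting helper: one byte budget shared by all partitions.
final class InputBufferBudget {
    private final long maxBytes;
    private long bufferedBytes = 0L;
    private final Map<Integer, Long> bytesPerPartition = new HashMap<>();

    InputBufferBudget(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    // Account for a raw record that was added to a partition's buffer.
    void recordBuffered(int partition, byte[] key, byte[] value) {
        long size = sizeOf(key) + sizeOf(value);
        bufferedBytes += size;
        bytesPerPartition.merge(partition, size, Long::sum);
    }

    // Account for a raw record that was taken out of the buffer for processing.
    void recordProcessed(int partition, byte[] key, byte[] value) {
        long size = sizeOf(key) + sizeOf(value);
        bufferedBytes -= size;
        bytesPerPartition.merge(partition, -size, Long::sum);
    }

    // The bound is on the total, independent of the partition count.
    boolean shouldPauseFetching() {
        return bufferedBytes > maxBytes;
    }

    long bufferedBytes() {
        return bufferedBytes;
    }

    private static long sizeOf(byte[] bytes) {
        return bytes == null ? 0L : bytes.length;
    }
}
```

With a 10-byte budget, buffering an 8-byte record on one partition and a 4-byte record on another pushes the total to 12 and `shouldPauseFetching()` becomes true; processing the first record brings the total back to 4 and fetching can resume.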
[GitHub] [kafka] ableegoldman commented on pull request #11959: MINOR: log warning when topology override for cache size is non-zero
ableegoldman commented on pull request #11959: URL: https://github.com/apache/kafka/pull/11959#issuecomment-1083746981 All test failures are in Connect, so unrelated. Going to merge -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #11959: MINOR: log warning when topology override for cache size is non-zero
ableegoldman commented on a change in pull request #11959: URL: https://github.com/apache/kafka/pull/11959#discussion_r839039646 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java ## @@ -466,64 +468,9 @@ public void close() { waitForTransitionFromRebalancingToRunning(); for (final String log : appender.getMessages()) { -// after we replace the thread there should be two remaining threads with 5 bytes each -if (log.endsWith("Adding StreamThread-3, there are now 3 threads with cache size/max buffer size values as 3/178956970 per thread.")) { Review comment: No actually I meant that the comment was correct -- the test was just verifying incorrect results (after the thread replacement there should be 2 threads with 5MB of cache, as it says). But no worries -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #11941: KAFKA-13749: CreateTopics in KRaft must return configs
cmccabe commented on a change in pull request #11941: URL: https://github.com/apache/kafka/pull/11941#discussion_r839038060 ## File path: metadata/src/main/java/org/apache/kafka/metadata/ConfigSynonym.java ## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.metadata; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + + +public class ConfigSynonym { +private static final Logger log = LoggerFactory.getLogger(ConfigSynonym.class); + +public static final Function IDENTITY = a -> a; + +public static final Function HOURS_TO_MILLISECONDS = input -> { +int hours = valueToInt(input, 0, "hoursToMilliseconds"); +return String.valueOf(TimeUnit.MILLISECONDS.convert(hours, TimeUnit.HOURS)); +}; + +public static final Function MINUTES_TO_MILLISECONDS = input -> { +int hours = valueToInt(input, 0, "minutesToMilliseconds"); +return String.valueOf(TimeUnit.MILLISECONDS.convert(hours, TimeUnit.MINUTES)); +}; + +private static int valueToInt(String input, int defaultValue, String what) { +String trimmedInput = input.trim(); +if (trimmedInput.isEmpty()) { +return defaultValue; +} +try { +return Integer.parseInt(trimmedInput); +} catch (Exception e) { +log.error("{} failed: unable to parse '{}' as an integer.", what, trimmedInput, e); +return defaultValue; +} +} + +private final String name; +private final Function converter; + +public ConfigSynonym(String name, Function converter) { +this.name = name; +this.converter = converter; +} + +public ConfigSynonym(String name) { +this(name, IDENTITY); +} + +public String name() { +return name; +} + +public Function converter() { +return converter; +} + +public String convert(String input) { Review comment: no, let's remove it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #11941: KAFKA-13749: CreateTopics in KRaft must return configs
cmccabe commented on a change in pull request #11941: URL: https://github.com/apache/kafka/pull/11941#discussion_r839036960 ## File path: metadata/src/main/java/org/apache/kafka/metadata/ConfigSynonym.java ## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.metadata; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + + +public class ConfigSynonym { +private static final Logger log = LoggerFactory.getLogger(ConfigSynonym.class); + +public static final Function IDENTITY = a -> a; + +public static final Function HOURS_TO_MILLISECONDS = input -> { +int hours = valueToInt(input, 0, "hoursToMilliseconds"); +return String.valueOf(TimeUnit.MILLISECONDS.convert(hours, TimeUnit.HOURS)); +}; + +public static final Function MINUTES_TO_MILLISECONDS = input -> { +int hours = valueToInt(input, 0, "minutesToMilliseconds"); +return String.valueOf(TimeUnit.MILLISECONDS.convert(hours, TimeUnit.MINUTES)); +}; + +private static int valueToInt(String input, int defaultValue, String what) { +String trimmedInput = input.trim(); Review comment: I don't *think* it can, since these configs are of `ConfigDef.Type` `INT`, `LONG`, etc. and config parsing would have rejected the `null`... uh, I think. In any case, I just added a case for that so that we return the default value, just to be safe. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
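The null handling discussed in this review comment could look roughly like the following. This is a sketch under stated assumptions, not the code that was merged: the class name `ValueToIntSketch` is hypothetical, the logging call from the diff is left out to keep the example dependency-free, and the `null` branch is my reading of "return the default value, just to be safe".

```java
import java.util.concurrent.TimeUnit;

// Hypothetical, logging-free variant of the helper shown in the diff.
final class ValueToIntSketch {
    // Returns defaultValue for null, blank, or non-numeric input.
    static int valueToInt(String input, int defaultValue) {
        if (input == null) {
            return defaultValue;
        }
        String trimmedInput = input.trim();
        if (trimmedInput.isEmpty()) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(trimmedInput);
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }

    // Converts an hour count given as text into milliseconds as text.
    static String hoursToMilliseconds(String input) {
        int hours = valueToInt(input, 0);
        return String.valueOf(TimeUnit.HOURS.toMillis(hours));
    }
}
```

For example, `ValueToIntSketch.hoursToMilliseconds("2")` yields `"7200000"`, while a `null` input falls back to the default of 0 hours and yields `"0"`.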
[GitHub] [kafka] cmccabe commented on a change in pull request #11941: KAFKA-13749: CreateTopics in KRaft must return configs
cmccabe commented on a change in pull request #11941: URL: https://github.com/apache/kafka/pull/11941#discussion_r839036960 ## File path: metadata/src/main/java/org/apache/kafka/metadata/ConfigSynonym.java ## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.metadata; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + + +public class ConfigSynonym { +private static final Logger log = LoggerFactory.getLogger(ConfigSynonym.class); + +public static final Function IDENTITY = a -> a; + +public static final Function HOURS_TO_MILLISECONDS = input -> { +int hours = valueToInt(input, 0, "hoursToMilliseconds"); +return String.valueOf(TimeUnit.MILLISECONDS.convert(hours, TimeUnit.HOURS)); +}; + +public static final Function MINUTES_TO_MILLISECONDS = input -> { +int hours = valueToInt(input, 0, "minutesToMilliseconds"); +return String.valueOf(TimeUnit.MILLISECONDS.convert(hours, TimeUnit.MINUTES)); +}; + +private static int valueToInt(String input, int defaultValue, String what) { +String trimmedInput = input.trim(); Review comment: I added a case for that so that we return the default value -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #11941: KAFKA-13749: CreateTopics in KRaft must return configs
cmccabe commented on a change in pull request #11941: URL: https://github.com/apache/kafka/pull/11941#discussion_r839036276 ## File path: metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java ## @@ -51,27 +52,100 @@ public class ConfigurationControlManager { +public static final ConfigResource DEFAULT_NODE = new ConfigResource(Type.BROKER, ""); + private final Logger log; private final SnapshotRegistry snapshotRegistry; private final KafkaConfigSchema configSchema; private final Consumer existenceChecker; private final Optional alterConfigPolicy; private final ConfigurationValidator validator; private final TimelineHashMap> configData; +private final Map staticConfig; +private final ConfigResource currentNode; Review comment: I think "this" is more confusing than "node" :) I changed it to `currentController`, take a look. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #11941: KAFKA-13749: CreateTopics in KRaft must return configs
cmccabe commented on a change in pull request #11941: URL: https://github.com/apache/kafka/pull/11941#discussion_r839035589 ## File path: core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala ## @@ -189,6 +192,15 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg override def modifyConfigs(configs: Seq[Properties]): Unit = { super.modifyConfigs(configs) +// For testCreateTopicsReturnsConfigs, set some static broker configurations so that we can +// verify that they show up in the "configs" output of CreateTopics. Review comment: Yes, this is kind of a problem with the old test harness. I agree that the non-locality could be confusing here. However, I think it would be a lot of repeated boilerplate code to create a new test file just for this one test case. As a compromise, I added some JavaDoc to `testCreateTopicsReturnsConfigs` so that anyone modifying that test can see where the configurations are being set -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #11941: KAFKA-13749: CreateTopics in KRaft must return configs
cmccabe commented on a change in pull request #11941: URL: https://github.com/apache/kafka/pull/11941#discussion_r839033673 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -462,7 +475,9 @@ public void replay(RemoveTopicRecord record) { private ApiError createTopic(CreatableTopic topic, List records, - Map successes) { + Map successes, + boolean includeConfigs) { Review comment: Changed to `authorizedToReturnConfigs` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13601) Add option to support sync offset commit in Kafka Connect Sink
[ https://issues.apache.org/jira/browse/KAFKA-13601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17514971#comment-17514971 ] Chris Egerton commented on KAFKA-13601: --- [~dasarianil] does the above make sense to you? If so, do you still want to pursue synchronous offset commits for sink tasks? > Add option to support sync offset commit in Kafka Connect Sink > -- > > Key: KAFKA-13601 > URL: https://issues.apache.org/jira/browse/KAFKA-13601 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Anil Dasari >Priority: Major > > Exactly-once delivery in the S3 connector with scheduled rotation and a field partitioner > can be achieved by committing consumer offsets synchronously after a message batch has been flushed > to the sink successfully. > Currently, WorkerSinkTask commits the consumer offsets asynchronously, > at regular intervals of WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG: > [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L203] > [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L196] > [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L354] > > Add a config that allows the user to select synchronous commits instead of commits at > WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG intervals. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] C0urante commented on pull request #10367: KAFKA-12495: allow consecutive revoke in incremental cooperative assignor in connector
C0urante commented on pull request #10367: URL: https://github.com/apache/kafka/pull/10367#issuecomment-1083666779 @showuon FWIW, I've got a refactoring PR up for the testing logic that we might take a look at in the meantime: https://github.com/apache/kafka/pull/11974 Hopefully this should make writing and reviewing tests for changes like in this PR easier in the future.
[GitHub] [kafka] C0urante commented on pull request #11974: KAFKA-13763 (1): Improve unit testing coverage and flexibility for IncrementalCooperativeAssignor
C0urante commented on pull request #11974: URL: https://github.com/apache/kafka/pull/11974#issuecomment-1083666008 There's still room for improvement with the incremental rebalancing testing logic, but many of the remaining changes would involve modifications to the `IncrementalCooperativeAssignor` class as well. In order to reduce PR size and keep things focused, I've decided to leave that to a follow-up pull request, but since it would accomplish basically the same goal, I'll attach it to KAFKA-13763 as well.
[GitHub] [kafka] C0urante opened a new pull request #11974: KAFKA-13763 (1): Improve unit testing coverage and flexibility for IncrementalCooperativeAssignor
C0urante opened a new pull request #11974: URL: https://github.com/apache/kafka/pull/11974

[Jira](https://issues.apache.org/jira/browse/KAFKA-13763)

This is strictly a testing refactor. No functional changes are made; this can be easily verified by confirming that the only affected file is the `IncrementalCooperativeAssignorTest.java` test suite.

These changes were initially discussed during review of https://github.com/apache/kafka/pull/10367, which partially focused on improving readability in the unit tests for incremental rebalancing in Connect.

The goals here include:

1. Simplify the logic that has to be manually specified on a per-test-case basis for simulating a rebalance (accomplished by extracting common logic into reusable utility methods such as `performRebalance` and `addNewEmptyWorkers`)
2. Reduce the cognitive burden for following testing logic by removing unnecessary fields (like `expectedMemberConfigs` and `assignments`) and assertions (like the redundant checks for leader and leader URL)
3. Add powerful, granular, and reusable utility methods that can provide stronger guarantees about the state of a cluster across successive rebalances without forcing people to track this state in their heads (accomplished by replacing the existing `assertAssignments` method with `assertWorkers`, `assertEmptyAssignment`, `assertConnectorAllocations`, and `assertTaskAllocations`, and by adding the new `assertBalancedAllocation` and `assertCompleteAllocation` methods)
4. Refactor common logic for testing utilities to be more concise and reduce the number of Java 8 streams statements that have to be understood in order to read through a test case
5. Fix a bug in the assertion logic for checking for duplicates currently present [here](https://github.com/apache/kafka/blob/b2cb6caa1e9267c720c00fa367a277ee8509baea/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java#L1447-L1451) and [here](https://github.com/apache/kafka/blob/b2cb6caa1e9267c720c00fa367a277ee8509baea/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java#L1456-L1460) ([`List::removeAll`](https://docs.oracle.com/javase/8/docs/api/java/util/List.html#removeAll-java.util.Collection-) removes _all_ occurrences of any element contained in the collection passed to the method)
6. Remove an incorrect assertion in the `assertNoReassignments` (now renamed to `assertNoRedundantAssignments`) method that there should be no duplicated connectors or tasks in the assignments reported by each worker to the leader during rebalance (this is unnecessary and even contradicts logic used for testing in cases like [this](https://github.com/apache/kafka/blob/b2cb6caa1e9267c720c00fa367a277ee8509baea/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java#L1112-L1125) where we intentionally simulate a worker with a duplicated set of connectors and/or tasks rejoining a cluster; the only reason this bug wasn't surfaced sooner is because the bug mentioned in the prior point covers it)

Once merged, this should allow for cleaner, faster test writing when adding new cases for incremental rebalancing, such as with https://github.com/apache/kafka/pull/10367.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
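The `List::removeAll` pitfall described in point 5 of the pull request can be reproduced in isolation. The following is a minimal sketch, not the code from `IncrementalCooperativeAssignorTest`; the class and method names are hypothetical, and the `Set`-based check is just one possible way to detect duplicates.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DuplicateCheckSketch {

    // Flawed approach: removeAll drops EVERY occurrence of each element found in
    // the argument, so the remainder is always empty and duplicates go unnoticed.
    static List<String> duplicatesViaRemoveAll(List<String> items) {
        List<String> remainder = new ArrayList<>(items);
        remainder.removeAll(new HashSet<>(items));
        return remainder;
    }

    // One possible alternative: Set.add returns false if the element was already present.
    static List<String> duplicatesViaSet(List<String> items) {
        Set<String> seen = new HashSet<>();
        List<String> duplicates = new ArrayList<>();
        for (String item : items) {
            if (!seen.add(item)) {
                duplicates.add(item);
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        // Hypothetical assignment in which connector-1 appears twice.
        List<String> assigned = Arrays.asList("connector-1", "connector-2", "connector-1");
        System.out.println(duplicatesViaRemoveAll(assigned)); // prints []
        System.out.println(duplicatesViaSet(assigned));       // prints [connector-1]
    }
}
```

Because the flawed check can never fail, it would also hide any other assertion that depends on duplicates being reported, which matches the interaction described in point 6.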
[jira] [Updated] (KAFKA-13763) Improve unit testing coverage and flexibility for IncrementalCooperativeAssignor
[ https://issues.apache.org/jira/browse/KAFKA-13763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Egerton updated KAFKA-13763:
    Summary: Improve unit testing coverage and flexibility for IncrementalCooperativeAssignor (was: Improve unit testing coverage for IncrementalCooperativeAssignor)

> Improve unit testing coverage and flexibility for IncrementalCooperativeAssignor
>
> Key: KAFKA-13763
> URL: https://issues.apache.org/jira/browse/KAFKA-13763
> Project: Kafka
> Issue Type: Improvement
> Components: KafkaConnect
> Reporter: Chris Egerton
> Assignee: Chris Egerton
> Priority: Minor
>
> The [tests|https://github.com/apache/kafka/blob/dcd09de1ed84b43f269eb32fc2baf589a791d468/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java] for the {{IncrementalCooperativeAssignor}} class provide a moderate level of coverage and cover some non-trivial cases, but there are some areas for improvement that will allow us to iterate on the assignment logic for Kafka Connect faster and with greater confidence.
> These improvements include:
> * Adding reusable utility methods to assert that a cluster's assignment is *balanced* (the difference in the number of connectors and tasks assigned to any two workers is at most one) and *complete* (all connectors and tasks are assigned to a worker)
> * Removing the existing [assertAssignment|https://github.com/apache/kafka/blob/dcd09de1ed84b43f269eb32fc2baf589a791d468/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java#L1373-L1405] methods and replacing them with a finer-grained alternative that allows assertions about the number of tasks/connectors assigned to or revoked from each worker during a round of rebalance, instead of the total for the entire cluster
> * Adding a reusable utility method to assert the current distribution of connectors and tasks across the cluster
> * Decomposing large portions of repeated code for simulating a round of rebalancing into a reusable utility method
> * Renaming variables to improve accuracy/readability (the {{expectedMemberConfigs}} field, for example, is pretty poorly named)
> Other improvements may be added, as they come up, in a pull request that addresses the above.

-- This message was sent by Atlassian Jira (v8.20.1#820001)
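The *balanced* and *complete* properties defined in the issue can be expressed as two small checks. This is a minimal sketch under stated assumptions: the class and method names are hypothetical and are not the utility methods from the test suite, and each call checks a single kind of resource (connectors or tasks), so a real assertion would presumably run it once per kind.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class AllocationChecksSketch {

    // Balanced: the assignment sizes of any two workers differ by at most one,
    // which is equivalent to (largest size - smallest size) <= 1.
    static boolean isBalanced(Map<String, List<String>> assignmentsByWorker) {
        if (assignmentsByWorker.isEmpty()) {
            return true;
        }
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;
        for (List<String> assignment : assignmentsByWorker.values()) {
            min = Math.min(min, assignment.size());
            max = Math.max(max, assignment.size());
        }
        return max - min <= 1;
    }

    // Complete: every expected connector or task is assigned to some worker.
    static boolean isComplete(Map<String, List<String>> assignmentsByWorker, Set<String> expected) {
        Set<String> assigned = new HashSet<>();
        for (List<String> assignment : assignmentsByWorker.values()) {
            assigned.addAll(assignment);
        }
        return assigned.containsAll(expected);
    }

    public static void main(String[] args) {
        // Hypothetical cluster state: two workers sharing three tasks.
        Map<String, List<String>> tasks = new HashMap<>();
        tasks.put("worker-1", Arrays.asList("task-0", "task-1"));
        tasks.put("worker-2", Arrays.asList("task-2"));
        Set<String> expected = new HashSet<>(Arrays.asList("task-0", "task-1", "task-2"));
        System.out.println(isBalanced(tasks));
        System.out.println(isComplete(tasks, expected));
    }
}
```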
[jira] [Created] (KAFKA-13784) DescribeQuorum should return the current leader if the handling node is not the current leader
Jose Armando Garcia Sancio created KAFKA-13784: -- Summary: DescribeQuorum should return the current leader if the handling node is not the current leader Key: KAFKA-13784 URL: https://issues.apache.org/jira/browse/KAFKA-13784 Project: Kafka Issue Type: Bug Components: kraft Affects Versions: 3.2.0 Reporter: Jose Armando Garcia Sancio Assignee: Jose Armando Garcia Sancio Clients calling DescribeQuorum currently cannot discover the current leader. If the request is sent to a node that is not the leader, it simply replies with INVALID_REQUEST. KIP-595 mentions that it should instead reply with the current leader. > If the response indicates that the intended node is not the current leader, >then check the response to see if the {{LeaderId}} has been set. If so, then >attempt to retry the request with the new leader. -- This message was sent by Atlassian Jira (v8.20.1#820001)
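The retry behavior quoted from KIP-595 could look roughly like the following on the client side. This is only a sketch: `QuorumResponse`, `findLeader`, and the `send` callback are hypothetical stand-ins and not part of any Kafka client API; the point is just the loop that follows the reported `LeaderId` until a leader answers.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;
import java.util.function.IntFunction;

public class DescribeQuorumRetrySketch {

    // Hypothetical response shape: whether the handling node is the leader,
    // and the leader id it reports (empty if it does not know one).
    static final class QuorumResponse {
        final boolean handledByLeader;
        final OptionalInt leaderId;

        QuorumResponse(boolean handledByLeader, OptionalInt leaderId) {
            this.handledByLeader = handledByLeader;
            this.leaderId = leaderId;
        }
    }

    // Sends the request to startNode and, while the handling node is not the leader
    // and reports a leader id, retries against that id (bounded by maxAttempts).
    static OptionalInt findLeader(int startNode, IntFunction<QuorumResponse> send, int maxAttempts) {
        int node = startNode;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            QuorumResponse response = send.apply(node);
            if (response.handledByLeader) {
                return OptionalInt.of(node);
            }
            if (!response.leaderId.isPresent()) {
                return OptionalInt.empty(); // no hint to follow
            }
            node = response.leaderId.getAsInt();
        }
        return OptionalInt.empty();
    }

    public static void main(String[] args) {
        // Hypothetical three-node quorum in which node 3 is the leader.
        Map<Integer, QuorumResponse> cluster = new HashMap<>();
        cluster.put(1, new QuorumResponse(false, OptionalInt.of(3)));
        cluster.put(2, new QuorumResponse(false, OptionalInt.of(3)));
        cluster.put(3, new QuorumResponse(true, OptionalInt.of(3)));
        System.out.println(findLeader(1, node -> cluster.get(node), 3));
    }
}
```

With the behavior reported in the issue (a bare INVALID_REQUEST and no leader hint), the loop above would return empty after the first attempt, which is exactly the discovery gap the ticket describes.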
[GitHub] [kafka] hachikuji merged pull request #11970: MINOR: Move `KafkaYammerMetrics` to server-common
hachikuji merged pull request #11970: URL: https://github.com/apache/kafka/pull/11970
[jira] [Resolved] (KAFKA-13748) Do not include file stream connectors in Connect's CLASSPATH and plugin.path by default
[ https://issues.apache.org/jira/browse/KAFKA-13748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis resolved KAFKA-13748. Resolution: Fixed cc [~cadonna] [~tombentley] re: inclusion in the upcoming releases. > Do not include file stream connectors in Connect's CLASSPATH and plugin.path > by default > --- > > Key: KAFKA-13748 > URL: https://issues.apache.org/jira/browse/KAFKA-13748 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Konstantine Karantasis >Assignee: Konstantine Karantasis >Priority: Major > Fix For: 3.2.0, 3.1.1, 3.0.2 > > > File stream connectors have been included with Kafka Connect distributions > from the very beginning. These simple connectors were included to showcase > connector implementation but were never meant to be used in production and > have been only available for the straightforward demonstration of Connect's > capabilities through our quick start guides. > > Given that these connectors are not production-ready and yet they offer > access to the local filesystem, with this ticket I propose to remove them > from our deployments by default by excluding these connectors from the > {{CLASSPATH}} or the default {{plugin.path}}. > > The impact will be minimal. Quick start guides will require a single > additional step of editing the {{plugin.path}} to include the single package > that includes these connectors. Production deployments will remain unaffected > because these are not production-grade connectors. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Comment Edited] (KAFKA-13759) Disable producer idempotence by default in producers instantiated by Connect
[ https://issues.apache.org/jira/browse/KAFKA-13759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17511516#comment-17511516 ] Konstantine Karantasis edited comment on KAFKA-13759 at 3/30/22, 8:38 PM: -- This issue has now been merged on the 3.2 and 3.1 branches to avoid a breaking change when Connect contacts older brokers and idempotence is enabled in the producer by default. [~cadonna] [~tombentley] fyi. Hopefully this fix makes it into the upcoming releases, but please let me know if the targeted versions need to be adjusted. was (Author: kkonstantine): This issue has been now been merged on the 3.2 and 3.1 branches to avoid a breaking change when Connect. [~cadonna] [~tombentley] fyi. Hopefully this fix makes to the upcoming releases but please let me know if the targeted versions need to be adjusted. > Disable producer idempotence by default in producers instantiated by Connect > > > Key: KAFKA-13759 > URL: https://issues.apache.org/jira/browse/KAFKA-13759 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Konstantine Karantasis >Assignee: Konstantine Karantasis >Priority: Major > Fix For: 3.2.0, 3.1.1, 3.0.2 > > > https://issues.apache.org/jira/browse/KAFKA-7077 was merged recently > referring to KIP-318. Before that, in AK 3.0, idempotence was enabled by > default across Kafka producers. > However, some compatibility implications were missed in both cases. > If idempotence is enabled by default, Connect won't be able to communicate via > its producers with Kafka brokers older than version 0.11. Perhaps more > importantly, for brokers older than version 2.8 the {{IDEMPOTENT_WRITE}} ACL > is required to be granted to the principal of the Connect worker. > Given the above caveats, this ticket proposes to explicitly disable producer > idempotence in Connect by default. This feature, as it happens today, can be > enabled by setting worker and/or connector properties. 
However, enabling it > by default should be considered in a major version upgrade and after KIP-318 > is updated to mention the compatibility requirements and gets officially > approved. -- This message was sent by Atlassian Jira (v8.20.1#820001)
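The ticket notes that idempotence can still be enabled through worker and/or connector properties. The sketch below shows the property keys involved, assuming the standard Connect prefixes: `producer.` for worker-level producer settings and `producer.override.` for per-connector overrides (the latter being subject to the worker's `connector.client.config.override.policy`). The class and method names are hypothetical; only the keys are meant to carry over.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ConnectIdempotenceConfigSketch {

    // Worker-level setting: applies to producers the worker creates for source tasks.
    static Properties workerProperties() {
        Properties props = new Properties();
        props.setProperty("producer.enable.idempotence", "true");
        return props;
    }

    // Connector-level setting: overrides the worker default for a single connector,
    // if the worker's client config override policy permits it.
    static Map<String, String> connectorProperties() {
        Map<String, String> config = new HashMap<>();
        config.put("producer.override.enable.idempotence", "true");
        return config;
    }

    public static void main(String[] args) {
        System.out.println(workerProperties());
        System.out.println(connectorProperties());
    }
}
```

Per the compatibility notes in the ticket, opting in this way only makes sense against brokers at version 0.11 or newer, and brokers older than 2.8 additionally need the `IDEMPOTENT_WRITE` ACL granted to the worker's principal.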
[GitHub] [kafka] kkonstantine merged pull request #11908: KAFKA-13748: Do not include file stream connectors in Connect's CLASSPATH and plugin.path by default
kkonstantine merged pull request #11908: URL: https://github.com/apache/kafka/pull/11908
[jira] [Updated] (KAFKA-13775) CVE-2020-36518 - Upgrade jackson-databind to 2.12.6.1
[ https://issues.apache.org/jira/browse/KAFKA-13775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna updated KAFKA-13775: -- Fix Version/s: 3.1.1 > CVE-2020-36518 - Upgrade jackson-databind to 2.12.6.1 > - > > Key: KAFKA-13775 > URL: https://issues.apache.org/jira/browse/KAFKA-13775 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.1.0, 3.0.0, 3.0.1 >Reporter: Edwin Hobor >Assignee: Edwin Hobor >Priority: Major > Labels: CVE, security > Fix For: 3.2.0, 3.1.1 > > > *CVE-2020-36518* vulnerability affects Jackson-Databind in Kafka (see > [https://github.com/advisories/GHSA-57j2-w4cx-62h2]). > Upgrading to jackson-databind version *2.12.6.1* should address this issue. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] cadonna commented on pull request #11962: KAFKA-13775: CVE-2020-36518 - Upgrade jackson-databind to 2.12.6.1
cadonna commented on pull request #11962: URL: https://github.com/apache/kafka/pull/11962#issuecomment-1083560237 Thank you @edwin092 !
[GitHub] [kafka] cadonna commented on pull request #11962: KAFKA-13775: CVE-2020-36518 - Upgrade jackson-databind to 2.12.6.1
cadonna commented on pull request #11962: URL: https://github.com/apache/kafka/pull/11962#issuecomment-1083560072 Cherry-picked to 3.1. cc @tombentley -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-13775) CVE-2020-36518 - Upgrade jackson-databind to 2.12.6.1
[ https://issues.apache.org/jira/browse/KAFKA-13775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna reassigned KAFKA-13775: - Assignee: Edwin Hobor
[jira] [Assigned] (KAFKA-13660) Replace log4j with reload4j
[ https://issues.apache.org/jira/browse/KAFKA-13660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna reassigned KAFKA-13660: - Assignee: Mike Lothian > Replace log4j with reload4j > --- > > Key: KAFKA-13660 > URL: https://issues.apache.org/jira/browse/KAFKA-13660 > Project: Kafka > Issue Type: Bug > Components: logging >Affects Versions: 2.4.0, 3.0.0 >Reporter: Mike Lothian >Assignee: Mike Lothian >Priority: Major > > Kafka is using a known vulnerable version of log4j. The reload4j project was > created by the code's original authors to address those issues. It is > designed as a drop-in replacement without any API changes. > > https://reload4j.qos.ch/ > > I've raised a merge request replacing log4j with reload4j and slf4j-log4j12 > with slf4j-reload4j, and bumping the slf4j version. > > This is my first time contributing to the Kafka project and I'm not too > familiar with the process. I'll go back and amend my PR with this issue number. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Resolved] (KAFKA-13660) Replace log4j with reload4j
[ https://issues.apache.org/jira/browse/KAFKA-13660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna resolved KAFKA-13660. --- Resolution: Fixed
[jira] [Updated] (KAFKA-13660) Replace log4j with reload4j
[ https://issues.apache.org/jira/browse/KAFKA-13660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna updated KAFKA-13660: -- Fix Version/s: 3.2.0 3.1.1
[GitHub] [kafka] cadonna commented on pull request #11743: KAFKA-13660: Switch log4j12 to reload4j
cadonna commented on pull request #11743: URL: https://github.com/apache/kafka/pull/11743#issuecomment-1083550804
[GitHub] [kafka] dajac commented on a change in pull request #11971: KAFKA-13783; Remove reason prefixing in JoinGroupRequest and LeaveGroupRequest
dajac commented on a change in pull request #11971: URL: https://github.com/apache/kafka/pull/11971#discussion_r838900451 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -3723,27 +3723,34 @@ private Integer nodeFor(ConfigResource resource) { List membersToRemove = new ArrayList<>(); for (final MemberDescription member : members) { +MemberIdentity memberIdentity = new MemberIdentity() +.setReason(reason); Review comment: This does not seem to be an issue. I have extended the unit test to ensure that the lookup works as we expect. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13773) Data loss after recovery from crash due to full hard disk
[ https://issues.apache.org/jira/browse/KAFKA-13773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17514910#comment-17514910 ]

Tm Alkemade commented on KAFKA-13773:

[~junrao] Yes, that's true, it was a new, different test run (it showed the same behavior, though). I'll try to get a consistent run with logs in the right order tomorrow; unfortunately, getting all the logs is sometimes tricky in Kubernetes when pods are restarting.

> Data loss after recovery from crash due to full hard disk
>
> Key: KAFKA-13773
> URL: https://issues.apache.org/jira/browse/KAFKA-13773
> Project: Kafka
> Issue Type: Bug
> Components: log cleaner
> Affects Versions: 2.8.0, 3.1.0, 2.8.1
> Reporter: Tm Alkemade
> Priority: Major
> Attachments: DiskAndOffsets.png, kafka-.zip, kafka-2.7.0vs2.8.0.zip, kafka-2.8.0-crash.zip, kafka-logfiles.zip
>
> While doing some testing of Kafka on Kubernetes, the data disk for Kafka filled up, which led to all 3 nodes crashing. I increased the disk size for all three nodes and started up Kafka again (one by one, waiting for the previous node to become available before starting the next one). After a little while, two out of three nodes had no data anymore.
> According to the logs, the log cleaner kicked in and decided that the latest timestamp on those partitions was '0' (i.e. 1970-01-01), and that is older than the 2-week limit specified on the topic.
>
> {code:java}
> 2022-03-28 12:17:19,740 INFO [LocalLog partition=audit-trail-0, dir=/var/lib/kafka/data-0/kafka-log1] Deleting segment files LogSegment(baseOffset=0, size=249689733, lastModifiedTime=1648460888636, largestRecordTimestamp=Some(0)) (kafka.log.LocalLog$) [kafka-scheduler-0]
> 2022-03-28 12:17:19,753 INFO Deleted log /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.log.deleted. (kafka.log.LogSegment) [kafka-scheduler-0]
> 2022-03-28 12:17:19,754 INFO Deleted offset index /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.index.deleted. (kafka.log.LogSegment) [kafka-scheduler-0]
> 2022-03-28 12:17:19,754 INFO Deleted time index /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.timeindex.deleted. (kafka.log.LogSegment) [kafka-scheduler-0]
> {code}
> Using kafka-dump-log.sh I was able to determine that the greatest timestamp in that file (before deletion) was actually 1648460888636 (2022-03-28, 09:48:08 UTC, which is today). However, since this segment was the 'latest/current' segment, much of the file is empty. The code that determines the last entry (TimeIndex.lastEntryFromIndexFile) doesn't seem to know this and just reads the last position in the file; the file being mostly empty causes it to read 0 for that position.
> The cleaner code seems to take this into account, since UnifiedLog.deleteOldSegments is never supposed to delete the current segment, judging by the scaladoc; however, in this case the check doesn't seem to do its job. Perhaps the detected highWatermark is wrong?
> I've attached the logs and the zipped data directories (data files are over 3 GB in size when unzipped).
>
> I've encountered this problem with both Kafka 2.8.1 and 3.1.0.
> I've also tried changing min.insync.replicas to 2: the issue still occurs.

-- This message was sent by Atlassian Jira (v8.20.1#820001)
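The reporter's hypothesis (a mostly empty, preallocated time index whose last slot reads as timestamp 0) can be illustrated with a plain `ByteBuffer`. This is a sketch of the hypothesis only, not Kafka's `TimeIndex` implementation: the class and method names are hypothetical, and the 12-byte entry layout (8-byte timestamp followed by a 4-byte relative offset) is assumed for illustration.

```java
import java.nio.ByteBuffer;

public class TimeIndexSketch {

    // Assumed entry layout: 8-byte timestamp + 4-byte relative offset.
    static final int ENTRY_SIZE = 12;

    // Builds a zero-filled index with room for `slots` entries and writes
    // the given timestamps into the first slots only.
    static ByteBuffer preallocatedIndex(int slots, long[] timestamps) {
        ByteBuffer index = ByteBuffer.allocate(slots * ENTRY_SIZE);
        for (int i = 0; i < timestamps.length; i++) {
            index.putLong(i * ENTRY_SIZE, timestamps[i]);
            index.putInt(i * ENTRY_SIZE + 8, i);
        }
        return index;
    }

    static long timestampAt(ByteBuffer index, int slot) {
        return index.getLong(slot * ENTRY_SIZE);
    }

    // Derives the "last entry" from the file size: on a preallocated file this
    // lands in the unwritten, zero-filled tail.
    static long lastTimestampByCapacity(ByteBuffer index) {
        int slots = index.capacity() / ENTRY_SIZE;
        return timestampAt(index, slots - 1);
    }

    // Derives the last entry from the number of entries actually written.
    static long lastTimestampByEntryCount(ByteBuffer index, int entries) {
        return entries == 0 ? -1L : timestampAt(index, entries - 1);
    }

    public static void main(String[] args) {
        long[] written = {1648460000000L, 1648460888636L};
        ByteBuffer index = preallocatedIndex(8, written);
        System.out.println(lastTimestampByCapacity(index));                   // prints 0
        System.out.println(lastTimestampByEntryCount(index, written.length)); // prints 1648460888636
    }
}
```

A timestamp of 0 read this way is far older than any retention limit, which would be consistent with the `largestRecordTimestamp=Some(0)` value in the log excerpt and with the segment being selected for deletion.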
[GitHub] [kafka] dajac commented on pull request #11936: MINOR: GetOffsetShell should ignore partitions without offsets
dajac commented on pull request #11936: URL: https://github.com/apache/kafka/pull/11936#issuecomment-1083525133 I will merge it tomorrow if no one objects. This must be cherry-picked to 3.2 as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna merged pull request #11743: KAFKA-13660: Switch log4j12 to reload4j
cadonna merged pull request #11743: URL: https://github.com/apache/kafka/pull/11743
[GitHub] [kafka] cadonna commented on pull request #11743: KAFKA-13660: Switch log4j12 to reload4j
cadonna commented on pull request #11743: URL: https://github.com/apache/kafka/pull/11743#issuecomment-1083501431

Failures are unrelated:

```
Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testStartTwoConnectors
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.TransformationIntegrationTest.testFilterOnTopicNameWithSinkConnector
Build / JDK 8 and Scala 2.12 / kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testElectionResultOutput, Security=PLAINTEXT
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13773) Data loss after recovery from crash due to full hard disk
[ https://issues.apache.org/jira/browse/KAFKA-13773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17514888#comment-17514888 ] Jun Rao commented on KAFKA-13773: - [~Timelad] : I checked kafka-0-2.8.0-before-fail.log and kafka-1-2.8.0-before-fail.log in kafka-2.8.0-crash.zip. Both seem to have timestamps after 13:23:00,077, which is the time when the recovery was skipped. > Data loss after recovery from crash due to full hard disk > - > > Key: KAFKA-13773 > URL: https://issues.apache.org/jira/browse/KAFKA-13773 > Project: Kafka > Issue Type: Bug > Components: log cleaner >Affects Versions: 2.8.0, 3.1.0, 2.8.1 >Reporter: Tm Alkemade >Priority: Major > Attachments: DiskAndOffsets.png, kafka-.zip, kafka-2.7.0vs2.8.0.zip, > kafka-2.8.0-crash.zip, kafka-logfiles.zip > > > While doing some testing of Kafka on Kubernetes, the data disk for kafka > filled up, which led to all 3 nodes crashing. I increased the disk size for > all three nodes and started up kafka again (one by one, waiting for the > previous node to become available before starting the next one). After a > little while two out of three nodes had no data anymore. > According to the logs, the log cleaner kicked in and decided that the latest > timestamp on those partitions was '0' (i.e. 1970-01-01), and that is older > than the 2 week limit specified on the topic. > > {code:java} > 2022-03-28 12:17:19,740 INFO [LocalLog partition=audit-trail-0, > dir=/var/lib/kafka/data-0/kafka-log1] Deleting segment files > LogSegment(baseOffset=0, size=249689733, lastModifiedTime=1648460888636, > largestRecordTimestamp=Some(0)) (kafka.log.LocalLog$) [kafka-scheduler-0] > 2022-03-28 12:17:19,753 INFO Deleted log > /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.log.deleted. > (kafka.log.LogSegment) [kafka-scheduler-0] > 2022-03-28 12:17:19,754 INFO Deleted offset index > /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.index.deleted. 
> (kafka.log.LogSegment) [kafka-scheduler-0] > 2022-03-28 12:17:19,754 INFO Deleted time index > /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.timeindex.deleted. > (kafka.log.LogSegment) [kafka-scheduler-0]{code} > Using kafka-dump-log.sh I was able to determine that the greatest timestamp > in that file (before deletion) was actually 1648460888636 ( 2022-03-28, > 09:48:08 UTC, which is today). However, since this segment was the > 'latest/current' segment, much of the file is empty. The code that determines > the last entry (TimeIndex.lastEntryFromIndexFile) doesn't seem to know this > and just reads the last position in the file; because the file is mostly empty, > it reads 0 for that position. > The cleaner code seems to take this into account since > UnifiedLog.deleteOldSegments is never supposed to delete the current segment, > judging by the scaladoc; however, in this case the check doesn't seem to do > its job. Perhaps the detected highWatermark is wrong? > I've attached the logs and the zipped data directories (data files are over > 3 GB in size when unzipped) > > I've encountered this problem with both kafka 2.8.1 and 3.1.0. > I've also tried changing min.insync.replicas to 2: The issue still occurs.
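Editor's sketch of the failure mode described in the report above: if a preallocated, zero-filled time index is treated as entirely valid, reading its last slot returns timestamp 0, which then looks older than any retention limit. This is a minimal illustration and not Kafka's actual `TimeIndex` code; the class and method names are hypothetical, and the 12-byte entry layout (8-byte timestamp plus 4-byte relative offset) is an assumption made for the example.

```java
import java.nio.ByteBuffer;

public class TimeIndexSketch {
    // Assumed entry layout: 8-byte timestamp followed by a 4-byte relative offset.
    static final int ENTRY_SIZE = 12;

    /** Builds a zero-filled buffer with room for `slots` entries and writes the given timestamps at the front. */
    static ByteBuffer preallocatedIndex(int slots, long[] timestamps) {
        ByteBuffer buf = ByteBuffer.allocate(slots * ENTRY_SIZE); // allocate() zero-fills
        for (int i = 0; i < timestamps.length; i++) {
            buf.putLong(i * ENTRY_SIZE, timestamps[i]);
            buf.putInt(i * ENTRY_SIZE + 8, i);
        }
        return buf;
    }

    /** Derives the entry count from the buffer size, so unused slots are read as if they were entries. */
    static long lastTimestampBySize(ByteBuffer index) {
        int entries = index.limit() / ENTRY_SIZE;
        return entries == 0 ? -1L : index.getLong((entries - 1) * ENTRY_SIZE);
    }

    /** Uses the number of entries that were actually written. */
    static long lastTimestampByCount(ByteBuffer index, int writtenEntries) {
        return writtenEntries == 0 ? -1L : index.getLong((writtenEntries - 1) * ENTRY_SIZE);
    }

    /** Time-based retention check: a segment expires when its largest timestamp is older than the limit. */
    static boolean isExpired(long largestTimestamp, long nowMs, long retentionMs) {
        return nowMs - largestTimestamp > retentionMs;
    }

    public static void main(String[] args) {
        long now = 1648469839740L;                 // roughly the time of the deletion in the log above
        long twoWeeksMs = 14L * 24 * 60 * 60 * 1000;
        ByteBuffer index = preallocatedIndex(8, new long[] {1648460000000L, 1648460888636L});

        long bySize = lastTimestampBySize(index);
        long byCount = lastTimestampByCount(index, 2);
        System.out.println("by size:  " + bySize + " expired=" + isExpired(bySize, now, twoWeeksMs));
        System.out.println("by count: " + byCount + " expired=" + isExpired(byCount, now, twoWeeksMs));
    }
}
```

Whether the broker actually takes the size-based path depends on whether log recovery ran after the crash, which is the open question in this thread.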
[GitHub] [kafka] dajac commented on a change in pull request #11971: KAFKA-13783; Remove reason prefixing in JoinGroupRequest and LeaveGroupRequest
dajac commented on a change in pull request #11971: URL: https://github.com/apache/kafka/pull/11971#discussion_r838861385 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -3723,27 +3723,34 @@ private Integer nodeFor(ConfigResource resource) { List membersToRemove = new ArrayList<>(); for (final MemberDescription member : members) { +MemberIdentity memberIdentity = new MemberIdentity() +.setReason(reason); Review comment: Good question. Let me double check this. For context, we were doing this before this patch as well.
[jira] [Resolved] (KAFKA-13775) CVE-2020-36518 - Upgrade jackson-databind to 2.12.6.1
[ https://issues.apache.org/jira/browse/KAFKA-13775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna resolved KAFKA-13775. --- Resolution: Fixed > CVE-2020-36518 - Upgrade jackson-databind to 2.12.6.1 > - > > Key: KAFKA-13775 > URL: https://issues.apache.org/jira/browse/KAFKA-13775 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.1.0, 3.0.0, 3.0.1 >Reporter: Edwin Hobor >Priority: Major > Labels: CVE, security > Fix For: 3.2.0 > > > *CVE-2020-36518* vulnerability affects Jackson-Databind in Kafka (see > [https://github.com/advisories/GHSA-57j2-w4cx-62h2]). > Upgrading to jackson-databind version *2.12.6.1* should address this issue.
[jira] [Updated] (KAFKA-13775) CVE-2020-36518 - Upgrade jackson-databind to 2.12.6.1
[ https://issues.apache.org/jira/browse/KAFKA-13775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna updated KAFKA-13775: -- Fix Version/s: 3.2.0
[GitHub] [kafka] cadonna commented on pull request #11962: KAFKA-13775: CVE-2020-36518 - Upgrade jackson-databind to 2.12.6.1
cadonna commented on pull request #11962: URL: https://github.com/apache/kafka/pull/11962#issuecomment-1083490149 Cherry-picked to 3.2
[jira] [Updated] (KAFKA-13773) Data loss after recovery from crash due to full hard disk
[ https://issues.apache.org/jira/browse/KAFKA-13773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tm Alkemade updated KAFKA-13773: Affects Version/s: 2.8.0
[GitHub] [kafka] cadonna merged pull request #11962: KAFKA-13775: CVE-2020-36518 - Upgrade jackson-databind to 2.12.6.1
cadonna merged pull request #11962: URL: https://github.com/apache/kafka/pull/11962
[GitHub] [kafka] cadonna commented on pull request #11962: KAFKA-13775: CVE-2020-36518 - Upgrade jackson-databind to 2.12.6.1
cadonna commented on pull request #11962: URL: https://github.com/apache/kafka/pull/11962#issuecomment-1083482813 Failures are unrelated
```
Build / JDK 8 and Scala 2.12 / kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota()
Build / JDK 11 and Scala 2.13 / kafka.server.KRaftClusterTest.testCreateClusterAndPerformReassignment()
```
[jira] [Commented] (KAFKA-13773) Data loss after recovery from crash due to full hard disk
[ https://issues.apache.org/jira/browse/KAFKA-13773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17514880#comment-17514880 ] Tm Alkemade commented on KAFKA-13773: - {quote}'but it's not clear if the previous shutdown was clean or not' {quote} The previous shutdown should not have been clean, since there wasn't enough disk space. I've attached the log for the time of the crash in kafka-2.8.0-crash.zip. I'll try to get the log for the first restart after the first disk space crash tomorrow; it might give some more insight. {quote}After that, if the broker is restarted again, it should go through log recovery, did that happen? {quote} Kafka skipped log recovery once it had enough disk space. See kafka-2.7.0vs2.8.0.zip; it contains the log4j logs of the first start after resizing the disk.
[jira] [Commented] (KAFKA-12511) Flaky test DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota()
[ https://issues.apache.org/jira/browse/KAFKA-12511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17514875#comment-17514875 ] Bruno Cadonna commented on KAFKA-12511: --- https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11962/4/testReport/kafka.network/DynamicConnectionQuotaTest/Build___JDK_8_and_Scala_2_12___testDynamicListenerConnectionCreationRateQuota___2/ {code} java.util.concurrent.ExecutionException: org.opentest4j.AssertionFailedError: Listener EXTERNAL connection rate 4.8959608323133414 must be below 4.8 ==> expected: but was: {code} > Flaky test > DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota() > -- > > Key: KAFKA-12511 > URL: https://issues.apache.org/jira/browse/KAFKA-12511 > Project: Kafka > Issue Type: Bug >Reporter: dengziming >Priority: Minor > > First time: > Listener PLAINTEXT connection rate 14.419389476913636 must be below > 14.399 ==> expected: but was: > Second time: > Listener EXTERNAL connection rate 10.998243336133811 must be below > 10.799 ==> expected: but was: > details: > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10289/4/testReport/junit/kafka.network/DynamicConnectionQuotaTest/Build___JDK_11___testDynamicListenerConnectionCreationRateQuota__/
[GitHub] [kafka] lihaosky commented on a change in pull request #11971: KAFKA-13783; Remove reason prefixing in JoinGroupRequest and LeaveGroupRequest
lihaosky commented on a change in pull request #11971: URL: https://github.com/apache/kafka/pull/11971#discussion_r838839458 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -3723,27 +3723,34 @@ private Integer nodeFor(ConfigResource resource) { List membersToRemove = new ArrayList<>(); for (final MemberDescription member : members) { +MemberIdentity memberIdentity = new MemberIdentity() +.setReason(reason); Review comment: QQ: it seems `reason` is also used in `equals`. Is it OK to add this? Could it cause entries in `membersToRemove` to not be found somewhere?
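Editor's sketch of the concern raised in the review comment above: when a field takes part in `equals`, two objects that differ only in that field no longer match, so lookups such as `List.contains` can miss. The `Member` class below is a hypothetical stand-in and not Kafka's generated `MemberIdentity`; whether any caller actually performs such a lookup on `membersToRemove` is not established in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class EqualsFieldSketch {
    /** Hypothetical value class whose equality covers both fields. */
    static final class Member {
        final String groupInstanceId;
        final String reason;

        Member(String groupInstanceId, String reason) {
            this.groupInstanceId = groupInstanceId;
            this.reason = reason;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Member)) return false;
            Member other = (Member) o;
            return Objects.equals(groupInstanceId, other.groupInstanceId)
                && Objects.equals(reason, other.reason);
        }

        @Override
        public int hashCode() {
            return Objects.hash(groupInstanceId, reason);
        }
    }

    /** Returns true when the list holds an element equal to the probe. */
    static boolean containsMember(List<Member> members, Member probe) {
        return members.contains(probe);
    }

    public static void main(String[] args) {
        List<Member> membersToRemove = new ArrayList<>();
        membersToRemove.add(new Member("instance-1", "removed by admin"));

        // Same identity and reason: found.
        System.out.println(containsMember(membersToRemove, new Member("instance-1", "removed by admin")));
        // Same identity, different reason: not found, because reason is part of equals.
        System.out.println(containsMember(membersToRemove, new Member("instance-1", null)));
    }
}
```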
[GitHub] [kafka] dajac commented on pull request #11968: add toString method
dajac commented on pull request #11968: URL: https://github.com/apache/kafka/pull/11968#issuecomment-1083453850 @chenzhongyu11 Thanks for the PR. I see that the issue is already fixed in trunk. Is that correct? 2.5 is pretty old, I don’t think that we will ever release a minor release for it. Did you consider upgrading your client?
[GitHub] [kafka] dajac commented on pull request #11936: MINOR: GetOffsetShell should ignore partitions without offsets
dajac commented on pull request #11936: URL: https://github.com/apache/kafka/pull/11936#issuecomment-1083445963 @showuon Any further comment?
[GitHub] [kafka] hachikuji edited a comment on pull request #11970: MINOR: Move `KafkaYammerMetrics` to server-common
hachikuji edited a comment on pull request #11970: URL: https://github.com/apache/kafka/pull/11970#issuecomment-1083410018 @dajac Yeah, exactly. That is why I did not want to touch the old functions here. To be honest, we might never get rid of them since there's not a ton of incentive to do so. Nevertheless, I think it's helpful to have these new functions in `server-common` with explicit ordering. It will help with KRaft in particular where we need to redefine metrics so that the mbeans have the same name as existing metrics. In general, I think we're moving toward a more modular server structure, so there will probably be other use cases as well.
[jira] [Updated] (KAFKA-13719) connector restart cause duplicate tasks
[ https://issues.apache.org/jira/browse/KAFKA-13719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison updated KAFKA-13719: --- Fix Version/s: 3.2.0 > connector restart cause duplicate tasks > --- > > Key: KAFKA-13719 > URL: https://issues.apache.org/jira/browse/KAFKA-13719 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.0.0 >Reporter: Shujie Sun >Assignee: Shujie Sun >Priority: Critical > Fix For: 3.2.0, 3.1.1, 3.3.0 > > > Restarting a connector with parameter includeTasks=true=false causes > duplicate tasks and duplicate messages.
[GitHub] [kafka] mimaison commented on pull request #11869: KAFKA-13719: fix connector restart cause duplicate tasks
mimaison commented on pull request #11869: URL: https://github.com/apache/kafka/pull/11869#issuecomment-1083415753 Backported to 3.2 and 3.1
[jira] [Updated] (KAFKA-13719) connector restart cause duplicate tasks
[ https://issues.apache.org/jira/browse/KAFKA-13719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison updated KAFKA-13719: --- Fix Version/s: 3.1.1 > connector restart cause duplicate tasks > --- > > Key: KAFKA-13719 > URL: https://issues.apache.org/jira/browse/KAFKA-13719 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.0.0 >Reporter: Shujie Sun >Assignee: Shujie Sun >Priority: Critical > Fix For: 3.1.1, 3.3.0 > > > Restarting a connector with parameter includeTasks=true=false causes > duplicate tasks and duplicate messages.
[GitHub] [kafka] hachikuji edited a comment on pull request #11970: MINOR: Move `KafkaYammerMetrics` to server-common
hachikuji edited a comment on pull request #11970: URL: https://github.com/apache/kafka/pull/11970#issuecomment-1083410018 @dajac Yeah, exactly. That is why I did not want to touch the old functions here. To be honest, we might never get rid of them since there's not a ton of incentive to do so. Nevertheless, I think it's helpful to have these new functions in `server-common` with explicit ordering. It will help with KRaft in particular where we need to redefine metrics so that the mbeans have the same name as existing metrics. In general, I think we're moving toward a more modular structure, so there will probably be other use cases as well.
[GitHub] [kafka] hachikuji commented on pull request #11970: MINOR: Move `KafkaYammerMetrics` to server-common
hachikuji commented on pull request #11970: URL: https://github.com/apache/kafka/pull/11970#issuecomment-1083410018 @dajac Yeah, exactly. That is why I did not want to touch the old functions here. To be honest, we might never get rid of them since there's not a ton of incentive to do so. Nevertheless, I think it's helpful to have these new functions in `server-common` with explicit ordering. It will help with KRaft in particular where we need to redefine metrics so that the mbeans have the same name. In general, I think we're moving toward a more modular structure, so there will probably be other use cases as well.
[GitHub] [kafka] yufeiyan1220 commented on a change in pull request #11953: KAFKA-13772: Partitions are not correctly re-partitioned when the fetcher thread pool is resized
yufeiyan1220 commented on a change in pull request #11953: URL: https://github.com/apache/kafka/pull/11953#discussion_r838767189 ## File path: core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala ## @@ -210,4 +217,114 @@ class AbstractFetcherManagerTest { verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds) verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds) } + + @Test + def testExpandThreadPool(): Unit = { +testResizeThreadPool(10, 50) + } + + @Test + def testShrinkThreadPool(): Unit = { +testResizeThreadPool(50, 10) + } + + private def testResizeThreadPool(currentFetcherSize: Int, newFetcherSize: Int, brokerNum: Int = 6): Unit = { +val fetchingTopicPartitions = makeTopicPartition(10, 100) +val failedTopicPartitions = makeTopicPartition(2, 5, "topic_failed") +val fetcherManager = new AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", "fetcher-manager", currentFetcherSize) { + override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = { +new TestResizeFetcherThread(sourceBroker, failedPartitions) + } +} +try { + fetcherManager.addFetcherForPartitions(fetchingTopicPartitions.map { tp => +val brokerId = getBrokerId(tp, brokerNum) +val brokerEndPoint = new BrokerEndPoint(brokerId, s"kafka-host-$brokerId", 9092) +tp -> InitialFetchState(None, brokerEndPoint, 0, 0) + }.toMap) + + // Mark some of these partitions failed within resizing scope + fetchingTopicPartitions.take(20).foreach(fetcherManager.addFailedPartition) + // Mark failed partitions out of resizing scope + failedTopicPartitions.foreach(fetcherManager.addFailedPartition) + + fetcherManager.resizeThreadPool(newFetcherSize) + + val ownedPartitions = mutable.Set.empty[TopicPartition] + fetcherManager.fetcherThreadMap.forKeyValue { (brokerIdAndFetcherId, fetcherThread) => +val fetcherId = brokerIdAndFetcherId.fetcherId +val brokerId = brokerIdAndFetcherId.brokerId + +fetcherThread.partitions.foreach { tp => + ownedPartitions += 
tp + assertEquals(fetcherManager.getFetcherId(tp), fetcherId) + assertEquals(getBrokerId(tp, brokerNum), brokerId) +} + } + // Verify that all partitions are owned by the fetcher threads. + assertEquals(fetchingTopicPartitions, ownedPartitions) + + val failedPartitionsAfterResize = fetcherManager.failedPartitions.failedPartitions() + // Verify that failed partitions within resizing scope are removed, otherwise retained + assertEquals(Set.empty, fetchingTopicPartitions.intersect(failedPartitionsAfterResize)) + assertEquals(failedTopicPartitions, failedTopicPartitions.intersect(failedPartitionsAfterResize)) Review comment: yeah!
[jira] [Commented] (KAFKA-13773) Data loss after recovery from crash due to full hard disk
[ https://issues.apache.org/jira/browse/KAFKA-13773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17514808#comment-17514808 ] Jun Rao commented on KAFKA-13773: - [~Timelad] : Thanks for the additional logs. I took a quick look at kafka-2-2.8.0-before-resize.log. The broker did skip recovery, but it's not clear if the previous shutdown was clean or not. Do you have the log before that? During loading, the broker shut down abruptly due to the no space issue. After that, if the broker is restarted again, it should go through log recovery, did that happen? {code:java} 2022-03-30 13:23:00,077 INFO Skipping recovery for all logs in /var/lib/kafka/data-0/kafka-log2 since clean shutdown file was found (kafka.log.LogManager) [main] {code}
[GitHub] [kafka] rajinisivaram merged pull request #11973: MINOR: Increase wait in ZooKeeperClientTest
rajinisivaram merged pull request #11973: URL: https://github.com/apache/kafka/pull/11973
[GitHub] [kafka] viktorsomogyi commented on pull request #10738: KAFKA-6945: KIP-373, allow users to create delegation token for others
viktorsomogyi commented on pull request #10738: URL: https://github.com/apache/kafka/pull/10738#issuecomment-1083329319 @omkreddy I've addressed your comments. Would you please check the PR again? Also, I see that some of the configurations fail to build, but I don't know why that is. Locally they're fine.
[GitHub] [kafka] dajac commented on a change in pull request #11953: KAFKA-13772: Partitions are not correctly re-partitioned when the fetcher thread pool is resized
dajac commented on a change in pull request #11953: URL: https://github.com/apache/kafka/pull/11953#discussion_r838689224 ## File path: core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala ## @@ -210,4 +217,114 @@ class AbstractFetcherManagerTest { verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds) verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds) } + + @Test + def testExpandThreadPool(): Unit = { +testResizeThreadPool(10, 50) + } + + @Test + def testShrinkThreadPool(): Unit = { +testResizeThreadPool(50, 10) + } + + private def testResizeThreadPool(currentFetcherSize: Int, newFetcherSize: Int, brokerNum: Int = 6): Unit = { +val fetchingTopicPartitions = makeTopicPartition(10, 100) +val failedTopicPartitions = makeTopicPartition(2, 5, "topic_failed") +val fetcherManager = new AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", "fetcher-manager", currentFetcherSize) { + override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = { +new TestResizeFetcherThread(sourceBroker, failedPartitions) + } +} +try { + fetcherManager.addFetcherForPartitions(fetchingTopicPartitions.map { tp => +val brokerId = getBrokerId(tp, brokerNum) +val brokerEndPoint = new BrokerEndPoint(brokerId, s"kafka-host-$brokerId", 9092) +tp -> InitialFetchState(None, brokerEndPoint, 0, 0) + }.toMap) + + // Mark some of these partitions failed within resizing scope + fetchingTopicPartitions.take(20).foreach(fetcherManager.addFailedPartition) + // Mark failed partitions out of resizing scope + failedTopicPartitions.foreach(fetcherManager.addFailedPartition) + + fetcherManager.resizeThreadPool(newFetcherSize) + + val ownedPartitions = mutable.Set.empty[TopicPartition] + fetcherManager.fetcherThreadMap.forKeyValue { (brokerIdAndFetcherId, fetcherThread) => +val fetcherId = brokerIdAndFetcherId.fetcherId +val brokerId = brokerIdAndFetcherId.brokerId + +fetcherThread.partitions.foreach { tp => + ownedPartitions += tp + 
assertEquals(fetcherManager.getFetcherId(tp), fetcherId) + assertEquals(getBrokerId(tp, brokerNum), brokerId) +} + } + // Verify that all partitions are owned by the fetcher threads. + assertEquals(fetchingTopicPartitions, ownedPartitions) + + val failedPartitionsAfterResize = fetcherManager.failedPartitions.failedPartitions() + // Verify that failed partitions within resizing scope are removed, otherwise retained + assertEquals(Set.empty, fetchingTopicPartitions.intersect(failedPartitionsAfterResize)) + assertEquals(failedTopicPartitions, failedTopicPartitions.intersect(failedPartitionsAfterResize)) Review comment: Do we really need `intersect`? I thought that `failedPartitionsAfterResize` should be equal to `failedTopicPartitions`, no?
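Editor's sketch of the point behind the review question above: asserting `expected == expected.intersect(actual)` only proves that `actual` contains every expected element, so it still passes when `actual` holds extra entries, while a direct equality check does not. The example uses plain Java sets with hypothetical partition names; it is not the test code from the PR.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class IntersectAssertionSketch {
    /** Returns a new set with the elements present in both inputs. */
    static <T> Set<T> intersect(Set<T> a, Set<T> b) {
        Set<T> result = new HashSet<>(a);
        result.retainAll(b);
        return result;
    }

    /** Mirrors the intersect-based assertion: true when actual is a superset of expected. */
    static <T> boolean weakCheck(Set<T> expected, Set<T> actual) {
        return expected.equals(intersect(expected, actual));
    }

    /** Direct equality: true only when both sets hold exactly the same elements. */
    static <T> boolean strictCheck(Set<T> expected, Set<T> actual) {
        return expected.equals(actual);
    }

    public static void main(String[] args) {
        Set<String> expected = new HashSet<>(Arrays.asList("topic_failed-0", "topic_failed-1"));
        // The actual set carries one unexpected leftover entry.
        Set<String> actual = new HashSet<>(Arrays.asList("topic_failed-0", "topic_failed-1", "topic-7"));

        System.out.println("weak check passes:   " + weakCheck(expected, actual));
        System.out.println("strict check passes: " + strictCheck(expected, actual));
    }
}
```

If the failed-partition set is expected to contain nothing else after the resize, the strict form is the tighter assertion.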
[GitHub] [kafka] ijuma commented on a change in pull request #11965: KAFKA-13778: Fetch from follower should never run the preferred read replica selection
ijuma commented on a change in pull request #11965:
URL: https://github.com/apache/kafka/pull/11965#discussion_r838688468

## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ##
@@ -1234,8 +1234,9 @@ class ReplicaManager(val config: KafkaConfig,
                                fetchOffset: Long,
                                currentTimeMs: Long): Option[Int] = {
     partition.leaderReplicaIdOpt.flatMap { leaderReplicaId =>

Review comment:
Looks like I assumed that `leaderReplicaIdOpt` would only be set if the broker was the leader. We should check if the docs need to be clarified (and close the test coverage gap, of course).
[jira] [Updated] (KAFKA-13778) Fetch from follower should never run the preferred read replica selection
[ https://issues.apache.org/jira/browse/KAFKA-13778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhaobo updated KAFKA-13778:
---------------------------
    Summary: Fetch from follower should never run the preferred read replica selection  (was: Fix follower broker also always execute preferred read-replica selection)

> Fetch from follower should never run the preferred read replica selection
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-13778
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13778
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 3.2.0
>            Reporter: zhaobo
>            Assignee: zhaobo
>            Priority: Minor
>
> The design purpose of the code is that only the leader broker can determine
> the preferred read-replica.
>
> {code:java}
> readFromLocalLog()
>
> // If we are the leader, determine the preferred read-replica
> val preferredReadReplica = clientMetadata.flatMap(
>   metadata => findPreferredReadReplica(partition, metadata, replicaId,
>     fetchInfo.fetchOffset, fetchTimeMs)) {code}
>
> But in fact, since the broker does not judge whether it is the leader or not,
> the follower will also execute the preferred read-replica selection.
> {code:java}
> partition.leaderReplicaIdOpt.flatMap { leaderReplicaId =>
>   // Don't look up preferred for follower fetches via normal replication and
>   if (Request.isValidBrokerId(replicaId))
>     None
>   else { {code}
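The behaviour described in the issue can be sketched in isolation. The following is a simplified model in plain Java, not the actual `ReplicaManager` code. `PartitionState`, `leaderIdIfLocal`, `selectUnguarded` and `selectGuarded` are hypothetical names chosen for this sketch, and the guard shown is one possible shape of a fix, assumed rather than taken from the pull request.

```java
import java.util.Optional;
import java.util.function.Supplier;

public class LeaderGuardSketch {
    /** Hypothetical stand-in for the broker-side view of one partition. */
    static final class PartitionState {
        final int localBrokerId;
        // Known leader id; populated on leaders and followers alike.
        final Optional<Integer> leaderReplicaId;

        PartitionState(int localBrokerId, Optional<Integer> leaderReplicaId) {
            this.localBrokerId = localBrokerId;
            this.leaderReplicaId = leaderReplicaId;
        }

        /** Leader id only if this broker is the leader (assumed helper for this sketch). */
        Optional<Integer> leaderIdIfLocal() {
            return leaderReplicaId.filter(id -> id == localBrokerId);
        }
    }

    /** Mirrors the reported behaviour: selection runs whenever any leader is known. */
    static Optional<Integer> selectUnguarded(PartitionState p, Supplier<Optional<Integer>> selector) {
        return p.leaderReplicaId.flatMap(leader -> selector.get());
    }

    /** Leader-only variant: a follower returns empty without calling the selector. */
    static Optional<Integer> selectGuarded(PartitionState p, Supplier<Optional<Integer>> selector) {
        return p.leaderIdIfLocal().flatMap(leader -> selector.get());
    }

    public static void main(String[] args) {
        // Broker 2 is a follower; broker 1 is the leader.
        PartitionState follower = new PartitionState(2, Optional.of(1));
        // Hypothetical selector that always prefers replica 3.
        Supplier<Optional<Integer>> selector = () -> Optional.of(3);

        System.out.println(selectUnguarded(follower, selector)); // Optional[3]
        System.out.println(selectGuarded(follower, selector));   // Optional.empty
    }
}
```

In this model the unguarded path hands out a preferred replica from a follower, which is the situation the issue describes. The guarded path does so only when the local broker is the leader.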
[GitHub] [kafka] bozhao12 commented on pull request #11965: KAFKA-13778: Fetch from follower should never run the preferred read replica selection
bozhao12 commented on pull request #11965:
URL: https://github.com/apache/kafka/pull/11965#issuecomment-1083262191

@dajac Thanks for your review, the case you mentioned does occur in some scenarios. I will submit a new unit test based on your suggestion.
[GitHub] [kafka] cadonna commented on pull request #11971: KAFKA-13783; Remove reason prefixing in JoinGroupRequest and LeaveGroupRequest
cadonna commented on pull request #11971:
URL: https://github.com/apache/kafka/pull/11971#issuecomment-1083246616

@dajac Please go ahead!
[GitHub] [kafka] dajac commented on pull request #11971: KAFKA-13783; Remove reason prefixing in JoinGroupRequest and LeaveGroupRequest
dajac commented on pull request #11971:
URL: https://github.com/apache/kafka/pull/11971#issuecomment-1083244092

cc @cadonna I'd like to get this one in 3.2.
[GitHub] [kafka] bbejeck commented on pull request #11928: KAFKA-13739: Sliding window with no grace period not working
bbejeck commented on pull request #11928:
URL: https://github.com/apache/kafka/pull/11928#issuecomment-1083230921

@tombentley I'd like to get this into 3.1.1 as well
[GitHub] [kafka] dajac commented on pull request #11965: KAFKA-13778: Fetch from follower should never run the preferred read replica selection
dajac commented on pull request #11965:
URL: https://github.com/apache/kafka/pull/11965#issuecomment-1083229729

cc @ijuma
[GitHub] [kafka] dajac edited a comment on pull request #11965: KAFKA-13778: Fetch from follower should never run the preferred read replica selection
dajac edited a comment on pull request #11965:
URL: https://github.com/apache/kafka/pull/11965#issuecomment-1083225964

@bozhao12 Thanks for reporting this one. I took a deeper look at it and I agree with your finding. The follower runs the preferred read replica selection logic as well. Nice one ;)

In most cases it still works because, as you said, the `RackAwareReplicaSelector` returns the leader when it cannot find a replica in the same rack, and the leader is filtered out in the logic. If you have more than one replica per rack, you can easily get into a situation where the consumer can't consume anything because it is continuously redirected between the two replicas. This is only possible if the replica still has some replica states left around from its previous leadership. This can happen when the partition is reassigned multiple times, for instance. I suppose that things could be worse depending on the implementation of the `ReplicaSelector`. Luckily most people use the `RackAwareReplicaSelector` and three replicas.

That seems to be a regression introduced in this commit: https://github.com/apache/kafka/commit/fbfda2c4ad889c731aa52b5214e0521f187f8db6

I do agree that we should fix this, but we need to come up with a better test which verifies this. Your current test does not really fail because the leader is automatically removed. We could perhaps create a `MockSelector` which implements `ReplicaSelector` and increments a counter or something along those lines. What do you think?
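The counting-mock idea from the comment above could look roughly like the sketch below. It is self-contained Java and does not use Kafka's real `ReplicaSelector` interface: `SimpleSelector` is a hypothetical, simplified stand-in, and `preferredReplica` with its `leaderOnly` flag is an assumed model of the fetch path, introduced only to show why the test has to observe invocations rather than results.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

public class CountingSelectorSketch {
    /** Hypothetical, simplified stand-in for a replica selector plugin. */
    interface SimpleSelector {
        Optional<Integer> select(String topicPartition);
    }

    /** Mock that records how often it is consulted and never picks a replica. */
    static final class MockSelector implements SimpleSelector {
        private final AtomicInteger selections = new AtomicInteger();

        @Override
        public Optional<Integer> select(String topicPartition) {
            selections.incrementAndGet();
            return Optional.empty();
        }

        int selectionCount() {
            return selections.get();
        }
    }

    /** Assumed model of the fetch path; `leaderOnly` toggles the guard under test. */
    static Optional<Integer> preferredReplica(boolean isLeader, boolean leaderOnly, SimpleSelector selector) {
        if (leaderOnly && !isLeader) {
            return Optional.empty();
        }
        return selector.select("topic-0");
    }

    public static void main(String[] args) {
        MockSelector unguarded = new MockSelector();
        MockSelector guarded = new MockSelector();

        // Both follower fetches return the same (empty) result ...
        Optional<Integer> a = preferredReplica(false, false, unguarded);
        Optional<Integer> b = preferredReplica(false, true, guarded);
        System.out.println(a.equals(b)); // true

        // ... so only the invocation count tells the two paths apart.
        System.out.println(unguarded.selectionCount()); // 1
        System.out.println(guarded.selectionCount());   // 0
    }
}
```

A test built this way would assert that the count stays at zero for fetches served by a follower, which does not depend on what the selector happens to return.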