[GitHub] [kafka] lihaosky commented on pull request #11829: [2/N][emit final] add processor metadata to be committed with offset

2022-03-30 Thread GitBox


lihaosky commented on pull request #11829:
URL: https://github.com/apache/kafka/pull/11829#issuecomment-1084100040


   Thanks @mjsax ! Created Jira: 
https://issues.apache.org/jira/browse/KAFKA-13785 






[jira] [Created] (KAFKA-13785) Support emit final result for windowed aggregation

2022-03-30 Thread Hao Li (Jira)
Hao Li created KAFKA-13785:
--

 Summary: Support emit final result for windowed aggregation
 Key: KAFKA-13785
 URL: https://issues.apache.org/jira/browse/KAFKA-13785
 Project: Kafka
  Issue Type: Improvement
Reporter: Hao Li


For KIP-825: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced
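For context, a sketch of the API shape proposed in KIP-825 (the names below follow the KIP draft and were still under discussion at the time of this thread):

{code:java}
import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class EmitFinalSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        // Emit one final count per 5-minute window when the window closes,
        // instead of an update for every incoming record.
        final KTable<Windowed<String>, Long> counts = builder.<String, String>stream("input")
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            .emitStrategy(EmitStrategy.onWindowClose()) // API proposed by KIP-825
            .count();
    }
}
{code}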





[jira] [Assigned] (KAFKA-13785) Support emit final result for windowed aggregation

2022-03-30 Thread Hao Li (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hao Li reassigned KAFKA-13785:
--

Assignee: Hao Li

> Support emit final result for windowed aggregation
> --
>
> Key: KAFKA-13785
> URL: https://issues.apache.org/jira/browse/KAFKA-13785
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Hao Li
>Assignee: Hao Li
>Priority: Major
>
> For KIP-825: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced





[GitHub] [kafka] yufeiyan1220 commented on a change in pull request #11953: KAFKA-13772: Partitions are not correctly re-partitioned when the fetcher thread pool is resized

2022-03-30 Thread GitBox


yufeiyan1220 commented on a change in pull request #11953:
URL: https://github.com/apache/kafka/pull/11953#discussion_r839195366



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -743,6 +743,18 @@ abstract class AbstractFetcherThread(name: String,
 } finally partitionMapLock.unlock()
   }
 
+  def removeAllPartitions(): Map[TopicPartition, PartitionFetchState] = {
+partitionMapLock.lockInterruptibly()

Review comment:
   Releasing the lock between getting all partitions and then using the result to remove them might lead to inconsistency (this fetcher could add a partition right between the two calls made by `AbstractFetcherManager`). So I combined the two steps into one new method.
   Another point is that we don't need to filter or fetch the state of each partition one by one; we can just iterate once and return a copy of the whole `partitionStates.partitionStateMap`, which seems more efficient.
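
   For readers, the hunk above is truncated; a sketch of how such a method could complete under the lock it takes (an assumption, not necessarily the PR's exact code):

   ```scala
import scala.jdk.CollectionConverters._

// Sketch: snapshot and clear the partition states under a single lock acquisition,
// so no partition can be added between the snapshot and the removal.
def removeAllPartitions(): Map[TopicPartition, PartitionFetchState] = {
  partitionMapLock.lockInterruptibly()
  try {
    val allPartitionState = partitionStates.partitionStateMap.asScala.toMap
    allPartitionState.keys.foreach(partitionStates.remove)
    allPartitionState
  } finally partitionMapLock.unlock()
}
   ```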








[GitHub] [kafka] mjsax commented on a change in pull request #11945: KAFKA-13769: Explicitly route FK join results to correct partitions

2022-03-30 Thread GitBox


mjsax commented on a change in pull request #11945:
URL: https://github.com/apache/kafka/pull/11945#discussion_r839183920



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
##
@@ -60,6 +63,42 @@ public void setIfUnset(final SerdeGetter getter) {
 }
 }
 
+@Override
+public void configure(final Map<String, ?> configs, final boolean isKey) {
+this.upgradeFromV0 = upgradeFromV0(configs);
+}
+
+private static boolean upgradeFromV0(final Map<String, ?> configs) {
+final Object upgradeFrom = configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
+if (!(upgradeFrom instanceof String)) {

Review comment:
   As above.
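
   (The hunk above cuts off mid-method; a hedged sketch of what the check plausibly does. The `UPGRADE_FROM_*` constants exist in `StreamsConfig`; mapping every pre-2.4 value to the old "v0" format is an assumption, not the PR's confirmed logic.)

   ```java
import java.util.Map;
import org.apache.kafka.streams.StreamsConfig;

final class UpgradeFromCheck {
    // Sketch: treat any upgrade.from value of 2.3 or below as the old ("v0") wire format.
    static boolean upgradeFromV0(final Map<String, ?> configs) {
        final Object upgradeFrom = configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
        if (!(upgradeFrom instanceof String)) {
            return false; // config absent (or not a String): assume the current format
        }
        switch ((String) upgradeFrom) {
            case StreamsConfig.UPGRADE_FROM_0100:
            case StreamsConfig.UPGRADE_FROM_0101:
            case StreamsConfig.UPGRADE_FROM_0102:
            case StreamsConfig.UPGRADE_FROM_0110:
            case StreamsConfig.UPGRADE_FROM_10:
            case StreamsConfig.UPGRADE_FROM_11:
            case StreamsConfig.UPGRADE_FROM_20:
            case StreamsConfig.UPGRADE_FROM_21:
            case StreamsConfig.UPGRADE_FROM_22:
            case StreamsConfig.UPGRADE_FROM_23:
                return true;
            default:
                return false;
        }
    }
}
   ```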

##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##
@@ -636,7 +684,10 @@
 "Accepted values are \"" + UPGRADE_FROM_0100 + "\", \"" + 
UPGRADE_FROM_0101 + "\", \"" +
 UPGRADE_FROM_0102 + "\", \"" + UPGRADE_FROM_0110 + "\", \"" + 
UPGRADE_FROM_10 + "\", \"" +
 UPGRADE_FROM_11 + "\", \"" + UPGRADE_FROM_20 + "\", \"" + 
UPGRADE_FROM_21 + "\", \"" +
-UPGRADE_FROM_22 + "\", \"" + UPGRADE_FROM_23 + "\" (for upgrading from 
the corresponding old version).";

Review comment:
   > When upgrading from 2.4 to a newer version it is not required to 
specify this config.
   
   Seems this needs an update?

##
File path: docs/streams/upgrade-guide.html
##
@@ -34,9 +34,9 @@ Upgrade Guide and API Changes
 
 
 
-Upgrading from any older version to {{fullDotVersion}} is possible: if 
upgrading from 2.3 or below, you will need to do two rolling bounces, where 
during the first rolling bounce phase you set the config 
upgrade.from="older version"
-(possible values are "0.10.0" - "2.3") and during the 
second you remove it. This is required to safely upgrade to the new cooperative 
rebalancing protocol of the embedded consumer. Note that you will remain using 
the old eager
-rebalancing protocol if you skip or delay the second rolling bounce, 
but you can safely switch over to cooperative at any time once the entire group 
is on 2.4+ by removing the config value and bouncing. For more details please 
refer to
+Upgrading from any older version to {{fullDotVersion}} is possible: if 
upgrading from 3.3 or below, you will need to do two rolling bounces, where 
during the first rolling bounce phase you set the config 
upgrade.from="older version"

Review comment:
   > upgrading from 3.3
   
   Should be `3.2` -- the fix should go into 3.3, right?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
##
@@ -92,6 +103,32 @@ public void setIfUnset(final SerdeGetter getter) {
 return buf.array();
 }
 
+private byte[] serializeV1(final String topic, final SubscriptionResponseWrapper<V> data) {
+final byte[] serializedData = data.getForeignValue() == null ? 
null : serializer.serialize(topic, data.getForeignValue());
+final int serializedDataLength = serializedData == null ? 0 : 
serializedData.length;
+final long[] originalHash = data.getOriginalValueHash();
+final int hashLength = originalHash == null ? 0 : 2 * Long.BYTES;
+final int primaryPartitionLength = Integer.BYTES;
+final int dataLength = 1 + hashLength + serializedDataLength + 
primaryPartitionLength;
+
+final ByteBuffer buf = ByteBuffer.allocate(dataLength);
+
+if (originalHash != null) {
+buf.put(data.getVersion());
+} else {
+buf.put((byte) (data.getVersion() | (byte) 0x80));
+}
+buf.putInt(data.getPrimaryPartition());

Review comment:
   >  This means that we can't just put data.getVersion() into the buffer, 
but have to hardcode it to 0
   
   Yes, but this seems ok? Ie, we would have a `serializeV0Internal(final short 
version)` that is called as (something like):
   ```
   public byte[] serializeV0() {
 return serializeV0Internal(0).getBytes();
   }
   
   public byte[] serializeV1() {
 final ByteStream out = serializeV0Internal(0);
 
 // add v1 stuff to `out`
 
 return out.getBytes();
   }
   ```
   
   > Making this logic generic enough to handle v0, upgrade-from, and v1 will 
be cumbersome.
   
   Why?
   
   I understand that putting the variable-length data at the end could be simpler. I guess I am just used to how it's usually done in Kafka. And personally, when I read code it's easier to keep a mental model of the different versions in the bytes if older versions are always a prefix of newer versions.
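
   To illustrate the prefix layout being argued for (hypothetical names, not the PR's code): each version writes the previous version's bytes first and appends its new fields at the end, so a v0 reader still understands the front of a v1 payload.

   ```java
import java.nio.ByteBuffer;

final class PrefixVersionedPayload {
    // v0 layout: [version:1][value:4]
    static ByteBuffer serializeV0Internal(final byte version, final int value, final int extraCapacity) {
        final ByteBuffer buf = ByteBuffer.allocate(1 + Integer.BYTES + extraCapacity);
        buf.put(version);   // version byte always first
        buf.putInt(value);  // v0 fields
        return buf;
    }

    static byte[] serializeV0(final int value) {
        return serializeV0Internal((byte) 0, value, 0).array();
    }

    // v1 layout: [v0 prefix][primaryPartition:4], strictly appended after the v0 fields.
    static byte[] serializeV1(final int value, final int primaryPartition) {
        final ByteBuffer buf = serializeV0Internal((byte) 1, value, Integer.BYTES);
        buf.putInt(primaryPartition);
        return buf.array();
    }
}
   ```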

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
##
@@ -62,6 

[GitHub] [kafka] dengziming commented on pull request #11935: MINOR: Remove some unused code

2022-03-30 Thread GitBox


dengziming commented on pull request #11935:
URL: https://github.com/apache/kafka/pull/11935#issuecomment-1084076340


   Hello @showuon, PTAL. Thanks.






[GitHub] [kafka] mjsax commented on a change in pull request #11829: [2/N][emit final] add processor metadata to be committed with offset

2022-03-30 Thread GitBox


mjsax commented on a change in pull request #11829:
URL: https://github.com/apache/kafka/pull/11829#discussion_r839166179



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -537,6 +542,7 @@ public void closeDirty() {
 public void updateInputPartitions(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> allTopologyNodesToSourceTopics) {
 super.updateInputPartitions(topicPartitions, allTopologyNodesToSourceTopics);
 partitionGroup.updatePartitions(topicPartitions, recordQueueCreator::createQueue);
+processorContext.getProcessorMetadata().setNeedsCommit(true);

Review comment:
   I agree it might be a rare case. If we think it's not worth it and the 
risk to lose the metadata is tiny (and it's complicated to force a commit right 
away) I am fine with not closing this race condition.








[GitHub] [kafka] yufeiyan1220 commented on a change in pull request #11953: KAFKA-13772: Partitions are not correctly re-partitioned when the fetcher thread pool is resized

2022-03-30 Thread GitBox


yufeiyan1220 commented on a change in pull request #11953:
URL: https://github.com/apache/kafka/pull/11953#discussion_r839161947



##
File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala
##
@@ -62,19 +62,22 @@ abstract class AbstractFetcherManager[T <: 
AbstractFetcherThread](val name: Stri
 
   def resizeThreadPool(newSize: Int): Unit = {
 def migratePartitions(newSize: Int): Unit = {
+  val allRemovedPartitionsMap = mutable.Map[TopicPartition, 
InitialFetchState]()
   fetcherThreadMap.forKeyValue { (id, thread) =>
-val partitionStates = removeFetcherForPartitions(thread.partitions)
+val partitionStates = thread.removeAllPartitions()

Review comment:
   I think unless there is a null value in `partitionStates.partitionStateMap`, no NPE is thrown. `partitionMapLock` makes sure no other thread changes `partitionStates` during this process. The original version needed to filter out null values because `partitionStates.stateValue` might return null when the partition is not included in `partitionStates.partitionStateMap`.
   
   I am not sure; maybe I should add the filter logic as well, or just achieve the same by feeding all partitions to `removePartitions`.








[GitHub] [kafka] yufeiyan1220 commented on a change in pull request #11953: KAFKA-13772: Partitions are not correctly re-partitioned when the fetcher thread pool is resized

2022-03-30 Thread GitBox


yufeiyan1220 commented on a change in pull request #11953:
URL: https://github.com/apache/kafka/pull/11953#discussion_r839164671



##
File path: 
core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##
@@ -210,4 +218,115 @@ class AbstractFetcherManagerTest {
 verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds)
 verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds)
   }
+
+  @Test
+  def testExpandThreadPool(): Unit = {
+testResizeThreadPool(10, 50)
+  }
+
+  @Test
+  def testShrinkThreadPool(): Unit = {
+testResizeThreadPool(50, 10)
+  }
+
+  private def testResizeThreadPool(currentFetcherSize: Int, newFetcherSize: 
Int, brokerNum: Int = 6): Unit = {
+val fetchingTopicPartitions = makeTopicPartition(10, 100)
+val failedTopicPartitions = makeTopicPartition(2, 5, "topic_failed")
+val fetcherManager = new 
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", 
"fetcher-manager", currentFetcherSize) {
+  override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
+new TestResizeFetcherThread(sourceBroker, failedPartitions)
+  }
+}
+try {
+  fetcherManager.addFetcherForPartitions(fetchingTopicPartitions.map { tp 
=>
+val brokerId = getBrokerId(tp, brokerNum)
+val brokerEndPoint = new BrokerEndPoint(brokerId, 
s"kafka-host-$brokerId", 9092)
+tp -> InitialFetchState(None, brokerEndPoint, 0, 0)
+  }.toMap)
+
+  // Mark some of these partitions failed within resizing scope
+  
fetchingTopicPartitions.take(20).foreach(fetcherManager.addFailedPartition)
+  // Mark failed partitions out of resizing scope
+  failedTopicPartitions.foreach(fetcherManager.addFailedPartition)
+
+  fetcherManager.resizeThreadPool(newFetcherSize)
+
+  val ownedPartitions = mutable.Set.empty[TopicPartition]
+  fetcherManager.fetcherThreadMap.forKeyValue { (brokerIdAndFetcherId, 
fetcherThread) =>
+val fetcherId = brokerIdAndFetcherId.fetcherId
+val brokerId = brokerIdAndFetcherId.brokerId
+
+fetcherThread.partitions.foreach { tp =>
+  ownedPartitions += tp
+  assertEquals(fetcherManager.getFetcherId(tp), fetcherId)
+  assertEquals(getBrokerId(tp, brokerNum), brokerId)
+}
+  }
+  // Verify that all partitions are owned by the fetcher threads.
+  assertEquals(fetchingTopicPartitions, ownedPartitions)
+
+  val failedPartitionsAfterResize = 
fetcherManager.failedPartitions.failedPartitions()
+  // Verify that failed partitions within resizing scope are removed, 
otherwise retained

Review comment:
   Some partitions are marked as failed and removed in `markPartitionFailed`, and I think the resizing process should not clear them, but rather retain them as failed after resizing finishes. This is to simulate the state of failed partitions before and after resizing.
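
   (Side note for readers: the quoted test uses two helpers the hunk doesn't show; a hypothetical sketch consistent with their call sites:)

   ```scala
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Utils

// Hypothetical helpers, inferred from how the test calls them (not shown in the diff):
private def makeTopicPartition(topicNum: Int, partitionNum: Int, topicPrefix: String = "topic"): Set[TopicPartition] =
  (0 until topicNum).flatMap { t =>
    (0 until partitionNum).map(p => new TopicPartition(s"$topicPrefix-$t", p))
  }.toSet

private def getBrokerId(tp: TopicPartition, brokerNum: Int): Int =
  Utils.abs(tp.topic.hashCode) % brokerNum
   ```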












[GitHub] [kafka] showuon commented on a change in pull request #11953: KAFKA-13772: Partitions are not correctly re-partitioned when the fetcher thread pool is resized

2022-03-30 Thread GitBox


showuon commented on a change in pull request #11953:
URL: https://github.com/apache/kafka/pull/11953#discussion_r839140933



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -743,6 +743,18 @@ abstract class AbstractFetcherThread(name: String,
 } finally partitionMapLock.unlock()
   }
 
+  def removeAllPartitions(): Map[TopicPartition, PartitionFetchState] = {
+partitionMapLock.lockInterruptibly()

Review comment:
   Maybe we can just feed all partitions into `removePartitions` method? 

##
File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala
##
@@ -62,19 +62,22 @@ abstract class AbstractFetcherManager[T <: 
AbstractFetcherThread](val name: Stri
 
   def resizeThreadPool(newSize: Int): Unit = {
 def migratePartitions(newSize: Int): Unit = {
+  val allRemovedPartitionsMap = mutable.Map[TopicPartition, 
InitialFetchState]()
   fetcherThreadMap.forKeyValue { (id, thread) =>
-val partitionStates = removeFetcherForPartitions(thread.partitions)
+val partitionStates = thread.removeAllPartitions()

Review comment:
   Originally, we called `removeFetcherForPartitions`, which filters out partitions whose `InitialFetchState` is null [here](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L742). I'm not sure if this is something we should care about, but I think we'd better keep the filter logic to avoid an NPE being thrown.

##
File path: 
core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##
@@ -210,4 +218,115 @@ class AbstractFetcherManagerTest {
 verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds)
 verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds)
   }
+
+  @Test
+  def testExpandThreadPool(): Unit = {
+testResizeThreadPool(10, 50)
+  }
+
+  @Test
+  def testShrinkThreadPool(): Unit = {
+testResizeThreadPool(50, 10)
+  }
+
+  private def testResizeThreadPool(currentFetcherSize: Int, newFetcherSize: 
Int, brokerNum: Int = 6): Unit = {
+val fetchingTopicPartitions = makeTopicPartition(10, 100)
+val failedTopicPartitions = makeTopicPartition(2, 5, "topic_failed")
+val fetcherManager = new 
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", 
"fetcher-manager", currentFetcherSize) {
+  override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
+new TestResizeFetcherThread(sourceBroker, failedPartitions)
+  }
+}
+try {
+  fetcherManager.addFetcherForPartitions(fetchingTopicPartitions.map { tp 
=>
+val brokerId = getBrokerId(tp, brokerNum)
+val brokerEndPoint = new BrokerEndPoint(brokerId, 
s"kafka-host-$brokerId", 9092)
+tp -> InitialFetchState(None, brokerEndPoint, 0, 0)
+  }.toMap)
+
+  // Mark some of these partitions failed within resizing scope
+  
fetchingTopicPartitions.take(20).foreach(fetcherManager.addFailedPartition)
+  // Mark failed partitions out of resizing scope
+  failedTopicPartitions.foreach(fetcherManager.addFailedPartition)
+
+  fetcherManager.resizeThreadPool(newFetcherSize)
+
+  val ownedPartitions = mutable.Set.empty[TopicPartition]
+  fetcherManager.fetcherThreadMap.forKeyValue { (brokerIdAndFetcherId, 
fetcherThread) =>
+val fetcherId = brokerIdAndFetcherId.fetcherId
+val brokerId = brokerIdAndFetcherId.brokerId
+
+fetcherThread.partitions.foreach { tp =>
+  ownedPartitions += tp
+  assertEquals(fetcherManager.getFetcherId(tp), fetcherId)
+  assertEquals(getBrokerId(tp, brokerNum), brokerId)
+}
+  }
+  // Verify that all partitions are owned by the fetcher threads.
+  assertEquals(fetchingTopicPartitions, ownedPartitions)
+
+  val failedPartitionsAfterResize = 
fetcherManager.failedPartitions.failedPartitions()
+  // Verify that failed partitions within resizing scope are removed, 
otherwise retained

Review comment:
   Could you explain what `otherwise retained` means? I can understand that the failed partitions within the resizing scope should be removed, since we've verified that, but I don't understand the latter part. Thanks.








[GitHub] [kafka] RivenSun2 commented on pull request #11919: MINOR: Unify the log output of JaasContext.defaultContext

2022-03-30 Thread GitBox


RivenSun2 commented on pull request #11919:
URL: https://github.com/apache/kafka/pull/11919#issuecomment-1084017410


   Hi @dajac @ijuma 
   Please help review this PR.
   Thanks.






[GitHub] [kafka] RivenSun2 commented on pull request #11911: KAFKA-13463: Make pause behavior consistent between cooperative and eager protocols

2022-03-30 Thread GitBox


RivenSun2 commented on pull request #11911:
URL: https://github.com/apache/kafka/pull/11911#issuecomment-1084016980










[GitHub] [kafka] RivenSun2 commented on pull request #11947: MINOR: Improve the description of principal under different mechanisms of sasl

2022-03-30 Thread GitBox


RivenSun2 commented on pull request #11947:
URL: https://github.com/apache/kafka/pull/11947#issuecomment-1084016362










[jira] [Commented] (KAFKA-13771) Support to explicitly delete delegationTokens that have expired but have not been automatically cleaned up

2022-03-30 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17515035#comment-17515035
 ] 

RivenSun commented on KAFKA-13771:
--

Hi [~ijuma]  [~rsivaram]
Could you give some suggestions for this issue?
Thanks.

> Support to explicitly delete delegationTokens that have expired but have not 
> been automatically cleaned up
> --
>
> Key: KAFKA-13771
> URL: https://issues.apache.org/jira/browse/KAFKA-13771
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Reporter: RivenSun
>Priority: Major
>
> Quoting the official documentation
> {quote}
> Tokens can also be cancelled explicitly. If a token is not renewed by the 
> token’s expiration time or if token is beyond the max life time, it will be 
> deleted from all broker caches as well as from zookeeper.
> {quote}
> 1. The first point above means that after the `AdminClient` initiates the 
> EXPIRE_DELEGATION_TOKEN request, in the DelegationTokenManager.expireToken() 
> method on the KafkaServer side, if the user passes in expireLifeTimeMs less 
> than 0, KafkaServer will delete the corresponding delegationToken directly.
> 2. There is a thread named "delete-expired-tokens" on the KafkaServer side, 
> which is responsible for regularly cleaning up expired tokens. The execution 
> interval is `delegation.token.expiry.check.interval.ms`, and the default 
> value is one hour.
> But if you carefully analyze the code logic in DelegationTokenManager.expireToken(), 
> *Kafka currently does not allow users to delete an expired delegationToken that 
> they no longer use/renew. If a user tries to do this, they will receive a 
> DelegationTokenExpiredException.*
> In the worst case, an expired delegationToken may still be usable for up to 
> {*}an hour{*}, even though the broker can shorten 
> delegation.token.expiry.check.interval.ms as much as possible.
> The fix is simple: reorder the `if` branches in 
> DelegationTokenManager.expireToken().
> {code:java}
> if (!allowedToRenew(principal, tokenInfo)) {
>   expireResponseCallback(Errors.DELEGATION_TOKEN_OWNER_MISMATCH, -1)
> } else if (expireLifeTimeMs < 0) { //expire immediately
>   removeToken(tokenInfo.tokenId)
>   info(s"Token expired for token: ${tokenInfo.tokenId} for owner: 
> ${tokenInfo.owner}")
>   expireResponseCallback(Errors.NONE, now)
> } else if (tokenInfo.maxTimestamp < now || tokenInfo.expiryTimestamp < now) {
>   expireResponseCallback(Errors.DELEGATION_TOKEN_EXPIRED, -1)
> } else {
>   //set expiry time stamp
>  ..
> } {code}
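
For reference, the client-side path this affects: passing a negative expiry period is the "expire immediately" branch above. A sketch against the public Admin API (the broker address is a placeholder, and the HMAC would come from the token created earlier):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ExpireDelegationTokenOptions;

public class ExpireTokenNow {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        try (Admin admin = Admin.create(props)) {
            final byte[] hmac = new byte[0]; // the token's HMAC, obtained at creation time
            // expiryTimePeriodMs < 0 asks the broker to delete the token immediately.
            admin.expireDelegationToken(hmac, new ExpireDelegationTokenOptions().expiryTimePeriodMs(-1L))
                 .expiryTimestamp()
                 .get();
        }
    }
}
{code}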





[jira] [Commented] (KAFKA-13757) Improve the annotations of all related methods of DelegationToken in the Admin class

2022-03-30 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17515034#comment-17515034
 ] 

RivenSun commented on KAFKA-13757:
--

Hi [~dajac]  [~rsivaram]
Could you give some suggestions for this issue?
Thanks.

> Improve the annotations of all related methods of DelegationToken in the 
> Admin class
> 
>
> Key: KAFKA-13757
> URL: https://issues.apache.org/jira/browse/KAFKA-13757
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: RivenSun
>Priority: Major
>
> DelegationToken is a great and lightweight feature, but when users actually 
> use it, they get confused.
> From the existing official documentation, method comments, and parameter 
> comments, a user cannot tell what the server's processing logic is, or what 
> the returned fields mean, after calling an XXXDelegationToken(...) method.
> After reading the source code, I briefly sorted out the processing logic of 
> the XXXDelegationToken(...) method on the server side.
> 1. createDelegationToken:
> {code:java}
> // 1. if the request sent on PLAINTEXT/1-way SSL channels or delegation token 
> authenticated channels,
> // throw UnsupportedByAuthenticationException
> // 2. if the delegation token feature is disabled, throw 
> DelegationTokenDisabledException
> // 3. if the renewers principal type is not KafkaPrincipal.USER_TYPE, throw 
> InvalidPrincipalTypeException
> // 4. if the request was not completed within the given timeoutMs(), throw 
> TimeoutException
> //processing logic:
> //   maxLifeTime = `maxLifeTimeMs` <= 0 ? 
> brokerConfig.delegationTokenMaxLifeMs : Math.min(`maxLifeTimeMs`, 
> brokerConfig.delegationTokenMaxLifeMs)
> //   maxLifeTimestamp = currentTimeMillis + maxLifeTime
> //   expiryTimestamp = Math.min(maxLifeTimestamp, currentTimeMillis + 
> brokerConfig.delegationTokenExpiryTimeMs)
> //   update tokenInfo and return createTokenResult {code}
> 2. renewDelegationToken
> {code:java}
> // 1. if the request sent on PLAINTEXT/1-way SSL channels or delegation token 
> authenticated channels,
> // throw UnsupportedByAuthenticationException
> // 2. if the delegation token feature is disabled, throw 
> DelegationTokenDisabledException
> // 3. if the authenticated user is not owner/renewer of the token, throw 
> DelegationTokenOwnerMismatchException
> // 4. if the delegation token is expired, throw 
> DelegationTokenExpiredException
> // 5. if the delegation token is not found on server, throw 
> DelegationTokenNotFoundException
> // 6. if the request was not completed within the given timeoutMs(), throw 
> TimeoutException
> //processing logic:
> //renewLifeTime = `renewTimePeriodMs` < 0 ? 
> brokerConfig.delegationTokenExpiryTimeMs : `renewTimePeriodMs`
> //renewTimestamp = currentTimeMillis + renewLifeTime
> //expiryTimestamp = Math.min(tokenInfo.maxTimestamp, renewTimestamp)
> //update tokenInfo.expiryTimestamp
> //return expiryTimestamp {code}
> 3. expireDelegationToken
> {code:java}
> // 1. if the request sent on PLAINTEXT/1-way SSL channels or delegation token 
> authenticated channels,
> // throw UnsupportedByAuthenticationException
> // 2. if the delegation token feature is disabled, throw 
> DelegationTokenDisabledException
> // 3. if the authenticated user is not owner/renewer of the token, throw 
> DelegationTokenOwnerMismatchException
> // 4. if the delegation token is expired, throw 
> DelegationTokenExpiredException
> // 5. if the delegation token is not found on server, throw 
> DelegationTokenNotFoundException
> // 6. if the request was not completed within the given timeoutMs(), throw 
> TimeoutException
> //processing logic:
> //if `expiryTimePeriodMs` < 0, delete tokenInfo immediately, return 
> currentTimeMillis.
> //otherwise update tokenInfo expiryTimestamp:
> //  expiryTimestamp = Math.min(tokenInfo.maxTimestamp, 
> currentTimeMillis + `expiryTimePeriodMs`)
> //  update tokenInfo.expiryTimestamp
> //  return expiryTimestamp
> //
> //Note: Tokens can be cancelled explicitly. If a token is not renewed by 
> the token’s expiration time or if token is
> //beyond the max life time, it will also be deleted from all broker 
> caches as well as from zookeeper. {code}
> 4. describeDelegationToken
> {code:java}
> // 1. if the request sent on PLAINTEXT/1-way SSL channels or delegation token 
> authenticated channels,
> // throw UnsupportedByAuthenticationException
> // 2. if the delegation token feature is disabled, throw 
> DelegationTokenDisabledException
> // 3. if the request was not completed within the given timeoutMs(), throw 
> TimeoutException
> //processing logic:
> //if `owners` is EmptyList(note: 

[jira] [Commented] (KAFKA-13751) On the broker side, OAUTHBEARER is not compatible with other SASL mechanisms

2022-03-30 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17515032#comment-17515032
 ] 

RivenSun commented on KAFKA-13751:
--

Hi [~dajac]  [~ijuma] 
Could you give some suggestions for this issue?
Thanks.

> On the broker side, OAUTHBEARER is not compatible with other SASL mechanisms
> 
>
> Key: KAFKA-13751
> URL: https://issues.apache.org/jira/browse/KAFKA-13751
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 3.0.1
>Reporter: RivenSun
>Priority: Critical
>
> h1. Phenomenon:
>  SASL/OAUTHBEARER, whether implemented by default or customized by the user, 
> is not compatible with other SASL mechanisms.
> h3.  
> case1:
> kafka_server_jaas_oauth.conf
> {code:java}
> KafkaServer {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="admin"
>   password="admin"
>   user_admin="admin"
>   user_alice="alice"; 
>org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule 
> required;
>org.apache.kafka.common.security.scram.ScramLoginModule required
>   username="admin"
>   password="admin_scram";
> }; {code}
>  server.properties
> {code:java}
> advertised.listeners=SASL_PLAINTEXT://publicIp:8779,SASL_SSL://publicIp:8889,OAUTH://publicIp:8669
>  
> listener.security.protocol.map=INTERNAL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,OAUTH:SASL_SSL
> sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER{code}
> Error when starting kafka:
> server.log
> {code:java}
> [2022-03-16 13:18:42,658] ERROR [KafkaServer id=1] Fatal error during 
> KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
> org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: 
> Must supply exactly 1 non-null JAAS mechanism configuration (size was 3)
>         at 
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:172)
>         at kafka.network.Processor.(SocketServer.scala:724)
>         at kafka.network.SocketServer.newProcessor(SocketServer.scala:367)
>         at 
> kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:252)
>         at 
> kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:251)
>         at 
> kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:214)
>         at 
> kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:211)
>         at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>         at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>         at 
> kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:211)
>         at kafka.network.SocketServer.startup(SocketServer.scala:122)
>         at kafka.server.KafkaServer.startup(KafkaServer.scala:266)
>         at 
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
>         at kafka.Kafka$.main(Kafka.scala:82)
>         at kafka.Kafka.main(Kafka.scala)
> Caused by: java.lang.IllegalArgumentException: Must supply exactly 1 non-null 
> JAAS mechanism configuration (size was 3)
>         at 
> org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler.configure(OAuthBearerUnsecuredValidatorCallbackHandler.java:117)
>         at 
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:139)
>         ... 17 more
> [2022-03-16 13:18:42,662] INFO [KafkaServer id=1] shutting down 
> (kafka.server.KafkaServer)
> [2022-03-16 13:18:42,664] INFO [SocketServer brokerId=1] Stopping socket 
> server request processors (kafka.network.SocketServer) {code}
> The default implementation class of oauthbearer's 
> `sasl.server.callback.handler.class` is 
> OAuthBearerUnsecuredValidatorCallbackHandler. 
> In the OAuthBearerUnsecuredValidatorCallbackHandler#configure(...) method, 
> the jaasConfigEntries parameter is verified.
> What I want to say is that {*}the verification logic here is completely 
> reasonable{*}, but the jaasConfigEntries passed in from the upper layer 
> should not contain the AppConfigurationEntry of other loginModules. Several 
> other places in the code perform the same check with the keyword *"Must supply 
> exactly 1 non-null JAAS mechanism configuration".*
> The root cause is elaborated later.
> By the way, at present, KafkaServer allows {*}the same LoginModule to be 
> configured multiple times in kafkaJaasConfigFile{*}, which will also lead to 
> the phenomenon of case1.
> kafka_server_jaas_oauth.conf eg:
> 
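
(A commonly documented way to avoid sharing one KafkaServer JAAS section across mechanisms, offered here as a sketch and not from the original report, is per-listener, per-mechanism JAAS configuration in server.properties:)

{code:java}
# Scope each mechanism's LoginModule to its own listener:
listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="admin" password="admin" user_admin="admin" user_alice="alice";
listener.name.internal_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="admin" password="admin_scram";
listener.name.oauth.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
{code}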

[GitHub] [kafka] showuon commented on pull request #11959: MINOR: log warning when topology override for cache size is non-zero

2022-03-30 Thread GitBox


showuon commented on pull request #11959:
URL: https://github.com/apache/kafka/pull/11959#issuecomment-1084012434


   Thanks for fixing the tests, @ableegoldman !






[GitHub] [kafka] showuon edited a comment on pull request #11936: MINOR: GetOffsetShell should ignore partitions without offsets

2022-03-30 Thread GitBox


showuon edited a comment on pull request #11936:
URL: https://github.com/apache/kafka/pull/11936#issuecomment-1084010770


   Cherry-pick back to 3.2. Thanks.
   cc @cadonna 






[GitHub] [kafka] showuon commented on pull request #11936: MINOR: GetOffsetShell should ignore partitions without offsets

2022-03-30 Thread GitBox


showuon commented on pull request #11936:
URL: https://github.com/apache/kafka/pull/11936#issuecomment-1084010770


   Cherry-pick back to 3.2. Thanks.






[GitHub] [kafka] showuon merged pull request #11936: MINOR: GetOffsetShell should ignore partitions without offsets

2022-03-30 Thread GitBox


showuon merged pull request #11936:
URL: https://github.com/apache/kafka/pull/11936


   






[GitHub] [kafka] showuon commented on pull request #11936: MINOR: GetOffsetShell should ignore partitions without offsets

2022-03-30 Thread GitBox


showuon commented on pull request #11936:
URL: https://github.com/apache/kafka/pull/11936#issuecomment-1084009325


   All tests passed. Merging it.






[GitHub] [kafka] showuon commented on pull request #11968: add toString method

2022-03-30 Thread GitBox


showuon commented on pull request #11968:
URL: https://github.com/apache/kafka/pull/11968#issuecomment-1084007839


   Thanks for @dajac's reminder. Yes, this was already fixed in the latest client. Thanks.






[GitHub] [kafka] yun-yun commented on pull request #11963: KAFKA-13777: Fix FetchResponse#responseData: Assignment of lazy-initialized members should be the last step with double-checked locking

2022-03-30 Thread GitBox


yun-yun commented on pull request #11963:
URL: https://github.com/apache/kafka/pull/11963#issuecomment-1083992053


   > @yun-yun , thanks for the contribution!
   
   @showuon It's my pleasure






[GitHub] [kafka] showuon commented on pull request #11972: MINOR: Fix doc variable typos in `TopicConfig`

2022-03-30 Thread GitBox


showuon commented on pull request #11972:
URL: https://github.com/apache/kafka/pull/11972#issuecomment-1083981413


   @fxbing , thanks for the contribution!






[GitHub] [kafka] showuon merged pull request #11972: MINOR: Fix doc variable typos in `TopicConfig`

2022-03-30 Thread GitBox


showuon merged pull request #11972:
URL: https://github.com/apache/kafka/pull/11972


   






[jira] [Resolved] (KAFKA-13777) Fix FetchResponse#responseData: Assignment of lazy-initialized members should be the last step with double-checked locking

2022-03-30 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-13777.
---
Resolution: Fixed

> Fix FetchResponse#responseData: Assignment of lazy-initialized members should 
> be the last step with double-checked locking
> --
>
> Key: KAFKA-13777
> URL: https://issues.apache.org/jira/browse/KAFKA-13777
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.1
>Reporter: YunKui Lu
>Priority: Minor
> Fix For: 3.3.0
>
>
> Assignment of lazy-initialized members should be the last step with 
> double-checked locking.
> now:
>  
> {code:java}
>     public LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseData(Map<Uuid, String> topicNames, short version) {
>         if (responseData == null) {
>             synchronized (this) {
>                 if (responseData == null) {
>                     responseData = new LinkedHashMap<>();
>                     data.responses().forEach(topicResponse -> {
>                         String name;
>                         if (version < 13) {
>                             name = topicResponse.topic();
>                         } else {
>                             name = topicNames.get(topicResponse.topicId());
>                         }
>                         if (name != null) {
>                             topicResponse.partitions().forEach(partition ->
>                                 responseData.put(new TopicPartition(name, 
> partition.partitionIndex()), partition));
>                         }
>                     });
>                 }
>             }
>         }
>         return responseData;
>     } {code}
> maybe should:
>  
>  
> {code:java}
>     public LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseData(Map<Uuid, String> topicNames, short version) {
>         if (responseData == null) {
>             synchronized (this) {
>                 if (responseData == null) {
> // *** change 1 ***
>                     final LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseDataTmp = new LinkedHashMap<>();
>                     data.responses().forEach(topicResponse -> {
>                         String name;
>                         if (version < 13) {
>                             name = topicResponse.topic();
>                         } else {
>                             name = topicNames.get(topicResponse.topicId());
>                         }
>                         if (name != null) {
>                             topicResponse.partitions().forEach(partition ->
> // *** change 2 ***
>                                     responseDataTmp.put(new 
> TopicPartition(name, partition.partitionIndex()), partition));
>                         }
>                     });
> // *** change 3 ***
>                     responseData = responseDataTmp;
>                 }
>             }
>         }
>         return responseData;
>     } {code}
>  
>  
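
To spell out the hazard with a generic example (a hypothetical class, not Kafka code): without the temporary, a racing reader can pass the outer null check and observe a published but not yet fully populated map; publishing last, to a volatile field, closes that window.

{code:java}
import java.util.HashMap;
import java.util.Map;

class LazyCache {
    private volatile Map<String, Integer> cache; // volatile: safe publication to racing readers

    Map<String, Integer> get() {
        if (cache == null) {                       // first check, no lock
            synchronized (this) {
                if (cache == null) {               // second check, under lock
                    final Map<String, Integer> tmp = new HashMap<>();
                    tmp.put("example", 42);        // fully populate a local map first...
                    cache = tmp;                   // ...then publish it as the very last step
                }
            }
        }
        return cache;
    }
}
{code}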





[jira] [Updated] (KAFKA-13777) Fix FetchResponse#responseData: Assignment of lazy-initialized members should be the last step with double-checked locking

2022-03-30 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-13777:
--
Fix Version/s: 3.3.0

> Fix FetchResponse#responseData: Assignment of lazy-initialized members should 
> be the last step with double-checked locking
> --
>
> Key: KAFKA-13777
> URL: https://issues.apache.org/jira/browse/KAFKA-13777
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.1
>Reporter: YunKui Lu
>Priority: Minor
> Fix For: 3.3.0
>
>





[GitHub] [kafka] showuon merged pull request #11963: KAFKA-13777: Fix FetchResponse#responseData: Assignment of lazy-initialized members should be the last step with double-checked locking

2022-03-30 Thread GitBox


showuon merged pull request #11963:
URL: https://github.com/apache/kafka/pull/11963


   






[GitHub] [kafka] showuon commented on pull request #11963: KAFKA-13777: Fix FetchResponse#responseData: Assignment of lazy-initialized members should be the last step with double-checked locking

2022-03-30 Thread GitBox


showuon commented on pull request #11963:
URL: https://github.com/apache/kafka/pull/11963#issuecomment-1083965192


   @yun-yun , thanks for the contribution!






[GitHub] [kafka] showuon commented on pull request #11963: KAFKA-13777: Fix FetchResponse#responseData: Assignment of lazy-initialized members should be the last step with double-checked locking

2022-03-30 Thread GitBox


showuon commented on pull request #11963:
URL: https://github.com/apache/kafka/pull/11963#issuecomment-1083964754


   Failed tests are unrelated and also failed in trunk build.
   ```
   Build / PowerPC / 
org.apache.kafka.clients.consumer.KafkaConsumerTest.testReturnRecordsDuringRebalance()
   Build / JDK 11 and Scala 2.13 / 
kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed()
   ```






[GitHub] [kafka] bozhao12 commented on pull request #11965: KAFKA-13778: Fetch from follower should never run the preferred read replica selection

2022-03-30 Thread GitBox


bozhao12 commented on pull request #11965:
URL: https://github.com/apache/kafka/pull/11965#issuecomment-1083910260


   @dajac @ijuma One new unit test updated.






[GitHub] [kafka] jeffkbkim commented on a change in pull request #11971: KAFKA-13783; Remove reason prefixing in JoinGroupRequest and LeaveGroupRequest

2022-03-30 Thread GitBox


jeffkbkim commented on a change in pull request #11971:
URL: https://github.com/apache/kafka/pull/11971#discussion_r839055468



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##
@@ -2819,6 +2851,39 @@ public void 
testEnforceRebalanceTriggersRebalanceOnNextPoll() {
 assertEquals(countingRebalanceListener.revokedCount, 1);
 }
 
+@Test
+public void testEnforceRebalanceReason() {
+Time time = new MockTime(1L);
+ConsumerMetadata metadata = createMetadata(subscription);
+MockClient client = new MockClient(time, metadata);
+KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+MockRebalanceListener countingRebalanceListener = new MockRebalanceListener();
+initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1), Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1)));
+
+consumer.subscribe(Arrays.asList(topic, topic2), countingRebalanceListener);
+Node node = metadata.fetch().nodes().get(0);
+prepareRebalance(client, node, assignor, Arrays.asList(tp0, t2p0), null);
+
+// a first rebalance to get the assignment; we need two poll calls since we need two round trips to finish join / sync-group
+consumer.poll(Duration.ZERO);
+consumer.poll(Duration.ZERO);

Review comment:
   The test passes without the second poll; the first poll finishes the sync
   ```
   INFO Successfully synced group in generation
   ```
   before the second poll is triggered.
   The second poll notifies the assignor and fetches committed offsets, which I don't think is necessary in this test.
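
   For context, a hedged sketch of the client-side call this test exercises (the `String` reason overload of `enforceRebalance` comes from KIP-800; connection details below are placeholders):

   ```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class EnforceRebalanceExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "example-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("topic"));
            consumer.enforceRebalance("operator requested rebalance"); // reason travels in the JoinGroup request
            consumer.poll(Duration.ZERO); // the rebalance itself runs on a subsequent poll
        }
    }
}
   ```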

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3723,27 +3723,34 @@ private Integer nodeFor(ConfigResource resource) {
 
 List<MemberIdentity> membersToRemove = new ArrayList<>();
 for (final MemberDescription member : members) {
+MemberIdentity memberIdentity = new MemberIdentity()
+.setReason(reason);
+
 if (member.groupInstanceId().isPresent()) {
-membersToRemove.add(new MemberIdentity().setGroupInstanceId(member.groupInstanceId().get()));
+memberIdentity.setGroupInstanceId(member.groupInstanceId().get());
 } else {
-membersToRemove.add(new MemberIdentity().setMemberId(member.consumerId()));
+memberIdentity.setMemberId(member.consumerId());
 }
+
+membersToRemove.add(memberIdentity);
 }
 return membersToRemove;
 }
 
 @Override
 public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(String groupId, RemoveMembersFromConsumerGroupOptions options) {
+String reason = options.reason() == null || options.reason().isEmpty() ? DEFAULT_LEAVE_GROUP_REASON : options.reason();
+
 List<MemberIdentity> members;
 if (options.removeAll()) {
-members = getMembersFromGroup(groupId);
+members = getMembersFromGroup(groupId, reason);
 } else {
-members = options.members().stream().map(MemberToRemove::toMemberIdentity).collect(Collectors.toList());
+members = options.members().stream()
+.map(m -> m.toMemberIdentity().setReason(reason))
+.collect(Collectors.toList());

Review comment:
   should this have been done as part of KIP-800?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3723,27 +3723,34 @@ private Integer nodeFor(ConfigResource resource) {
 
 List<MemberIdentity> membersToRemove = new ArrayList<>();
 for (final MemberDescription member : members) {
+MemberIdentity memberIdentity = new MemberIdentity()
+.setReason(reason);

Review comment:
   nit: does `.setReason()` have to be on its own line?








[GitHub] [kafka] ableegoldman commented on pull request #11959: MINOR: log warning when topology override for cache size is non-zero

2022-03-30 Thread GitBox


ableegoldman commented on pull request #11959:
URL: https://github.com/apache/kafka/pull/11959#issuecomment-1083757310


   Merged to trunk






[GitHub] [kafka] ableegoldman merged pull request #11959: MINOR: log warning when topology override for cache size is non-zero

2022-03-30 Thread GitBox


ableegoldman merged pull request #11959:
URL: https://github.com/apache/kafka/pull/11959


   






[GitHub] [kafka] kkonstantine commented on pull request #11908: KAFKA-13748: Do not include file stream connectors in Connect's CLASSPATH and plugin.path by default

2022-03-30 Thread GitBox


kkonstantine commented on pull request #11908:
URL: https://github.com/apache/kafka/pull/11908#issuecomment-1083753832


   Thanks both. Merged to `trunk` and cherry-picked to `3.2, 3.1, 3.0`. cc 
@cadonna @tombentley 






[jira] [Resolved] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2022-03-30 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-13152.

Resolution: Fixed

> Replace "buffered.records.per.partition" with "input.buffer.max.bytes" 
> ---
>
> Key: KAFKA-13152
> URL: https://issues.apache.org/jira/browse/KAFKA-13152
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.3.0
>
>
> The current config "buffered.records.per.partition" controls the maximum 
> number of records to bookkeep, and hence when it is exceeded we pause fetching 
> from this partition. However, this config has two issues:
> * It's a per-partition config, so the total memory consumed is dependent on 
> the dynamic number of partitions assigned.
> * Record size could vary from case to case.
> And hence it's hard to bound the memory usage for this buffering. We should 
> consider deprecating that config in favor of a global one, e.g. 
> "input.buffer.max.bytes", which controls how many bytes in total are allowed 
> to be buffered. This is doable since we buffer the raw records in .
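
As a sketch of how this could look from the application side, assuming the proposed config name (the cap value is an arbitrary example):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class InputBufferExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Proposed global cap in bytes across all partitions, replacing the
        // per-partition record count "buffered.records.per.partition".
        props.put("input.buffer.max.bytes", 512 * 1024 * 1024L);
    }
}
{code}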



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] ableegoldman commented on pull request #11959: MINOR: log warning when topology override for cache size is non-zero

2022-03-30 Thread GitBox


ableegoldman commented on pull request #11959:
URL: https://github.com/apache/kafka/pull/11959#issuecomment-1083746981


   All test failures are in Connect, so unrelated. Going to merge


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11959: MINOR: log warning when topology override for cache size is non-zero

2022-03-30 Thread GitBox


ableegoldman commented on a change in pull request #11959:
URL: https://github.com/apache/kafka/pull/11959#discussion_r839039646



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##
@@ -466,64 +468,9 @@ public void close() {
 waitForTransitionFromRebalancingToRunning();
 
 for (final String log : appender.getMessages()) {
-// after we replace the thread there should be two 
remaining threads with 5 bytes each
-if (log.endsWith("Adding StreamThread-3, there are now 3 
threads with cache size/max buffer size values as 3/178956970 per thread.")) {

Review comment:
   No actually I meant that the comment was correct -- the test was just 
verifying incorrect results (after the thread replacement there should be 2 
threads with 5MB of cache, as it says). But no worries 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #11941: KAFKA-13749: CreateTopics in KRaft must return configs

2022-03-30 Thread GitBox


cmccabe commented on a change in pull request #11941:
URL: https://github.com/apache/kafka/pull/11941#discussion_r839038060



##
File path: metadata/src/main/java/org/apache/kafka/metadata/ConfigSynonym.java
##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+
+public class ConfigSynonym {
+private static final Logger log = 
LoggerFactory.getLogger(ConfigSynonym.class);
+
+public static final Function<String, String> IDENTITY = a -> a;
+
+public static final Function<String, String> HOURS_TO_MILLISECONDS = input -> {
+int hours = valueToInt(input, 0, "hoursToMilliseconds");
+return String.valueOf(TimeUnit.MILLISECONDS.convert(hours, TimeUnit.HOURS));
+};
+
+public static final Function<String, String> MINUTES_TO_MILLISECONDS = input -> {
+int hours = valueToInt(input, 0, "minutesToMilliseconds");
+return String.valueOf(TimeUnit.MILLISECONDS.convert(hours, TimeUnit.MINUTES));
+};
+
+private static int valueToInt(String input, int defaultValue, String what) 
{
+String trimmedInput = input.trim();
+if (trimmedInput.isEmpty()) {
+return defaultValue;
+}
+try {
+return Integer.parseInt(trimmedInput);
+} catch (Exception e) {
+log.error("{} failed: unable to parse '{}' as an integer.", what, 
trimmedInput, e);
+return defaultValue;
+}
+}
+
+private final String name;
+private final Function<String, String> converter;
+
+public ConfigSynonym(String name, Function<String, String> converter) {
+this.name = name;
+this.converter = converter;
+}
+
+public ConfigSynonym(String name) {
+this(name, IDENTITY);
+}
+
+public String name() {
+return name;
+}
+
+public Function<String, String> converter() {
+return converter;
+}
+
+public String convert(String input) {

Review comment:
   no, let's remove it
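
   As a usage sketch of the class above (the config-name pairing is illustrative; 168 hours converts to "604800000" ms):

   ```java
   import org.apache.kafka.metadata.ConfigSynonym;

   public class SynonymSketch {
       public static void main(String[] args) {
           // Sketch: mapping a legacy hours-based config onto a milliseconds-based one.
           ConfigSynonym synonym =
                   new ConfigSynonym("log.retention.hours", ConfigSynonym.HOURS_TO_MILLISECONDS);
           System.out.println(synonym.converter().apply("168"));  // 604800000, i.e. 7 days
       }
   }
   ```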




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #11941: KAFKA-13749: CreateTopics in KRaft must return configs

2022-03-30 Thread GitBox


cmccabe commented on a change in pull request #11941:
URL: https://github.com/apache/kafka/pull/11941#discussion_r839036960



##
File path: metadata/src/main/java/org/apache/kafka/metadata/ConfigSynonym.java
##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+
+public class ConfigSynonym {
+private static final Logger log = 
LoggerFactory.getLogger(ConfigSynonym.class);
+
+public static final Function<String, String> IDENTITY = a -> a;
+
+public static final Function<String, String> HOURS_TO_MILLISECONDS = input -> {
+int hours = valueToInt(input, 0, "hoursToMilliseconds");
+return String.valueOf(TimeUnit.MILLISECONDS.convert(hours, TimeUnit.HOURS));
+};
+
+public static final Function<String, String> MINUTES_TO_MILLISECONDS = input -> {
+int hours = valueToInt(input, 0, "minutesToMilliseconds");
+return String.valueOf(TimeUnit.MILLISECONDS.convert(hours, TimeUnit.MINUTES));
+};
+
+private static int valueToInt(String input, int defaultValue, String what) 
{
+String trimmedInput = input.trim();

Review comment:
   I don't *think* it can, since these configs are of `ConfigDef.Type` 
`INT`, `LONG`, etc. and config parsing would have rejected the `null`... uh, I 
think.
   
   In any case, I just added a case for that so that we return the default 
value, just to be safe.
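
   Presumably something along these lines (a sketch; the actual commit may differ):

   ```java
   private static int valueToInt(String input, int defaultValue, String what) {
       if (input == null) {
           return defaultValue;  // the added case: treat a null input like an empty one
       }
       String trimmedInput = input.trim();
       if (trimmedInput.isEmpty()) {
           return defaultValue;
       }
       try {
           return Integer.parseInt(trimmedInput);
       } catch (Exception e) {
           log.error("{} failed: unable to parse '{}' as an integer.", what, trimmedInput, e);
           return defaultValue;
       }
   }
   ```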




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #11941: KAFKA-13749: CreateTopics in KRaft must return configs

2022-03-30 Thread GitBox


cmccabe commented on a change in pull request #11941:
URL: https://github.com/apache/kafka/pull/11941#discussion_r839036960



##
File path: metadata/src/main/java/org/apache/kafka/metadata/ConfigSynonym.java
##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+
+public class ConfigSynonym {
+private static final Logger log = 
LoggerFactory.getLogger(ConfigSynonym.class);
+
+public static final Function<String, String> IDENTITY = a -> a;
+
+public static final Function<String, String> HOURS_TO_MILLISECONDS = input -> {
+int hours = valueToInt(input, 0, "hoursToMilliseconds");
+return String.valueOf(TimeUnit.MILLISECONDS.convert(hours, TimeUnit.HOURS));
+};
+
+public static final Function<String, String> MINUTES_TO_MILLISECONDS = input -> {
+int hours = valueToInt(input, 0, "minutesToMilliseconds");
+return String.valueOf(TimeUnit.MILLISECONDS.convert(hours, TimeUnit.MINUTES));
+};
+
+private static int valueToInt(String input, int defaultValue, String what) 
{
+String trimmedInput = input.trim();

Review comment:
   I added a case for that so that we return the default value




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #11941: KAFKA-13749: CreateTopics in KRaft must return configs

2022-03-30 Thread GitBox


cmccabe commented on a change in pull request #11941:
URL: https://github.com/apache/kafka/pull/11941#discussion_r839036276



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
##
@@ -51,27 +52,100 @@
 
 
 public class ConfigurationControlManager {
+public static final ConfigResource DEFAULT_NODE = new 
ConfigResource(Type.BROKER, "");
+
 private final Logger log;
 private final SnapshotRegistry snapshotRegistry;
 private final KafkaConfigSchema configSchema;
 private final Consumer<ConfigResource> existenceChecker;
 private final Optional<AlterConfigPolicy> alterConfigPolicy;
 private final ConfigurationValidator validator;
 private final TimelineHashMap<ConfigResource, TimelineHashMap<String, String>> configData;
+private final Map<String, Object> staticConfig;
+private final ConfigResource currentNode;

Review comment:
   I think "this" is more confusing than "node" :)
   
   I changed it to `currentController`, take a look.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #11941: KAFKA-13749: CreateTopics in KRaft must return configs

2022-03-30 Thread GitBox


cmccabe commented on a change in pull request #11941:
URL: https://github.com/apache/kafka/pull/11941#discussion_r839035589



##
File path: 
core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
##
@@ -189,6 +192,15 @@ abstract class BaseAdminIntegrationTest extends 
IntegrationTestHarness with Logg
 
   override def modifyConfigs(configs: Seq[Properties]): Unit = {
 super.modifyConfigs(configs)
+// For testCreateTopicsReturnsConfigs, set some static broker 
configurations so that we can
+// verify that they show up in the "configs" output of CreateTopics.

Review comment:
   Yes, this is kind of a problem with the old test harness. I agree that 
the non-locality could be confusing here. However, I think it would be a lot of 
repeated boilerplate code to create a new test file just for this one test case.
   
   As a compromise, I added some JavaDoc to `testCreateTopicsReturnsConfigs` so 
that anyone modifying that test can see where the configurations are being 
set




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #11941: KAFKA-13749: CreateTopics in KRaft must return configs

2022-03-30 Thread GitBox


cmccabe commented on a change in pull request #11941:
URL: https://github.com/apache/kafka/pull/11941#discussion_r839033673



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -462,7 +475,9 @@ public void replay(RemoveTopicRecord record) {
 
 private ApiError createTopic(CreatableTopic topic,
  List<ApiMessageAndVersion> records,
- Map<String, CreatableTopicResult> successes) {
+ Map<String, CreatableTopicResult> successes,
+ boolean includeConfigs) {

Review comment:
   Changed to `authorizedToReturnConfigs`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13601) Add option to support sync offset commit in Kafka Connect Sink

2022-03-30 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17514971#comment-17514971
 ] 

Chris Egerton commented on KAFKA-13601:
---

[~dasarianil] does the above make sense to you? If so, do you still want to 
pursue synchronous offset commits for sink tasks?

> Add option to support sync offset commit in Kafka Connect Sink
> --
>
> Key: KAFKA-13601
> URL: https://issues.apache.org/jira/browse/KAFKA-13601
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Anil Dasari
>Priority: Major
>
> Exactly-once in the S3 connector with scheduled rotation and a field 
> partitioner can be achieved with a synchronous consumer offset commit after a 
> message batch is flushed to the sink successfully.
> Currently, WorkerSinkTask commits the consumer offsets asynchronously at 
> regular intervals of WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG: 
> [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L203]
> [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L196]
> [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L354]
>  
> Add a config to allow users to select synchronous commits over 
> WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG.
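
For illustration, the consumer-level difference behind this request (a sketch; the `consumer` and `offsets` variables stand in for what WorkerSinkTask manages internally):

{code:java}
// Current behavior: an asynchronous commit fired at each interval; the task
// moves on and only learns of failures later via the callback.
consumer.commitAsync(offsets, (committedOffsets, exception) -> {
    if (exception != null) {
        // handle/log the failed commit after the fact
    }
});

// Proposed option: block until the broker acknowledges the commit, so a
// successful flush-and-commit pair can be treated as one unit.
consumer.commitSync(offsets);
{code}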



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] C0urante commented on pull request #10367: KAFKA-12495: allow consecutive revoke in incremental cooperative assignor in connector

2022-03-30 Thread GitBox


C0urante commented on pull request #10367:
URL: https://github.com/apache/kafka/pull/10367#issuecomment-1083666779


   @showuon FWIW, I've got a refactoring PR up for the testing logic that we 
might take a look at in the meantime: https://github.com/apache/kafka/pull/11974
   
   Hopefully this should make writing and reviewing tests for changes like in 
this PR easier in the future.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #11974: KAFKA-13763 (1): Improve unit testing coverage and flexibility for IncrementalCooperativeAssignor

2022-03-30 Thread GitBox


C0urante commented on pull request #11974:
URL: https://github.com/apache/kafka/pull/11974#issuecomment-1083666008


   There's still room for improvement with the incremental rebalancing testing 
logic, but many of the remaining changes would involve modifications to the 
`IncrementalCooperativeAssignor` class as well. In order to reduce PR size and 
keep things focused, I've decided to leave that to a follow-up pull request, 
but since it would accomplish basically the same goal, I'll attach it to 
KAFKA-13763 as well.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante opened a new pull request #11974: KAFKA-13763 (1): Improve unit testing coverage and flexibility for IncrementalCooperativeAssignor

2022-03-30 Thread GitBox


C0urante opened a new pull request #11974:
URL: https://github.com/apache/kafka/pull/11974


   [Jira](https://issues.apache.org/jira/browse/KAFKA-13763)
   
   This is strictly a testing refactor. No functional changes are made; this 
can be easily verified by confirming that the only affected file is the 
`IncrementalCooperativeAssignorTest.java` test suite.
   
   These changes were initially discussed during review of 
https://github.com/apache/kafka/pull/10367, which partially focused on 
improving readability in the unit tests for incremental rebalancing in Connect.
   
   The goals here include:
   1. Simplify the logic that has to be manually specified on a per-test-case 
basis for simulating a rebalance (accomplished by extracting common logic into 
reusable utility methods such as `performRebalance` and `addNewEmptyWorkers`)
   2. Reduce the cognitive burden for following testing logic by removing 
unnecessary fields (like `expectedMemberConfigs` and `assignments`) and 
assertions (like the redundant checks for leader and leader URL)
   3. Add powerful, granular, and reusable utility methods that can provide 
stronger guarantees about the state of a cluster across successive rebalances 
without forcing people to track this state in their heads (accomplished by 
replacing the existing `assertAssignments` method with `assertWorkers`, 
`assertEmptyAssignment`, `assertConnectorAllocations`, and 
`assertTaskAllocations`, and by adding the new `assertBalancedAllocation` and 
`assertCompleteAllocation` methods)
   4. Refactor common logic for testing utilities to be more concise and reduce 
the number of Java 8 streams statements that have to be understood in order to 
read through a test case
   5. Fix a bug in the assertion logic for checking for duplicates currently present [here](https://github.com/apache/kafka/blob/b2cb6caa1e9267c720c00fa367a277ee8509baea/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java#L1447-L1451) and [here](https://github.com/apache/kafka/blob/b2cb6caa1e9267c720c00fa367a277ee8509baea/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java#L1456-L1460) ([`List::removeAll`](https://docs.oracle.com/javase/8/docs/api/java/util/List.html#removeAll-java.util.Collection-) removes _all_ occurrences of any element contained in the collection passed to the method; see the sketch after this list)
   6. Remove an incorrect assertion in the `assertNoReassignments` (now renamed 
to `assertNoRedundantAssignments`) method that there should be no duplicated 
connectors or tasks in the assignments reported by each worker to the leader 
during rebalance (this is unnecessary and even contradicts logic used for 
testing in cases like 
[this](https://github.com/apache/kafka/blob/b2cb6caa1e9267c720c00fa367a277ee8509baea/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java#L1112-L1125)
 where we intentionally simulate a worker with a duplicated set of connectors 
and/or tasks rejoining a cluster; the only reason this bug wasn't surfaced 
sooner is because the bug mentioned in the prior point covers it)
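
   To make point 5 concrete, a self-contained sketch of the `List::removeAll` behavior that masked duplicates:

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;

   public class RemoveAllPitfall {
       public static void main(String[] args) {
           List<String> assigned = new ArrayList<>(Arrays.asList("task-1", "task-1", "task-2"));
           // removeAll drops *every* occurrence of each element in its argument,
           // so a duplicated "task-1" assignment vanishes from the comparison.
           assigned.removeAll(Collections.singletonList("task-1"));
           System.out.println(assigned);  // prints [task-2]
       }
   }
   ```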
   
   Once merged, this should allow for cleaner, faster test writing when adding 
new cases for incremental rebalancing, such as with 
https://github.com/apache/kafka/pull/10367.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13763) Improve unit testing coverage and flexibility for IncrementalCooperativeAssignor

2022-03-30 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-13763:
--
Summary: Improve unit testing coverage and flexibility for 
IncrementalCooperativeAssignor  (was: Improve unit testing coverage for 
IncrementalCooperativeAssignor)

> Improve unit testing coverage and flexibility for 
> IncrementalCooperativeAssignor
> 
>
> Key: KAFKA-13763
> URL: https://issues.apache.org/jira/browse/KAFKA-13763
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Minor
>
> The 
> [tests|https://github.com/apache/kafka/blob/dcd09de1ed84b43f269eb32fc2baf589a791d468/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java]
>  for the {{IncrementalCooperativeAssignor}} class provide a moderate level of 
> coverage and cover some non-trivial cases, but there are some areas for 
> improvement that will allow us to iterate on the assignment logic for Kafka 
> Connect faster and with greater confidence.
> These improvements include:
>  * Adding reusable utility methods to assert that a cluster's assignment is 
> *balanced* (the difference in the number of connectors and tasks assigned to 
> any two workers is at most one) and *complete* (all connectors and tasks are 
> assigned to a worker); see the sketch below
>  * Removing the existing 
> [assertAssignment|https://github.com/apache/kafka/blob/dcd09de1ed84b43f269eb32fc2baf589a791d468/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java#L1373-L1405]
>  methods and replacing them with a more fine-grained alternative that allows 
> for more granular assertions about the number of tasks/connectors 
> assigned/revoked from each worker during a round of rebalance, instead of the 
> total for the entire cluster
>  * Adding a reusable utility method to assert the current distribution of 
> connectors and tasks across the cluster
>  * Decomposing large portions of repeated code for simulating a round of 
> rebalancing into a reusable utility method
>  * Renaming variable names to improve accuracy/readability (the 
> {{expectedMemberConfigs}} field, for example, is pretty poorly named)
> But other improvements may be added in a pull request that addresses the 
> above as they come up.
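
As a sketch of the "balanced" assertion described in the first bullet (names here are hypothetical, not the PR's actual method signatures):

{code:java}
import java.util.Collections;
import java.util.Map;

public class BalanceCheckSketch {
    // Balanced: no two workers differ by more than one in their total
    // connector-plus-task counts.
    static void assertBalancedAllocation(Map<String, Integer> allocationSizeByWorker) {
        int max = Collections.max(allocationSizeByWorker.values());
        int min = Collections.min(allocationSizeByWorker.values());
        if (max - min > 1) {
            throw new AssertionError("Unbalanced allocation: " + allocationSizeByWorker);
        }
    }
}
{code}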



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13784) DescribeQuorum should return the current leader if the handling node is not the current leader

2022-03-30 Thread Jose Armando Garcia Sancio (Jira)


Jose Armando Garcia Sancio created KAFKA-13784:
--

 Summary: DescribeQuorum should return the current leader if the 
handling node is not the current leader
 Key: KAFKA-13784
 URL: https://issues.apache.org/jira/browse/KAFKA-13784
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Affects Versions: 3.2.0
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio


For clients calling DescribeQuorum it is not possible to discover the current 
leader. If the request is sent to a node that is not the leader, it simply 
replies with INVALID_REQUEST. KIP-595 mentions that it should instead reply 
with the current leader.

 

> If the response indicates that the intended node is not the current leader, 
> then check the response to see if the {{LeaderId}} has been set. If so, then 
> attempt to retry the request with the new leader.
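
A sketch of the retry pattern that behavior enables (the helpers here are hypothetical, not actual client code):

{code:java}
// Ask any node; if it is not the leader but names one, retry against the leader.
DescribeQuorumResponseData.PartitionData partition = describeQuorum(anyNode);  // hypothetical helper
if (partition.errorCode() != Errors.NONE.code() && partition.leaderId() >= 0) {
    partition = describeQuorum(nodeById(partition.leaderId()));                // hypothetical helper
}
{code}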



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] hachikuji merged pull request #11970: MINOR: Move `KafkaYammerMetrics` to server-common

2022-03-30 Thread GitBox


hachikuji merged pull request #11970:
URL: https://github.com/apache/kafka/pull/11970


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-13748) Do not include file stream connectors in Connect's CLASSPATH and plugin.path by default

2022-03-30 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis resolved KAFKA-13748.

Resolution: Fixed

cc [~cadonna] [~tombentley] re: inclusion in the upcoming releases. 

> Do not include file stream connectors in Connect's CLASSPATH and plugin.path 
> by default
> ---
>
> Key: KAFKA-13748
> URL: https://issues.apache.org/jira/browse/KAFKA-13748
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Konstantine Karantasis
>Assignee: Konstantine Karantasis
>Priority: Major
> Fix For: 3.2.0, 3.1.1, 3.0.2
>
>
> File stream connectors have been included with Kafka Connect distributions 
> from the very beginning. These simple connectors were included to showcase 
> connector implementation but were never meant to be used in production, and 
> have only been available for the straightforward demonstration of Connect's 
> capabilities through our quick start guides. 
>  
>  Given that these connectors are not production ready and yet they offer 
> access to the local filesystem, with this ticket I propose to remove them 
> from our deployments by default by excluding these connectors from the 
> {{CLASSPATH}} or the default {{{}plugin.path{}}}. 
>  
>  The impact will be minimal. Quick start guides will require a single 
> additional step of editing the {{plugin.path}} to include the single package 
> that includes these connectors. Production deployments will remain unaffected 
> because these are not production grade connectors. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (KAFKA-13759) Disable producer idempotence by default in producers instantiated by Connect

2022-03-30 Thread Konstantine Karantasis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17511516#comment-17511516
 ] 

Konstantine Karantasis edited comment on KAFKA-13759 at 3/30/22, 8:38 PM:
--

This issue has now been merged on the 3.2 and 3.1 branches to avoid a breaking 
change when Connect contacts older brokers and idempotence is enabled in the 
producer by default. 
[~cadonna] [~tombentley] fyi. 
Hopefully this fix makes it to the upcoming releases, but please let me know if 
the targeted versions need to be adjusted. 


was (Author: kkonstantine):
This issue has been now been merged on the 3.2 and 3.1 branches to avoid a 
breaking change when Connect. 
[~cadonna] [~tombentley] fyi. 
Hopefully this fix makes to the upcoming releases but please let me know if the 
targeted versions need to be adjusted. 

> Disable producer idempotence by default in producers instantiated by Connect
> 
>
> Key: KAFKA-13759
> URL: https://issues.apache.org/jira/browse/KAFKA-13759
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Konstantine Karantasis
>Assignee: Konstantine Karantasis
>Priority: Major
> Fix For: 3.2.0, 3.1.1, 3.0.2
>
>
> https://issues.apache.org/jira/browse/KAFKA-7077 was merged recently 
> referring to KIP-318. Before that in AK 3.0 idempotence was enabled by 
> default across Kafka producers. 
> However, some compatibility implications were missed in both cases. 
> If idempotence is enabled by default Connect won't be able to communicate via 
> its producers with Kafka brokers older than version 0.11. Perhaps more 
> importantly, for brokers older than version 2.8 the {{IDEMPOTENT_WRITE}} ACL 
> is required to be granted to the principal of the Connect worker. 
> Given the above caveats, this ticket proposes to explicitly disable producer 
> idempotence in Connect by default. This feature, as it happens today, can be 
> enabled by setting worker and/or connector properties. However, enabling it 
> by default should be considered in a major version upgrade and after KIP-318 
> is updated to mention the compatibility requirements and gets officially 
> approved. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] kkonstantine merged pull request #11908: KAFKA-13748: Do not include file stream connectors in Connect's CLASSPATH and plugin.path by default

2022-03-30 Thread GitBox


kkonstantine merged pull request #11908:
URL: https://github.com/apache/kafka/pull/11908


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13775) CVE-2020-36518 - Upgrade jackson-databind to 2.12.6.1

2022-03-30 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-13775:
--
Fix Version/s: 3.1.1

> CVE-2020-36518 - Upgrade jackson-databind to 2.12.6.1
> -
>
> Key: KAFKA-13775
> URL: https://issues.apache.org/jira/browse/KAFKA-13775
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.1.0, 3.0.0, 3.0.1
>Reporter: Edwin Hobor
>Assignee: Edwin Hobor
>Priority: Major
>  Labels: CVE, security
> Fix For: 3.2.0, 3.1.1
>
>
> *CVE-2020-36518* vulnerability affects Jackson-Databind in Kafka (see 
> [https://github.com/advisories/GHSA-57j2-w4cx-62h2]).
> Upgrading to jackson-databind version *2.12.6.1* should address this issue.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] cadonna commented on pull request #11962: KAFKA-13775: CVE-2020-36518 - Upgrade jackson-databind to 2.12.6.1

2022-03-30 Thread GitBox


cadonna commented on pull request #11962:
URL: https://github.com/apache/kafka/pull/11962#issuecomment-1083560237


   Thank you @edwin092 !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #11962: KAFKA-13775: CVE-2020-36518 - Upgrade jackson-databind to 2.12.6.1

2022-03-30 Thread GitBox


cadonna commented on pull request #11962:
URL: https://github.com/apache/kafka/pull/11962#issuecomment-1083560072


   Cherry-picked to 3.1 \cc @tombentley 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-13775) CVE-2020-36518 - Upgrade jackson-databind to 2.12.6.1

2022-03-30 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reassigned KAFKA-13775:
-

Assignee: Edwin Hobor

> CVE-2020-36518 - Upgrade jackson-databind to 2.12.6.1
> -
>
> Key: KAFKA-13775
> URL: https://issues.apache.org/jira/browse/KAFKA-13775
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.1.0, 3.0.0, 3.0.1
>Reporter: Edwin Hobor
>Assignee: Edwin Hobor
>Priority: Major
>  Labels: CVE, security
> Fix For: 3.2.0
>
>
> *CVE-2020-36518* vulnerability affects Jackson-Databind in Kafka (see 
> [https://github.com/advisories/GHSA-57j2-w4cx-62h2]).
> Upgrading to jackson-databind version *2.12.6.1* should address this issue.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (KAFKA-13660) Replace log4j with reload4j

2022-03-30 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reassigned KAFKA-13660:
-

Assignee: Mike Lothian

> Replace log4j with reload4j
> ---
>
> Key: KAFKA-13660
> URL: https://issues.apache.org/jira/browse/KAFKA-13660
> Project: Kafka
>  Issue Type: Bug
>  Components: logging
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Mike Lothian
>Assignee: Mike Lothian
>Priority: Major
>
> Kafka is using a known vulnerable version of log4j; the reload4j project was 
> created by the code's original authors to address those issues. It is 
> designed as a drop-in replacement without any API changes.
>  
> https://reload4j.qos.ch/
>  
> I've raised a merge request, replacing log4j with reload4j, slf4j-log4j12 
> with slf4j-reload4j and bumping the slf4j version
>  
> This is my first time contributing to the Kafka project and I'm not too 
> familiar with the process, I'll go back and amend my PR with this issue number



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13660) Replace log4j with reload4j

2022-03-30 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13660.
---
Resolution: Fixed

> Replace log4j with reload4j
> ---
>
> Key: KAFKA-13660
> URL: https://issues.apache.org/jira/browse/KAFKA-13660
> Project: Kafka
>  Issue Type: Bug
>  Components: logging
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Mike Lothian
>Assignee: Mike Lothian
>Priority: Major
> Fix For: 3.2.0, 3.1.1
>
>
> Kafka is using a known vulnerable version of log4j; the reload4j project was 
> created by the code's original authors to address those issues. It is 
> designed as a drop-in replacement without any API changes.
>  
> https://reload4j.qos.ch/
>  
> I've raised a merge request, replacing log4j with reload4j, slf4j-log4j12 
> with slf4j-reload4j and bumping the slf4j version
>  
> This is my first time contributing to the Kafka project and I'm not too 
> familiar with the process, I'll go back and amend my PR with this issue number



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13660) Replace log4j with reload4j

2022-03-30 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-13660:
--
Fix Version/s: 3.2.0
   3.1.1

> Replace log4j with reload4j
> ---
>
> Key: KAFKA-13660
> URL: https://issues.apache.org/jira/browse/KAFKA-13660
> Project: Kafka
>  Issue Type: Bug
>  Components: logging
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Mike Lothian
>Assignee: Mike Lothian
>Priority: Major
> Fix For: 3.2.0, 3.1.1
>
>
> Kafka is using a known vulnerable version of log4j; the reload4j project was 
> created by the code's original authors to address those issues. It is 
> designed as a drop-in replacement without any API changes.
>  
> https://reload4j.qos.ch/
>  
> I've raised a merge request, replacing log4j with reload4j, slf4j-log4j12 
> with slf4j-reload4j and bumping the slf4j version
>  
> This is my first time contributing to the Kafka project and I'm not too 
> familiar with the process, I'll go back and amend my PR with this issue number



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] cadonna commented on pull request #11743: KAFKA-13660: Switch log4j12 to reload4j

2022-03-30 Thread GitBox


cadonna commented on pull request #11743:
URL: https://github.com/apache/kafka/pull/11743#issuecomment-1083550804






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11971: KAFKA-13783; Remove reason prefixing in JoinGroupRequest and LeaveGroupRequest

2022-03-30 Thread GitBox


dajac commented on a change in pull request #11971:
URL: https://github.com/apache/kafka/pull/11971#discussion_r838900451



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3723,27 +3723,34 @@ private Integer nodeFor(ConfigResource resource) {
 
 List<MemberIdentity> membersToRemove = new ArrayList<>();
 for (final MemberDescription member : members) {
+MemberIdentity memberIdentity = new MemberIdentity()
+.setReason(reason);

Review comment:
   This does not seem to be an issue. I have extended the unit test to 
ensure that the look up works as we expect.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13773) Data loss after recovery from crash due to full hard disk

2022-03-30 Thread Tm Alkemade (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17514910#comment-17514910
 ] 

Tm Alkemade commented on KAFKA-13773:
-

[~junrao] Yes, that's true, it was a different test run (it had the same 
behavior, though). I'll try to get a consistent run with logs in the right 
order tomorrow; unfortunately, getting all the logs is a bit tricky sometimes 
in Kubernetes when pods are restarting.

> Data loss after recovery from crash due to full hard disk
> -
>
> Key: KAFKA-13773
> URL: https://issues.apache.org/jira/browse/KAFKA-13773
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 2.8.0, 3.1.0, 2.8.1
>Reporter: Tm Alkemade
>Priority: Major
> Attachments: DiskAndOffsets.png, kafka-.zip, kafka-2.7.0vs2.8.0.zip, 
> kafka-2.8.0-crash.zip, kafka-logfiles.zip
>
>
> While doing some testing of Kafka on Kubernetes, the data disk for kafka 
> filled up, which led to all 3 nodes crashing. I increased the disk size for 
> all three nodes and started up kafka again (one by one, waiting for the 
> previous node to become available before starting the next one). After a 
> little while two out of three nodes had no data anymore.
> According to the logs, the log cleaner kicked in and decided that the latest 
> timestamp on those partitions was '0' (i.e. 1970-01-01), and that is older 
> than the 2 week limit specified on the topic.
>  
> {code:java}
> 2022-03-28 12:17:19,740 INFO [LocalLog partition=audit-trail-0, 
> dir=/var/lib/kafka/data-0/kafka-log1] Deleting segment files 
> LogSegment(baseOffset=0, size=249689733, lastModifiedTime=1648460888636, 
> largestRecordTimestamp=Some(0)) (kafka.log.LocalLog$) [kafka-scheduler-0]
> 2022-03-28 12:17:19,753 INFO Deleted log 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.log.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]
> 2022-03-28 12:17:19,754 INFO Deleted offset index 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.index.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]
> 2022-03-28 12:17:19,754 INFO Deleted time index 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.timeindex.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]{code}
> Using kafka-dump-log.sh I was able to determine that the greatest timestamp 
> in that file (before deletion) was actually 1648460888636 (2022-03-28, 
> 09:48:08 UTC, which is today). However, since this segment was the 
> 'latest/current' segment, much of the file is empty. The code that determines 
> the last entry (TimeIndex.lastEntryFromIndexFile) doesn't seem to know this 
> and just reads the last position in the file; the file being mostly empty 
> causes it to read 0 for that position.
> The cleaner code seems to take this into account, since 
> UnifiedLog.deleteOldSegments is never supposed to delete the current segment, 
> judging by the scaladoc; however, in this case the check doesn't seem to do 
> its job. Perhaps the detected highWatermark is wrong?
> I've attached the logs and the zipped data directories (data files are over 
> 3Gb in size when unzipped)
>  
> I've encountered this problem with both kafka 2.8.1 and 3.1.0.
> I've also tried changing min.insync.replicas to 2: The issue still occurs.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] dajac commented on pull request #11936: MINOR: GetOffsetShell should ignore partitions without offsets

2022-03-30 Thread GitBox


dajac commented on pull request #11936:
URL: https://github.com/apache/kafka/pull/11936#issuecomment-1083525133


   I will merge it tomorrow if no one objects. This must be cherry-picked to 3.2 
as well.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna merged pull request #11743: KAFKA-13660: Switch log4j12 to reload4j

2022-03-30 Thread GitBox


cadonna merged pull request #11743:
URL: https://github.com/apache/kafka/pull/11743


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #11743: KAFKA-13660: Switch log4j12 to reload4j

2022-03-30 Thread GitBox


cadonna commented on pull request #11743:
URL: https://github.com/apache/kafka/pull/11743#issuecomment-1083501431


   Failures are unrelated:
   
   ```
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testStartTwoConnectors
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.integration.TransformationIntegrationTest.testFilterOnTopicNameWithSinkConnector
   Build / JDK 8 and Scala 2.12 / kafka.admin.LeaderElectionCommandTest.[1] 
Type=Raft, Name=testElectionResultOutput, Security=PLAINTEXT
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13773) Data loss after recovery from crash due to full hard disk

2022-03-30 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17514888#comment-17514888
 ] 

Jun Rao commented on KAFKA-13773:
-

[~Timelad]: I checked kafka-0-2.8.0-before-fail.log and 
kafka-1-2.8.0-before-fail.log in kafka-2.8.0-crash.zip. They both seem to have 
timestamps after 13:23:00,077, which is the time when the recovery was skipped. 

> Data loss after recovery from crash due to full hard disk
> -
>
> Key: KAFKA-13773
> URL: https://issues.apache.org/jira/browse/KAFKA-13773
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 2.8.0, 3.1.0, 2.8.1
>Reporter: Tm Alkemade
>Priority: Major
> Attachments: DiskAndOffsets.png, kafka-.zip, kafka-2.7.0vs2.8.0.zip, 
> kafka-2.8.0-crash.zip, kafka-logfiles.zip
>
>
> While doing some testing of Kafka on Kubernetes, the data disk for kafka 
> filled up, which led to all 3 nodes crashing. I increased the disk size for 
> all three nodes and started up kafka again (one by one, waiting for the 
> previous node to become available before starting the next one). After a 
> little while two out of three nodes had no data anymore.
> According to the logs, the log cleaner kicked in and decided that the latest 
> timestamp on those partitions was '0' (i.e. 1970-01-01), and that is older 
> than the 2 week limit specified on the topic.
>  
> {code:java}
> 2022-03-28 12:17:19,740 INFO [LocalLog partition=audit-trail-0, 
> dir=/var/lib/kafka/data-0/kafka-log1] Deleting segment files 
> LogSegment(baseOffset=0, size=249689733, lastModifiedTime=1648460888636, 
> largestRecordTimestamp=Some(0)) (kafka.log.LocalLog$) [kafka-scheduler-0]
> 2022-03-28 12:17:19,753 INFO Deleted log 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.log.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]
> 2022-03-28 12:17:19,754 INFO Deleted offset index 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.index.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]
> 2022-03-28 12:17:19,754 INFO Deleted time index 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.timeindex.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]{code}
> Using kafka-dump-log.sh I was able to determine that the greatest timestamp 
> in that file (before deletion) was actually 1648460888636 (2022-03-28, 
> 09:48:08 UTC, which is today). However, since this segment was the 
> 'latest/current' segment, much of the file is empty. The code that determines 
> the last entry (TimeIndex.lastEntryFromIndexFile) doesn't seem to know this 
> and just reads the last position in the file; the file being mostly empty 
> causes it to read 0 for that position.
> The cleaner code seems to take this into account, since 
> UnifiedLog.deleteOldSegments is never supposed to delete the current segment, 
> judging by the scaladoc; however, in this case the check doesn't seem to do 
> its job. Perhaps the detected highWatermark is wrong?
> I've attached the logs and the zipped data directories (data files are over 
> 3Gb in size when unzipped)
>  
> I've encountered this problem with both kafka 2.8.1 and 3.1.0.
> I've also tried changing min.insync.replicas to 2: The issue still occurs.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] dajac commented on a change in pull request #11971: KAFKA-13783; Remove reason prefixing in JoinGroupRequest and LeaveGroupRequest

2022-03-30 Thread GitBox


dajac commented on a change in pull request #11971:
URL: https://github.com/apache/kafka/pull/11971#discussion_r838861385



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3723,27 +3723,34 @@ private Integer nodeFor(ConfigResource resource) {
 
 List<MemberIdentity> membersToRemove = new ArrayList<>();
 for (final MemberDescription member : members) {
+MemberIdentity memberIdentity = new MemberIdentity()
+.setReason(reason);

Review comment:
   Good question. Let me double-check this. For context, we were doing 
this before this patch as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-13775) CVE-2020-36518 - Upgrade jackson-databind to 2.12.6.1

2022-03-30 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13775.
---
Resolution: Fixed

> CVE-2020-36518 - Upgrade jackson-databind to 2.12.6.1
> -
>
> Key: KAFKA-13775
> URL: https://issues.apache.org/jira/browse/KAFKA-13775
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.1.0, 3.0.0, 3.0.1
>Reporter: Edwin Hobor
>Priority: Major
>  Labels: CVE, security
> Fix For: 3.2.0
>
>
> *CVE-2020-36518* vulnerability affects Jackson-Databind in Kafka (see 
> [https://github.com/advisories/GHSA-57j2-w4cx-62h2]).
> Upgrading to jackson-databind version *2.12.6.1* should address this issue.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13775) CVE-2020-36518 - Upgrade jackson-databind to 2.12.6.1

2022-03-30 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-13775:
--
Fix Version/s: 3.2.0

> CVE-2020-36518 - Upgrade jackson-databind to 2.12.6.1
> -
>
> Key: KAFKA-13775
> URL: https://issues.apache.org/jira/browse/KAFKA-13775
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.1.0, 3.0.0, 3.0.1
>Reporter: Edwin Hobor
>Priority: Major
>  Labels: CVE, security
> Fix For: 3.2.0
>
>
> *CVE-2020-36518* vulnerability affects Jackson-Databind in Kafka (see 
> [https://github.com/advisories/GHSA-57j2-w4cx-62h2]).
> Upgrading to jackson-databind version *2.12.6.1* should address this issue.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] cadonna commented on pull request #11962: KAFKA-13775: CVE-2020-36518 - Upgrade jackson-databind to 2.12.6.1

2022-03-30 Thread GitBox


cadonna commented on pull request #11962:
URL: https://github.com/apache/kafka/pull/11962#issuecomment-1083490149


   Cherry-picked to 3.2


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13773) Data loss after recovery from crash due to full hard disk

2022-03-30 Thread Tm Alkemade (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tm Alkemade updated KAFKA-13773:

Affects Version/s: 2.8.0

> Data loss after recovery from crash due to full hard disk
> -
>
> Key: KAFKA-13773
> URL: https://issues.apache.org/jira/browse/KAFKA-13773
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 2.8.0, 3.1.0, 2.8.1
>Reporter: Tm Alkemade
>Priority: Major
> Attachments: DiskAndOffsets.png, kafka-.zip, kafka-2.7.0vs2.8.0.zip, 
> kafka-2.8.0-crash.zip, kafka-logfiles.zip
>
>
> While doing some testing of Kafka on Kubernetes, the data disk for kafka 
> filled up, which led to all 3 nodes crashing. I increased the disk size for 
> all three nodes and started up kafka again (one by one, waiting for the 
> previous node to become available before starting the next one). After a 
> little while two out of three nodes had no data anymore.
> According to the logs, the log cleaner kicked in and decided that the latest 
> timestamp on those partitions was '0' (i.e. 1970-01-01), and that is older 
> than the 2 week limit specified on the topic.
>  
> {code:java}
> 2022-03-28 12:17:19,740 INFO [LocalLog partition=audit-trail-0, 
> dir=/var/lib/kafka/data-0/kafka-log1] Deleting segment files 
> LogSegment(baseOffset=0, size=249689733, lastModifiedTime=1648460888636, 
> largestRecordTimestamp=Some(0)) (kafka.log.LocalLog$) [kafka-scheduler-0]
> 2022-03-28 12:17:19,753 INFO Deleted log 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.log.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]
> 2022-03-28 12:17:19,754 INFO Deleted offset index 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.index.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]
> 2022-03-28 12:17:19,754 INFO Deleted time index 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.timeindex.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]{code}
> Using kafka-dump-log.sh I was able to determine that the greatest timestamp 
> in that file (before deletion) was actually 1648460888636 (2022-03-28, 
> 09:48:08 UTC, which is today). However, since this segment was the 
> 'latest/current' segment, much of the file is empty. The code that determines 
> the last entry (TimeIndex.lastEntryFromIndexFile) doesn't seem to know this 
> and just reads the last position in the file; the file being mostly empty 
> causes it to read 0 for that position.
> The cleaner code seems to take this into account, since 
> UnifiedLog.deleteOldSegments is never supposed to delete the current segment, 
> judging by the scaladoc; however, in this case the check doesn't seem to do 
> its job. Perhaps the detected highWatermark is wrong?
> I've attached the logs and the zipped data directories (data files are over 
> 3Gb in size when unzipped)
>  
> I've encountered this problem with both kafka 2.8.1 and 3.1.0.
> I've also tried changing min.insync.replicas to 2: The issue still occurs.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] cadonna merged pull request #11962: KAFKA-13775: CVE-2020-36518 - Upgrade jackson-databind to 2.12.6.1

2022-03-30 Thread GitBox


cadonna merged pull request #11962:
URL: https://github.com/apache/kafka/pull/11962


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #11962: KAFKA-13775: CVE-2020-36518 - Upgrade jackson-databind to 2.12.6.1

2022-03-30 Thread GitBox


cadonna commented on pull request #11962:
URL: https://github.com/apache/kafka/pull/11962#issuecomment-1083482813


   Failures are unrelated
   ```
   Build / JDK 8 and Scala 2.12 / 
kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota()
   Build / JDK 11 and Scala 2.13 / 
kafka.server.KRaftClusterTest.testCreateClusterAndPerformReassignment()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13773) Data loss after recovery from crash due to full hard disk

2022-03-30 Thread Tm Alkemade (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17514880#comment-17514880
 ] 

Tm Alkemade commented on KAFKA-13773:
-

{quote}'but it's not clear if the previous shutdown was clean or not'
{quote}
The previous shutdown should not have been clean, since there wasn't enough 
disk space; I've attached the log for the time of the crash in 
kafka-2.8.0-crash.zip. I'll try to see if I can get the log of the first 
restart after the first disk-space crash tomorrow; it might give some more 
insight.
{quote}After that, if the broker is restarted again, it should go through log 
recovery, did that happen?
{quote}
Kafka skipped log recovery once it had enough disk space. See 
kafka-2.7.0vs2.8.0.zip; it contains the log4j logs of the first start after 
resizing the disk.

> Data loss after recovery from crash due to full hard disk
> -
>
> Key: KAFKA-13773
> URL: https://issues.apache.org/jira/browse/KAFKA-13773
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 3.1.0, 2.8.1
>Reporter: Tm Alkemade
>Priority: Major
> Attachments: DiskAndOffsets.png, kafka-.zip, kafka-2.7.0vs2.8.0.zip, 
> kafka-2.8.0-crash.zip, kafka-logfiles.zip
>
>
> While doing some testing of Kafka on Kubernetes, the data disk for Kafka 
> filled up, which led to all three nodes crashing. I increased the disk size 
> for all three nodes and started up Kafka again (one by one, waiting for the 
> previous node to become available before starting the next one). After a 
> little while, two out of three nodes had no data anymore.
> According to the logs, the log cleaner kicked in and decided that the latest 
> timestamp on those partitions was '0' (i.e. 1970-01-01), which is older than 
> the 2-week limit specified on the topic.
>  
> {code:java}
> 2022-03-28 12:17:19,740 INFO [LocalLog partition=audit-trail-0, 
> dir=/var/lib/kafka/data-0/kafka-log1] Deleting segment files 
> LogSegment(baseOffset=0, size=249689733, lastModifiedTime=1648460888636, 
> largestRecordTimestamp=Some(0)) (kafka.log.LocalLog$) [kafka-scheduler-0]
> 2022-03-28 12:17:19,753 INFO Deleted log 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.log.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]
> 2022-03-28 12:17:19,754 INFO Deleted offset index 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.index.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]
> 2022-03-28 12:17:19,754 INFO Deleted time index 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.timeindex.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]{code}
> Using kafka-dump-log.sh I was able to determine that the greatest timestamp 
> in that file (before deletion) was actually 1648460888636 (2022-03-28, 
> 09:48:08 UTC, which is today). However, since this segment was the 
> 'latest/current' segment, much of the file is empty. The code that determines 
> the last entry (TimeIndex.lastEntryFromIndexFile) doesn't seem to account for 
> this and just reads the last position in the file; since the file is mostly 
> empty, it reads 0 for that position.
> The cleaner code seems to take this into account, since 
> UnifiedLog.deleteOldSegments is never supposed to delete the current segment, 
> judging by the scaladoc; however, in this case the check doesn't seem to do 
> its job. Perhaps the detected highWatermark is wrong?
> I've attached the logs and the zipped data directories (data files are over 
> 3 GB in size when unzipped).
> 
> I've encountered this problem with both Kafka 2.8.1 and 3.1.0.
> I've also tried changing min.insync.replicas to 2; the issue still occurs.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-12511) Flaky test DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota()

2022-03-30 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17514875#comment-17514875
 ] 

Bruno Cadonna commented on KAFKA-12511:
---

https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11962/4/testReport/kafka.network/DynamicConnectionQuotaTest/Build___JDK_8_and_Scala_2_12___testDynamicListenerConnectionCreationRateQuota___2/

{code}
java.util.concurrent.ExecutionException: org.opentest4j.AssertionFailedError: 
Listener EXTERNAL connection rate 4.8959608323133414 must be below 4.8 ==> 
expected: <true> but was: <false>
{code}

> Flaky test 
> DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota()
> --
>
> Key: KAFKA-12511
> URL: https://issues.apache.org/jira/browse/KAFKA-12511
> Project: Kafka
>  Issue Type: Bug
>Reporter: dengziming
>Priority: Minor
>
> First time:
> Listener PLAINTEXT connection rate 14.419389476913636 must be below 
> 14.399 ==> expected: <true> but was: <false>
> Second time:
> Listener EXTERNAL connection rate 10.998243336133811 must be below 
> 10.799 ==> expected: <true> but was: <false>
> details: 
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10289/4/testReport/junit/kafka.network/DynamicConnectionQuotaTest/Build___JDK_11___testDynamicListenerConnectionCreationRateQuota__/



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] lihaosky commented on a change in pull request #11971: KAFKA-13783; Remove reason prefixing in JoinGroupRequest and LeaveGroupRequest

2022-03-30 Thread GitBox


lihaosky commented on a change in pull request #11971:
URL: https://github.com/apache/kafka/pull/11971#discussion_r838839458



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3723,27 +3723,34 @@ private Integer nodeFor(ConfigResource resource) {
 
 List membersToRemove = new ArrayList<>();
 for (final MemberDescription member : members) {
+MemberIdentity memberIdentity = new MemberIdentity()
+.setReason(reason);

Review comment:
   QQ: it seems `reason` is also used in `equals`; is it OK to add this? Could 
this cause lookups against `membersToRemove` to fail somewhere?
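
   A quick sketch of the concern (`MemberIdentitySketch` is a made-up stand-in 
for the generated class; illustration only):
   ```scala
   // If `reason` participates in equals/hashCode, two identities for the
   // same member no longer compare equal, so lookups in a Set or Map built
   // from one side miss the other.
   case class MemberIdentitySketch(memberId: String, reason: String)

   object ReasonEqualsSketch extends App {
     val membersToRemove = Set(MemberIdentitySketch("member-1", reason = ""))
     // The same member, looked up with a non-empty reason, is not found:
     println(membersToRemove.contains(MemberIdentitySketch("member-1", reason = "shutdown"))) // false
   }
   ```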




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #11968: add toString method

2022-03-30 Thread GitBox


dajac commented on pull request #11968:
URL: https://github.com/apache/kafka/pull/11968#issuecomment-1083453850


   @chenzhongyu11 Thanks for the PR. I see that the issue is already fixed in 
trunk. Is that correct? 2.5 is pretty old; I don’t think that we will ever 
release a minor release for it. Did you consider upgrading your client?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #11936: MINOR: GetOffsetShell should ignore partitions without offsets

2022-03-30 Thread GitBox


dajac commented on pull request #11936:
URL: https://github.com/apache/kafka/pull/11936#issuecomment-1083445963


   @showuon Any further comment?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji edited a comment on pull request #11970: MINOR: Move `KafkaYammerMetrics` to server-common

2022-03-30 Thread GitBox


hachikuji edited a comment on pull request #11970:
URL: https://github.com/apache/kafka/pull/11970#issuecomment-1083410018


   @dajac Yeah, exactly. That is why I did not want to touch the old functions 
here. To be honest, we might never get rid of them since there's not a ton of 
incentive to do so. Nevertheless, I think it's helpful to have these new 
functions in `server-common` with explicit ordering. It will help with KRaft in 
particular where we need to redefine metrics so that the mbeans have the same 
name as existing metrics. In general, I think we're moving toward a more 
modular server structure, so there will probably be other use cases as well.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13719) connector restart cause duplicate tasks

2022-03-30 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-13719:
---
Fix Version/s: 3.2.0

> connector restart cause duplicate tasks
> ---
>
> Key: KAFKA-13719
> URL: https://issues.apache.org/jira/browse/KAFKA-13719
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.0.0
>Reporter: Shujie Sun
>Assignee: Shujie Sun
>Priority: Critical
> Fix For: 3.2.0, 3.1.1, 3.3.0
>
>
> Restarting a connector with the parameters includeTasks=true&onlyFailed=false 
> causes duplicate tasks and duplicate messages.
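
For reference, a hedged sketch of the restart call in question (host, port, and 
connector name are placeholders; the query parameters are the ones added by 
KIP-745):
{code:java}
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

// Illustrative POST to the Connect connector-restart endpoint with both
// KIP-745 query parameters set. Host/port and connector name are made up.
object RestartConnectorSketch extends App {
  val uri = URI.create(
    "http://localhost:8083/connectors/my-connector/restart?includeTasks=true&onlyFailed=false")
  val request = HttpRequest.newBuilder(uri)
    .POST(HttpRequest.BodyPublishers.noBody())
    .build()
  val response = HttpClient.newHttpClient()
    .send(request, HttpResponse.BodyHandlers.ofString())
  println(s"HTTP ${response.statusCode()}: ${response.body()}")
}
{code}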



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] mimaison commented on pull request #11869: KAFKA-13719: fix connector restart cause duplicate tasks

2022-03-30 Thread GitBox


mimaison commented on pull request #11869:
URL: https://github.com/apache/kafka/pull/11869#issuecomment-1083415753


   Backported to 3.2 and 3.1


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13719) connector restart cause duplicate tasks

2022-03-30 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-13719:
---
Fix Version/s: 3.1.1

> connector restart cause duplicate tasks
> ---
>
> Key: KAFKA-13719
> URL: https://issues.apache.org/jira/browse/KAFKA-13719
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.0.0
>Reporter: Shujie Sun
>Assignee: Shujie Sun
>Priority: Critical
> Fix For: 3.1.1, 3.3.0
>
>
> Restarting a connector with the parameters includeTasks=true&onlyFailed=false 
> causes duplicate tasks and duplicate messages.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] hachikuji edited a comment on pull request #11970: MINOR: Move `KafkaYammerMetrics` to server-common

2022-03-30 Thread GitBox


hachikuji edited a comment on pull request #11970:
URL: https://github.com/apache/kafka/pull/11970#issuecomment-1083410018


   @dajac Yeah, exactly. That is why I did not want to touch the old functions 
here. To be honest, we might never get rid of them since there's not a ton of 
incentive to do so. Nevertheless, I think it's helpful to have these new 
functions in `server-common` with explicit ordering. It will help with KRaft in 
particular where we need to redefine metrics so that the mbeans have the same 
name as existing metrics. In general, I think we're moving toward a more 
modular structure, so there will probably be other use cases as well.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #11970: MINOR: Move `KafkaYammerMetrics` to server-common

2022-03-30 Thread GitBox


hachikuji commented on pull request #11970:
URL: https://github.com/apache/kafka/pull/11970#issuecomment-1083410018


   @dajac Yeah, exactly. That is why I did not want to touch the old functions 
here. To be honest, we might never get rid of them since there's not a ton of 
incentive to do so. Nevertheless, I think it's helpful to have these new 
functions in `server-common` with explicit ordering. It will help with KRaft in 
particular where we need to redefine metrics so that the mbeans have the same 
name. In general, I think we're moving toward a more modular structure, so 
there will probably be other use cases as well.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] yufeiyan1220 commented on a change in pull request #11953: KAFKA-13772: Partitions are not correctly re-partitioned when the fetcher thread pool is resized

2022-03-30 Thread GitBox


yufeiyan1220 commented on a change in pull request #11953:
URL: https://github.com/apache/kafka/pull/11953#discussion_r838767189



##
File path: 
core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##
@@ -210,4 +217,114 @@ class AbstractFetcherManagerTest {
 verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds)
 verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds)
   }
+
+  @Test
+  def testExpandThreadPool(): Unit = {
+testResizeThreadPool(10, 50)
+  }
+
+  @Test
+  def testShrinkThreadPool(): Unit = {
+testResizeThreadPool(50, 10)
+  }
+
+  private def testResizeThreadPool(currentFetcherSize: Int, newFetcherSize: 
Int, brokerNum: Int = 6): Unit = {
+val fetchingTopicPartitions = makeTopicPartition(10, 100)
+val failedTopicPartitions = makeTopicPartition(2, 5, "topic_failed")
+val fetcherManager = new 
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", 
"fetcher-manager", currentFetcherSize) {
+  override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
+new TestResizeFetcherThread(sourceBroker, failedPartitions)
+  }
+}
+try {
+  fetcherManager.addFetcherForPartitions(fetchingTopicPartitions.map { tp 
=>
+val brokerId = getBrokerId(tp, brokerNum)
+val brokerEndPoint = new BrokerEndPoint(brokerId, 
s"kafka-host-$brokerId", 9092)
+tp -> InitialFetchState(None, brokerEndPoint, 0, 0)
+  }.toMap)
+
+  // Mark some of these partitions failed within resizing scope
+  
fetchingTopicPartitions.take(20).foreach(fetcherManager.addFailedPartition)
+  // Mark failed partitions out of resizing scope
+  failedTopicPartitions.foreach(fetcherManager.addFailedPartition)
+
+  fetcherManager.resizeThreadPool(newFetcherSize)
+
+  val ownedPartitions = mutable.Set.empty[TopicPartition]
+  fetcherManager.fetcherThreadMap.forKeyValue { (brokerIdAndFetcherId, 
fetcherThread) =>
+val fetcherId = brokerIdAndFetcherId.fetcherId
+val brokerId = brokerIdAndFetcherId.brokerId
+
+fetcherThread.partitions.foreach { tp =>
+  ownedPartitions += tp
+  assertEquals(fetcherManager.getFetcherId(tp), fetcherId)
+  assertEquals(getBrokerId(tp, brokerNum), brokerId)
+}
+  }
+  // Verify that all partitions are owned by the fetcher threads.
+  assertEquals(fetchingTopicPartitions, ownedPartitions)
+
+  val failedPartitionsAfterResize = 
fetcherManager.failedPartitions.failedPartitions()
+  // Verify that failed partitions within resizing scope are removed, 
otherwise retained
+  assertEquals(Set.empty, 
fetchingTopicPartitions.intersect(failedPartitionsAfterResize))
+  assertEquals(failedTopicPartitions, 
failedTopicPartitions.intersect(failedPartitionsAfterResize))

Review comment:
   yeah!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13773) Data loss after recovery from crash due to full hard disk

2022-03-30 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17514808#comment-17514808
 ] 

Jun Rao commented on KAFKA-13773:
-

[~Timelad]: Thanks for the additional logs. I took a quick look at 
kafka-2-2.8.0-before-resize.log. The broker did skip recovery, but it's not 
clear if the previous shutdown was clean or not. Do you have the log before 
that? During loading, the broker shut down abruptly due to the 
out-of-disk-space issue. After that, if the broker is restarted again, it 
should go through log recovery; did that happen?

 
{code:java}
2022-03-30 13:23:00,077 INFO Skipping recovery for all logs in 
/var/lib/kafka/data-0/kafka-log2 since clean shutdown file was found 
(kafka.log.LogManager) [main]
{code}
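
A rough sketch of the decision behind that log line (the real logic lives in 
kafka.log.LogManager; the marker file name below is an assumption based on 
these versions):
{code:java}
import java.io.File

// On startup, log recovery is skipped for a log dir if a clean-shutdown
// marker file is present (assumed name: .kafka_cleanshutdown).
object CleanShutdownCheckSketch extends App {
  val logDir = new File("/var/lib/kafka/data-0/kafka-log2")
  val cleanShutdownFile = new File(logDir, ".kafka_cleanshutdown")

  if (cleanShutdownFile.exists())
    println(s"Skipping recovery for all logs in $logDir since clean shutdown file was found")
  else
    println(s"Recovering unflushed segments in $logDir")
}
{code}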
 

 

> Data loss after recovery from crash due to full hard disk
> -
>
> Key: KAFKA-13773
> URL: https://issues.apache.org/jira/browse/KAFKA-13773
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 3.1.0, 2.8.1
>Reporter: Tm Alkemade
>Priority: Major
> Attachments: DiskAndOffsets.png, kafka-.zip, kafka-2.7.0vs2.8.0.zip, 
> kafka-2.8.0-crash.zip, kafka-logfiles.zip
>
>
> While doing some testing of Kafka on Kubernetes, the data disk for Kafka 
> filled up, which led to all three nodes crashing. I increased the disk size 
> for all three nodes and started up Kafka again (one by one, waiting for the 
> previous node to become available before starting the next one). After a 
> little while, two out of three nodes had no data anymore.
> According to the logs, the log cleaner kicked in and decided that the latest 
> timestamp on those partitions was '0' (i.e. 1970-01-01), which is older than 
> the 2-week limit specified on the topic.
>  
> {code:java}
> 2022-03-28 12:17:19,740 INFO [LocalLog partition=audit-trail-0, 
> dir=/var/lib/kafka/data-0/kafka-log1] Deleting segment files 
> LogSegment(baseOffset=0, size=249689733, lastModifiedTime=1648460888636, 
> largestRecordTimestamp=Some(0)) (kafka.log.LocalLog$) [kafka-scheduler-0]
> 2022-03-28 12:17:19,753 INFO Deleted log 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.log.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]
> 2022-03-28 12:17:19,754 INFO Deleted offset index 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.index.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]
> 2022-03-28 12:17:19,754 INFO Deleted time index 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.timeindex.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]{code}
> Using kafka-dump-log.sh I was able to determine that the greatest timestamp 
> in that file (before deletion) was actually 1648460888636 (2022-03-28, 
> 09:48:08 UTC, which is today). However, since this segment was the 
> 'latest/current' segment, much of the file is empty. The code that determines 
> the last entry (TimeIndex.lastEntryFromIndexFile) doesn't seem to account for 
> this and just reads the last position in the file; since the file is mostly 
> empty, it reads 0 for that position.
> The cleaner code seems to take this into account, since 
> UnifiedLog.deleteOldSegments is never supposed to delete the current segment, 
> judging by the scaladoc; however, in this case the check doesn't seem to do 
> its job. Perhaps the detected highWatermark is wrong?
> I've attached the logs and the zipped data directories (data files are over 
> 3 GB in size when unzipped).
> 
> I've encountered this problem with both Kafka 2.8.1 and 3.1.0.
> I've also tried changing min.insync.replicas to 2; the issue still occurs.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] rajinisivaram merged pull request #11973: MINOR: Increase wait in ZooKeeperClientTest

2022-03-30 Thread GitBox


rajinisivaram merged pull request #11973:
URL: https://github.com/apache/kafka/pull/11973


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] viktorsomogyi commented on pull request #10738: KAFKA-6945: KIP-373, allow users to create delegation token for others

2022-03-30 Thread GitBox


viktorsomogyi commented on pull request #10738:
URL: https://github.com/apache/kafka/pull/10738#issuecomment-1083329319


   @omkreddy I've addressed your comments. Would you please check the PR again?
   Also, I see that some of the configurations fail to build, but I don't know 
why that is. Locally they're fine.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11953: KAFKA-13772: Partitions are not correctly re-partitioned when the fetcher thread pool is resized

2022-03-30 Thread GitBox


dajac commented on a change in pull request #11953:
URL: https://github.com/apache/kafka/pull/11953#discussion_r838689224



##
File path: 
core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##
@@ -210,4 +217,114 @@ class AbstractFetcherManagerTest {
 verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds)
 verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds)
   }
+
+  @Test
+  def testExpandThreadPool(): Unit = {
+testResizeThreadPool(10, 50)
+  }
+
+  @Test
+  def testShrinkThreadPool(): Unit = {
+testResizeThreadPool(50, 10)
+  }
+
+  private def testResizeThreadPool(currentFetcherSize: Int, newFetcherSize: 
Int, brokerNum: Int = 6): Unit = {
+val fetchingTopicPartitions = makeTopicPartition(10, 100)
+val failedTopicPartitions = makeTopicPartition(2, 5, "topic_failed")
+val fetcherManager = new 
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", 
"fetcher-manager", currentFetcherSize) {
+  override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
+new TestResizeFetcherThread(sourceBroker, failedPartitions)
+  }
+}
+try {
+  fetcherManager.addFetcherForPartitions(fetchingTopicPartitions.map { tp 
=>
+val brokerId = getBrokerId(tp, brokerNum)
+val brokerEndPoint = new BrokerEndPoint(brokerId, 
s"kafka-host-$brokerId", 9092)
+tp -> InitialFetchState(None, brokerEndPoint, 0, 0)
+  }.toMap)
+
+  // Mark some of these partitions failed within resizing scope
+  
fetchingTopicPartitions.take(20).foreach(fetcherManager.addFailedPartition)
+  // Mark failed partitions out of resizing scope
+  failedTopicPartitions.foreach(fetcherManager.addFailedPartition)
+
+  fetcherManager.resizeThreadPool(newFetcherSize)
+
+  val ownedPartitions = mutable.Set.empty[TopicPartition]
+  fetcherManager.fetcherThreadMap.forKeyValue { (brokerIdAndFetcherId, 
fetcherThread) =>
+val fetcherId = brokerIdAndFetcherId.fetcherId
+val brokerId = brokerIdAndFetcherId.brokerId
+
+fetcherThread.partitions.foreach { tp =>
+  ownedPartitions += tp
+  assertEquals(fetcherManager.getFetcherId(tp), fetcherId)
+  assertEquals(getBrokerId(tp, brokerNum), brokerId)
+}
+  }
+  // Verify that all partitions are owned by the fetcher threads.
+  assertEquals(fetchingTopicPartitions, ownedPartitions)
+
+  val failedPartitionsAfterResize = 
fetcherManager.failedPartitions.failedPartitions()
+  // Verify that failed partitions within resizing scope are removed, 
otherwise retained
+  assertEquals(Set.empty, 
fetchingTopicPartitions.intersect(failedPartitionsAfterResize))
+  assertEquals(failedTopicPartitions, 
failedTopicPartitions.intersect(failedPartitionsAfterResize))

Review comment:
   Do we really need `intersect`? I thought that 
`failedPartitionsAfterResize` should be equal to `failedTopicPartitions`, no?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #11965: KAFKA-13778: Fetch from follower should never run the preferred read replica selection

2022-03-30 Thread GitBox


ijuma commented on a change in pull request #11965:
URL: https://github.com/apache/kafka/pull/11965#discussion_r838688468



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1234,8 +1234,9 @@ class ReplicaManager(val config: KafkaConfig,
fetchOffset: Long,
currentTimeMs: Long): Option[Int] = {
 partition.leaderReplicaIdOpt.flatMap { leaderReplicaId =>

Review comment:
   Looks like I assumed that leaderReplicaIdOpt would only be set if the 
broker was the leader. We should check if the docs need to be clarified (and 
close the test coverage gap, of course).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13778) Fetch from follower should never run the preferred read replica selection

2022-03-30 Thread zhaobo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhaobo updated KAFKA-13778:
---
Summary: Fetch from follower should never run the preferred read replica 
selection  (was: Fix follower broker also always execute preferred read-replica 
selection)

> Fetch from follower should never run the preferred read replica selection
> -
>
> Key: KAFKA-13778
> URL: https://issues.apache.org/jira/browse/KAFKA-13778
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.2.0
>Reporter: zhaobo
>Assignee: zhaobo
>Priority: Minor
>
> The code is designed so that only the leader broker determines the preferred 
> read-replica.
>  
> {code:java}
> readFromLocalLog()
> 
> // If we are the leader, determine the preferred read-replica
> val preferredReadReplica = clientMetadata.flatMap(
>   metadata => findPreferredReadReplica(partition, metadata, replicaId, 
> fetchInfo.fetchOffset, fetchTimeMs)) {code}
>  
> But in fact, since the code does not check whether the broker is the leader, 
> a follower will also execute the preferred read-replica selection.
> {code:java}
> partition.leaderReplicaIdOpt.flatMap { leaderReplicaId =>
>   // Don't look up preferred for follower fetches via normal replication and
>   if (Request.isValidBrokerId(replicaId))
>     None
>   else { {code}
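
A sketch of the guard the fix implies (hypothetical helper, not the actual 
ReplicaManager code):
{code:java}
// Only a leader serving a consumer fetch should ever run the preferred
// read-replica selection; follower fetches and non-leaders short-circuit.
object PreferredReadReplicaGuardSketch extends App {
  def findPreferredReadReplica(isLeader: Boolean,
                               isFollowerFetch: Boolean,
                               select: () => Option[Int]): Option[Int] =
    if (isFollowerFetch || !isLeader) None else select()

  // A follower with stale leader-side state must still return None:
  println(findPreferredReadReplica(isLeader = false, isFollowerFetch = false, select = () => Some(2))) // None
  println(findPreferredReadReplica(isLeader = true, isFollowerFetch = false, select = () => Some(2)))  // Some(2)
}
{code}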



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] bozhao12 commented on pull request #11965: KAFKA-13778: Fetch from follower should never run the preferred read replica selection

2022-03-30 Thread GitBox


bozhao12 commented on pull request #11965:
URL: https://github.com/apache/kafka/pull/11965#issuecomment-1083262191


   @dajac Thanks for your review, the case you mentioned does occur in some 
scenarios.  I will submit a new unit test based on your suggestion.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #11971: KAFKA-13783; Remove reason prefixing in JoinGroupRequest and LeaveGroupRequest

2022-03-30 Thread GitBox


cadonna commented on pull request #11971:
URL: https://github.com/apache/kafka/pull/11971#issuecomment-1083246616


   @dajac Please go ahead!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #11971: KAFKA-13783; Remove reason prefixing in JoinGroupRequest and LeaveGroupRequest

2022-03-30 Thread GitBox


dajac commented on pull request #11971:
URL: https://github.com/apache/kafka/pull/11971#issuecomment-1083244092


   cc @cadonna I'd like to get this one in 3.2.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #11928: KAFKA-13739: Sliding window with no grace period not working

2022-03-30 Thread GitBox


bbejeck commented on pull request #11928:
URL: https://github.com/apache/kafka/pull/11928#issuecomment-1083230921


   @tombentley I'd like to get this into 3.1.1 as well 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #11965: KAFKA-13778: Fetch from follower should never run the preferred read replica selection

2022-03-30 Thread GitBox


dajac commented on pull request #11965:
URL: https://github.com/apache/kafka/pull/11965#issuecomment-1083229729


   cc @ijuma 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac edited a comment on pull request #11965: KAFKA-13778: Fetch from follower should never run the preferred read replica selection

2022-03-30 Thread GitBox


dajac edited a comment on pull request #11965:
URL: https://github.com/apache/kafka/pull/11965#issuecomment-1083225964


   @bozhao12 Thanks for reporting this one. I took a deeper look at it and I 
agree with your finding. The follower runs the preferred read replica selection 
logic as well. Nice one ;)
   
   In most cases, it still works because, as you said, the 
`RackAwareReplicaSelector` returns the leader when it cannot find a replica in 
the same rack, and the leader is filtered out in the logic. If you have more 
than one replica per rack, you can easily get into a situation where the 
consumer can't consume anything because it is continuously redirected between 
the two replicas. This is only possible if the replica still has some replica 
state left around from its previous leadership. This can happen when the 
partition is reassigned multiple times, for instance.
   
   I suppose that things could be worse depending on the implementation of the 
`ReplicaSelector`. Luckily most people use the `RackAwareReplicaSelector` and 
three replicas.
   
   That seems to be a regression introduced in this commit: 
https://github.com/apache/kafka/commit/fbfda2c4ad889c731aa52b5214e0521f187f8db6.
 
   
   I do agree that we should fix this, but we need to come up with a better 
test which verifies this. Your current test does not really fail because the 
leader is automatically removed. We could perhaps create a `MockSelector` which 
implements `ReplicaSelector` and increments a counter or something along those 
lines (see the sketch below). What do you think?
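
   A minimal sketch of such a selector (class name and test wiring are 
assumptions; a real test would register it via the `replica.selector.class` 
broker config and assert on the counter):
   ```scala
   import java.util.Optional
   import java.util.concurrent.atomic.AtomicInteger

   import org.apache.kafka.common.TopicPartition
   import org.apache.kafka.common.replica.{ClientMetadata, PartitionView, ReplicaSelector, ReplicaView}

   // Counts how often the preferred read-replica selection runs, so a test
   // can assert it is never invoked for a follower.
   class CountingReplicaSelector extends ReplicaSelector {
     val invocations = new AtomicInteger(0)

     override def select(topicPartition: TopicPartition,
                         clientMetadata: ClientMetadata,
                         partitionView: PartitionView): Optional[ReplicaView] = {
       invocations.incrementAndGet()
       Optional.ofNullable(partitionView.leader())
     }
   }
   ```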


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



