[GitHub] [kafka] dajac commented on pull request #11086: KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable error

2021-07-20 Thread GitBox


dajac commented on pull request #11086:
URL: https://github.com/apache/kafka/pull/11086#issuecomment-883146836


   > @dajac , sorry, I might misunderstand the ticket. So we just change for 
`AlterConsumerGroupOffsetsHandler` only, is that right? I mean, if we just 
wanted to change the `REBALANCE_IN_PROGRESS` error.
   
   I haven't checked the handlers so I don't know :). We need to check where 
`REBALANCE_IN_PROGRESS` is actually expected and decide if making it retryable 
makes sense.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11086: KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable error

2021-07-20 Thread GitBox


showuon commented on pull request #11086:
URL: https://github.com/apache/kafka/pull/11086#issuecomment-883149180


   Yes, you're right. We have handled all kinds of expected errors in previous 
PRs. So we should only update `AlterConsumerGroupOffsetsHandler` here, 
unless we also want to treat `GROUP_AUTHORIZATION_FAILED` as a retriable error 
(but I don't think that makes sense). 
   
   I've updated the PR. Thank you.






[jira] [Commented] (KAFKA-13103) Should group admin handlers consider REBALANCE_IN_PROGRESS and GROUP_AUTHORIZATION_FAILED as retryable errors?

2021-07-20 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17383852#comment-17383852
 ] 

Luke Chen commented on KAFKA-13103:
---

In the PR, I made `REBALANCE_IN_PROGRESS` a retriable error but kept 
`GROUP_AUTHORIZATION_FAILED` as a failure, since I also think it doesn't make 
sense to treat `GROUP_AUTHORIZATION_FAILED` as retriable. Thanks.
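
To make the distinction concrete, here is a minimal sketch of how a handler could split group-level errors into retriable and fatal ones. The `ErrorClassifier` class and its nested `GroupError` enum are hypothetical stand-ins invented for illustration; they are not Kafka's `Errors` enum or the actual admin handler code.

```java
import java.util.EnumSet;
import java.util.Set;

final class ErrorClassifier {
    // Hypothetical stand-in for the error codes discussed in this ticket.
    enum GroupError {
        NONE,
        COORDINATOR_LOAD_IN_PROGRESS,
        REBALANCE_IN_PROGRESS,
        GROUP_AUTHORIZATION_FAILED
    }

    // Transient conditions: the same request may succeed if sent again later.
    private static final Set<GroupError> RETRIABLE = EnumSet.of(
        GroupError.COORDINATOR_LOAD_IN_PROGRESS,
        GroupError.REBALANCE_IN_PROGRESS);

    // Returns true when the caller should retry instead of failing the request.
    static boolean isRetriable(GroupError error) {
        return RETRIABLE.contains(error);
    }

    public static void main(String[] args) {
        for (GroupError error : GroupError.values()) {
            System.out.println(error + " retriable=" + isRetriable(error));
        }
    }
}
```

Keeping `GROUP_AUTHORIZATION_FAILED` out of the retriable set means a denied request fails right away instead of being retried until the request timeout.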

> Should group admin handlers consider REBALANCE_IN_PROGRESS and 
> GROUP_AUTHORIZATION_FAILED as retryable errors?
> --
>
> Key: KAFKA-13103
> URL: https://issues.apache.org/jira/browse/KAFKA-13103
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Jacot
>Assignee: Luke Chen
>Priority: Major
>
> [~rajinisiva...@gmail.com] and I were discussing if we should consider 
> REBALANCE_IN_PROGRESS and GROUP_AUTHORIZATION_FAILED as retryable errors in 
> the group handlers. I think that this could make sense, especially for 
> `REBALANCE_IN_PROGRESS`. `GROUP_AUTHORIZATION_FAILED` is more debatable as it 
> means that the handler would retry until the request timeout is reached. It 
> might be harmful if the authorisation is really denied.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jlprat commented on a change in pull request #11083: KAFKA-13010: Retry getting tasks incase of rebalance for TaskMetadata tests

2021-07-20 Thread GitBox


jlprat commented on a change in pull request #11083:
URL: https://github.com/apache/kafka/pull/11083#discussion_r672944792



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
##
@@ -158,9 +158,13 @@ public void shouldReportCorrectEndOffsetInformation() {
 }
 }
 
-private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) {
+private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) 
throws InterruptedException {
+TestUtils.waitForCondition( () -> kafkaStreams

Review comment:
   Correct me if I'm wrong; I just want to better understand the case you 
are raising. You mean keeping the `TaskMetadata` used for this test and then 
comparing it with the `activeTasks`, right?
   But I don't really understand how this will prevent discrepancies if 
the task has been revoked between calls. Wouldn't it then also be absent 
from `activeTasks`?
   
   Sorry to hijack this for this question and thanks in advance!
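
For readers following the thread, the retry added in the diff above amounts to polling until a condition holds. Below is a minimal sketch of that pattern. `Poll.waitFor` is a hypothetical helper written for this illustration; it is not Kafka's `TestUtils.waitForCondition`, whose signature is not reproduced here.

```java
import java.util.function.BooleanSupplier;

final class Poll {
    // Re-evaluates the condition until it holds or the timeout elapses.
    // Returns true if the condition held, false on timeout or interruption.
    static boolean waitFor(BooleanSupplier condition, long timeoutMs, long intervalMs) {
        final long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and give up.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        final long start = System.currentTimeMillis();
        // Hypothetical condition: becomes true roughly 50 ms after start.
        boolean ok = waitFor(() -> System.currentTimeMillis() - start >= 50, 1_000, 10);
        System.out.println("condition met: " + ok);
    }
}
```

Note that a successful wait followed by a separate read is still two calls, which is exactly the gap the question above is about: the state can change between them.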








[GitHub] [kafka] ableegoldman commented on a change in pull request #10921: KAFKA-13096: Ensure queryable store providers is up to date after adding stream thread

2021-07-20 Thread GitBox


ableegoldman commented on a change in pull request #10921:
URL: https://github.com/apache/kafka/pull/10921#discussion_r672764087



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1081,6 +1079,7 @@ private int getNumStreamThreads(final boolean 
hasGlobalTopology) {
 } else {
 log.info("Successfully removed {} in {}ms", 
streamThread.getName(), time.milliseconds() - startMs);
 threads.remove(streamThread);
+queryableStoreProvider.removeStoreProvider(new 
StreamThreadStateStoreProvider(streamThread));

Review comment:
   Ah, yeah I guess it would have always had to handle `DEAD` threads since 
in the before-time (i.e. before we added the `add/removeStreamThread` APIs) it 
was always possible for a thread to just die when hit with an unexpected 
exception.
   
   That said, I feel a lot better about trimming a removed thread from the list 
explicitly. Don't want to build up a mass grave of mostly dead threads (well, 
dead thread store providers) that can never be garbage collected over the 
application's lifetime.

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1081,6 +1079,7 @@ private int getNumStreamThreads(final boolean 
hasGlobalTopology) {
 } else {
 log.info("Successfully removed {} in {}ms", 
streamThread.getName(), time.milliseconds() - startMs);
 threads.remove(streamThread);
+queryableStoreProvider.removeStoreProvider(new 
StreamThreadStateStoreProvider(streamThread));

Review comment:
   It seems a bit weird to have to create a `new 
StreamThreadStateStoreProvider(streamThread)` just to remove an existing 
`StreamThreadStateStoreProvider` from this -- can we maybe change it so that 
`#removeStoreProvider` accepts a `StreamThread` reference, or even just a 
`String streamThreadName`? And then store a map from name to 
`StreamThreadStateStoreProvider` or something -- WDYT?
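
A rough sketch of the name-keyed map suggested above, assuming providers can be identified by the thread's name. `ProviderRegistry` and its method signatures are hypothetical and only illustrate the idea; they are not the actual `QueryableStoreProvider` API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry: P stands in for StreamThreadStateStoreProvider.
final class ProviderRegistry<P> {
    private final Map<String, P> providersByThreadName = new ConcurrentHashMap<>();

    void addStoreProvider(String threadName, P provider) {
        providersByThreadName.put(threadName, provider);
    }

    // Removes by name, so no throwaway provider has to be created for the lookup.
    // Returns true if a provider was registered under this name.
    boolean removeStoreProvider(String threadName) {
        return providersByThreadName.remove(threadName) != null;
    }

    int size() {
        return providersByThreadName.size();
    }

    public static void main(String[] args) {
        ProviderRegistry<Object> registry = new ProviderRegistry<>();
        registry.addStoreProvider("stream-thread-1", new Object());
        System.out.println("removed: " + registry.removeStoreProvider("stream-thread-1"));
        System.out.println("remaining: " + registry.size());
    }
}
```

With a map keyed by name, removal no longer depends on the provider's `equals` behaviour.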








[GitHub] [kafka] rhauch commented on pull request #10978: MINOR: Use time constant algorithms when comparing passwords or keys

2021-07-20 Thread GitBox


rhauch commented on pull request #10978:
URL: https://github.com/apache/kafka/pull/10978#issuecomment-882731540


   Thanks for the review, folks. @omkreddy, I renamed the new utility method as 
you suggested, and updated the JavaDocs as well. Hopefully the purpose and 
behavior are clearer.
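
For context, one common way to compare secrets in Java without an early exit on the first mismatching byte is `java.security.MessageDigest.isEqual`, which recent JDKs document as taking time that depends only on the length of the first argument. The sketch below assumes that approach; whether the utility method in this PR is built this way is not stated in this thread, and `SecretCompare` is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

final class SecretCompare {
    // Compares two secrets byte by byte without stopping at the first difference.
    static boolean sameSecret(String expected, String provided) {
        byte[] a = expected.getBytes(StandardCharsets.UTF_8);
        byte[] b = provided.getBytes(StandardCharsets.UTF_8);
        return MessageDigest.isEqual(a, b);
    }

    public static void main(String[] args) {
        System.out.println(sameSecret("s3cret", "s3cret"));
        System.out.println(sameSecret("s3cret", "s3creT"));
    }
}
```

A plain `String.equals` may return as soon as it finds a difference, which is the timing signal this kind of comparison is meant to avoid.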






[GitHub] [kafka] showuon commented on pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty

2021-07-20 Thread GitBox


showuon commented on pull request #11057:
URL: https://github.com/apache/kafka/pull/11057#issuecomment-882498141


   After testing, **I confirmed that this fix resolves the issue**. It may 
fetch the offset more eagerly than before, but it looks like we need those 
fetches to fix this stuck-stream issue. Thank you.






[GitHub] [kafka] dajac commented on pull request #11016: KAFKA-13058; AlterConsumerGroupOffsetsHandler does not handle partition errors correctly.

2021-07-20 Thread GitBox


dajac commented on pull request #11016:
URL: https://github.com/apache/kafka/pull/11016#issuecomment-882493385










[GitHub] [kafka] wcarlson5 commented on pull request #11083: KAFKA-13010: retry for tasks

2021-07-20 Thread GitBox


wcarlson5 commented on pull request #11083:
URL: https://github.com/apache/kafka/pull/11083#issuecomment-882919484


   @ableegoldman @jlprat @cadonna Can I get a review?






[GitHub] [kafka] jolshan commented on a change in pull request #11075: MINOR: Move off deprecated APIs in StreamsResetter

2021-07-20 Thread GitBox


jolshan commented on a change in pull request #11075:
URL: https://github.com/apache/kafka/pull/11075#discussion_r672384414



##
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##
@@ -655,7 +655,7 @@ public void doDelete(final List<String> topicsToDelete,
                          final Admin adminClient) {
         boolean hasDeleteErrors = false;
         final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete);
-        final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.values();
+        final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.topicNameValues();

Review comment:
   This is part of KIP-516

##
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##
@@ -655,7 +655,7 @@ public void doDelete(final List<String> topicsToDelete,
                          final Admin adminClient) {
         boolean hasDeleteErrors = false;
         final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete);
-        final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.values();
+        final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.topicNameValues();

Review comment:
   I believe the jira was under the kip's main jira, but I can make the 
link clearer.








[GitHub] [kafka] ableegoldman commented on pull request #11085: MINOR: reduce debug log spam and busy loop during shutdown

2021-07-20 Thread GitBox


ableegoldman commented on pull request #11085:
URL: https://github.com/apache/kafka/pull/11085#issuecomment-883003472


   call for review @wcarlson5 @lct45 @guozhangwang @vvcephei @cadonna 






[GitHub] [kafka] showuon commented on pull request #10884: MINOR: replace deprecated exactly_once_beta into exactly_once_v2

2021-07-20 Thread GitBox


showuon commented on pull request #10884:
URL: https://github.com/apache/kafka/pull/10884#issuecomment-882474964


   @cadonna, sorry, it turns out the version `LATEST_3_0` cannot be tested 
yet. I removed it to make the tests work. Thank you. 
   
   ```
   SESSION REPORT (ALL TESTS)
   ducktape version: 0.8.1
   session_id:       2021-07-19--014
   run time:         3 minutes 0.207 seconds
   tests run:        4
   passed:           4
   failed:           0
   ignored:          0

   test_id:    kafkatest.tests.streams.streams_broker_compatibility_test.StreamsBrokerCompatibility.test_compatible_brokers_eos_v2_enabled.broker_version=2.5.1
   status:     PASS
   run time:   45.659 seconds

   test_id:    kafkatest.tests.streams.streams_broker_compatibility_test.StreamsBrokerCompatibility.test_compatible_brokers_eos_v2_enabled.broker_version=2.6.2
   status:     PASS
   run time:   42.687 seconds

   test_id:    kafkatest.tests.streams.streams_broker_compatibility_test.StreamsBrokerCompatibility.test_compatible_brokers_eos_v2_enabled.broker_version=2.7.1
   status:     PASS
   run time:   45.178 seconds

   test_id:    kafkatest.tests.streams.streams_broker_compatibility_test.StreamsBrokerCompatibility.test_compatible_brokers_eos_v2_enabled.broker_version=2.8.0
   status:     PASS
   run time:   46.120 seconds
   ```






[GitHub] [kafka] rajinisivaram commented on a change in pull request #11016: KAFKA-13058; AlterConsumerGroupOffsetsHandler does not handle partition errors correctly.

2021-07-20 Thread GitBox


rajinisivaram commented on a change in pull request #11016:
URL: https://github.com/apache/kafka/pull/11016#discussion_r672176498



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
##
@@ -73,29 +75,40 @@ public String apiName() {
         return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
     }
 
-    @Override
-    public OffsetCommitRequest.Builder buildRequest(int coordinatorId, Set<CoordinatorKey> keys) {
-        List<OffsetCommitRequestTopic> topics = new ArrayList<>();
-        Map<String, List<OffsetCommitRequestPartition>> offsetData = new HashMap<>();
-        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
-            String topic = entry.getKey().topic();
-            OffsetAndMetadata oam = entry.getValue();
-            OffsetCommitRequestPartition partition = new OffsetCommitRequestPartition()
-                    .setCommittedOffset(oam.offset())
-                    .setCommittedLeaderEpoch(oam.leaderEpoch().orElse(-1))
-                    .setCommittedMetadata(oam.metadata())
-                    .setPartitionIndex(entry.getKey().partition());
-            offsetData.computeIfAbsent(topic, key -> new ArrayList<>()).add(partition);
-        }
-        for (Map.Entry<String, List<OffsetCommitRequestPartition>> entry : offsetData.entrySet()) {
-            OffsetCommitRequestTopic topic = new OffsetCommitRequestTopic()
-                    .setName(entry.getKey())
-                    .setPartitions(entry.getValue());
-            topics.add(topic);
+    private void validateKeys(
+        Set<CoordinatorKey> groupIds
+    ) {
Review comment:
   I think we put args on the same line unless the list is too big

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
##
@@ -105,53 +118,96 @@ public String apiName() {
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
+        validateKeys(groupIds);
+
         final OffsetCommitResponse response = (OffsetCommitResponse) abstractResponse;
-        Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
+        final Map<TopicPartition, Errors> partitionResults = new HashMap<>();
 
-        Map<TopicPartition, Errors> partitions = new HashMap<>();
         for (OffsetCommitResponseTopic topic : response.data().topics()) {
             for (OffsetCommitResponsePartition partition : topic.partitions()) {
-                TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
+                TopicPartition topicPartition = new TopicPartition(topic.name(), partition.partitionIndex());
                 Errors error = Errors.forCode(partition.errorCode());
+
                 if (error != Errors.NONE) {
-                    handleError(groupId, error, failed, unmapped);
+                    handleError(
+                        groupId,
+                        topicPartition,
+                        error,
+                        partitionResults,
+                        groupsToUnmap,
+                        groupsToRetry
+                    );
                 } else {
-                    partitions.put(tp, error);
+                    partitionResults.put(topicPartition, error);
                 }
             }
         }
-        if (failed.isEmpty() && unmapped.isEmpty())
-            completed.put(groupId, partitions);
 
-        return new ApiResult<>(completed, failed, unmapped);
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                Collections.singletonMap(groupId, partitionResults),
+                Collections.emptyMap(),
+                Collections.emptyList()
+            );
+        } else {
+            return new ApiResult<>(
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                new ArrayList<>(groupsToUnmap)
+            );
+        }
     }
 
     private void handleError(
         CoordinatorKey groupId,
+        TopicPartition topicPartition,
         Errors error,
-        Map<CoordinatorKey, Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Map<TopicPartition, Errors> partitionResults,
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
     ) {
         switch (error) {
-            case GROUP_AUTHORIZATION_FAILED:
-                log.error("Received authorization failure for group {} in `OffsetCommit` response", groupId,
-                    error.exception());
-                failed.put(groupId, error.exception());
-                break;
+            // If the coordinator is in the middle of loading, then we just need to retry.
             case COORDINATOR_LOAD_IN_PROGRESS:
+                log.debug("OffsetCommit request for group id {} failed because the coordinator" +
+                    " is still in the process of loading state. Will retry.", groupId.idValue);
+                groupsToRetry.add(group

[GitHub] [kafka] junrao merged pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.

2021-07-20 Thread GitBox


junrao merged pull request #10579:
URL: https://github.com/apache/kafka/pull/10579


   






[GitHub] [kafka] ccding commented on pull request #11080: fix NPE when record==null in append

2021-07-20 Thread GitBox


ccding commented on pull request #11080:
URL: https://github.com/apache/kafka/pull/11080#issuecomment-882800476


   This PR is ready for review






[GitHub] [kafka] dajac commented on a change in pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

2021-07-20 Thread GitBox


dajac commented on a change in pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#discussion_r672245627



##
File path: core/src/main/scala/kafka/log/LogConfig.scala
##
@@ -479,4 +501,31 @@ object LogConfig {
 logProps.put(MessageDownConversionEnableProp, 
kafkaConfig.logMessageDownConversionEnable: java.lang.Boolean)
 logProps
   }
+
+  def shouldIgnoreMessageFormatVersion(interBrokerProtocolVersion: 
ApiVersion): Boolean =
+interBrokerProtocolVersion >= KAFKA_3_0_IV1

Review comment:
   Yeah, I do agree with you. I don't think that anyone already uses 
`3.0-IV1` anyway.

##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -403,27 +411,49 @@ class LogManager(logDirs: Seq[File],
*  Start the background threads to flush logs and do log cleanup
*/
   def startup(topicNames: Set[String]): Unit = {
-startupWithConfigOverrides(fetchTopicConfigOverrides(topicNames))
+// ensure consistency between default config and overrides
+val defaultConfig = currentDefaultConfig
+startupWithConfigOverrides(defaultConfig, 
fetchTopicConfigOverrides(defaultConfig, topicNames))
   }
 
   // visible for testing
-  private[log] def fetchTopicConfigOverrides(topicNames: Set[String]): 
Map[String, LogConfig] = {
+  @nowarn("cat=deprecation")
+  private[log] def fetchTopicConfigOverrides(defaultConfig: LogConfig, 
topicNames: Set[String]): Map[String, LogConfig] = {
 val topicConfigOverrides = mutable.Map[String, LogConfig]()
-val defaultProps = currentDefaultConfig.originals()
+val defaultProps = defaultConfig.originals()
 topicNames.foreach { topicName =>
-  val overrides = configRepository.topicConfig(topicName)
+  var overrides = configRepository.topicConfig(topicName)

Review comment:
   For my own education, could `Properties` be unmodifiable somehow? 
   
   I pointed that out because the scaladoc of the `ConfigRepository` trait 
states the following: `@return a copy of the configuration for the given 
resource`. It seems that the two main implementations respect this; however, 
`MockConfigRepository` does not. Therefore, I tend to agree with you.








[GitHub] [kafka] cmccabe merged pull request #11067: MINOR: log broker configs in KRaft mode

2021-07-20 Thread GitBox


cmccabe merged pull request #11067:
URL: https://github.com/apache/kafka/pull/11067


   






[GitHub] [kafka] dajac commented on pull request #11086: KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable error

2021-07-20 Thread GitBox


dajac commented on pull request #11086:
URL: https://github.com/apache/kafka/pull/11086#issuecomment-883106256










[GitHub] [kafka] ijuma commented on a change in pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

2021-07-20 Thread GitBox


ijuma commented on a change in pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#discussion_r672253775



##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -403,27 +411,49 @@ class LogManager(logDirs: Seq[File],
*  Start the background threads to flush logs and do log cleanup
*/
   def startup(topicNames: Set[String]): Unit = {
-startupWithConfigOverrides(fetchTopicConfigOverrides(topicNames))
+// ensure consistency between default config and overrides
+val defaultConfig = currentDefaultConfig
+startupWithConfigOverrides(defaultConfig, 
fetchTopicConfigOverrides(defaultConfig, topicNames))
   }
 
   // visible for testing
-  private[log] def fetchTopicConfigOverrides(topicNames: Set[String]): 
Map[String, LogConfig] = {
+  @nowarn("cat=deprecation")
+  private[log] def fetchTopicConfigOverrides(defaultConfig: LogConfig, 
topicNames: Set[String]): Map[String, LogConfig] = {
 val topicConfigOverrides = mutable.Map[String, LogConfig]()
-val defaultProps = currentDefaultConfig.originals()
+val defaultProps = defaultConfig.originals()
 topicNames.foreach { topicName =>
-  val overrides = configRepository.topicConfig(topicName)
+  var overrides = configRepository.topicConfig(topicName)

Review comment:
   Oh, interesting, so it does specify that a copy is returned. I had 
missed that. Since the mock config repository does not implement it correctly, 
as you found, I think it's better to handle this via a separate PR (it's 
possible that unrelated tests may fail as a result and may need fixing).

##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -403,27 +411,49 @@ class LogManager(logDirs: Seq[File],
*  Start the background threads to flush logs and do log cleanup
*/
   def startup(topicNames: Set[String]): Unit = {
-startupWithConfigOverrides(fetchTopicConfigOverrides(topicNames))
+// ensure consistency between default config and overrides
+val defaultConfig = currentDefaultConfig
+startupWithConfigOverrides(defaultConfig, 
fetchTopicConfigOverrides(defaultConfig, topicNames))
   }
 
   // visible for testing
-  private[log] def fetchTopicConfigOverrides(topicNames: Set[String]): 
Map[String, LogConfig] = {
+  @nowarn("cat=deprecation")
+  private[log] def fetchTopicConfigOverrides(defaultConfig: LogConfig, 
topicNames: Set[String]): Map[String, LogConfig] = {
 val topicConfigOverrides = mutable.Map[String, LogConfig]()
-val defaultProps = currentDefaultConfig.originals()
+val defaultProps = defaultConfig.originals()
 topicNames.foreach { topicName =>
-  val overrides = configRepository.topicConfig(topicName)
+  var overrides = configRepository.topicConfig(topicName)

Review comment:
   Regarding unmodifiable `Properties`, there is no built-in way. My 
concern was something like the `MockConfigRepository` issue you found where 
copies are not done with the assumption that the caller should not mutate (even 
though there is no simple way to prevent that - another reason not to use 
`Properties`).
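
As an illustration of the copy semantics under discussion, here is a minimal sketch of a defensive copy of `java.util.Properties`. `ConfigCopy.copyOf` and the `retention.ms` key are hypothetical and used only for the example; this is not the `ConfigRepository` code.

```java
import java.util.Properties;

final class ConfigCopy {
    // Returns an independent copy so the caller can mutate it freely.
    // Note: putAll copies the direct entries only, not a nested defaults table.
    static Properties copyOf(Properties source) {
        Properties copy = new Properties();
        copy.putAll(source);
        return copy;
    }

    public static void main(String[] args) {
        Properties stored = new Properties();
        stored.setProperty("retention.ms", "1000");

        Properties overrides = copyOf(stored);
        overrides.remove("retention.ms");

        // The stored configuration is unaffected by changes to the copy.
        System.out.println(stored.getProperty("retention.ms"));
    }
}
```

Without the copy, the `remove` call would silently change the repository's stored state, which is the hazard described above.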








[GitHub] [kafka] ijuma merged pull request #11078: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

2021-07-20 Thread GitBox


ijuma merged pull request #11078:
URL: https://github.com/apache/kafka/pull/11078


   






[GitHub] [kafka] ijuma commented on pull request #10811: KAFKA-12598: remove zookeeper support on configCommand except security config

2021-07-20 Thread GitBox


ijuma commented on pull request #10811:
URL: https://github.com/apache/kafka/pull/10811#issuecomment-882984058










[GitHub] [kafka] ijuma commented on a change in pull request #11067: MINOR: log broker configs in KRaft mode

2021-07-20 Thread GitBox


ijuma commented on a change in pull request #11067:
URL: https://github.com/apache/kafka/pull/11067#discussion_r672525835



##
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##
@@ -389,6 +389,9 @@ class BrokerServer(
   // a potentially lengthy recovery-from-unclean-shutdown operation here, 
if required.
   metadataListener.startPublishing(metadataPublisher).get()
 
+  // Log static broker configurations.
+  new KafkaConfig(config.originals(), true)

Review comment:
   Can we file a JIRA?

##
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##
@@ -389,6 +389,9 @@ class BrokerServer(
   // a potentially lengthy recovery-from-unclean-shutdown operation here, 
if required.
   metadataListener.startPublishing(metadataPublisher).get()
 
+  // Log static broker configurations.
+  new KafkaConfig(config.originals(), true)

Review comment:
   I also learned that the ZK path logs dynamic configs too now. So, we can 
have a JIRA about logging dynamic configs for KRaft and mention there the 
potential clean-up.








[GitHub] [kafka] cmccabe merged pull request #11064: MINOR: enable reassign_partitions_test.py for kraft

2021-07-20 Thread GitBox


cmccabe merged pull request #11064:
URL: https://github.com/apache/kafka/pull/11064


   






[GitHub] [kafka] ijuma commented on a change in pull request #11080: fix NPE when record==null in append

2021-07-20 Thread GitBox


ijuma commented on a change in pull request #11080:
URL: https://github.com/apache/kafka/pull/11080#discussion_r672610426



##
File path: 
clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
##
@@ -293,7 +293,9 @@ public static DefaultRecord readFrom(ByteBuffer buffer,
  Long logAppendTime) {
 int sizeOfBodyInBytes = ByteUtils.readVarint(buffer);
 if (buffer.remaining() < sizeOfBodyInBytes)
-return null;
+throw new InvalidRecordException("Invalid record size: expected " 
+ sizeOfBodyInBytes +
+" bytes in record payload, but instead the buffer has only " + 
buffer.remaining() +
+" remaining bytes.");

Review comment:
   Is this really an exceptional case? Don't we do reads where we don't 
know exactly where the read ends and hence will trigger this path?

##
File path: 
clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
##
@@ -293,7 +293,9 @@ public static DefaultRecord readFrom(ByteBuffer buffer,
  Long logAppendTime) {
 int sizeOfBodyInBytes = ByteUtils.readVarint(buffer);
 if (buffer.remaining() < sizeOfBodyInBytes)
-return null;
+throw new InvalidRecordException("Invalid record size: expected " 
+ sizeOfBodyInBytes +
+" bytes in record payload, but instead the buffer has only " + 
buffer.remaining() +
+" remaining bytes.");

Review comment:
   I think the intent here was to cover the case where an incomplete record 
is returned by the broker. However, we have broker logic to try and avoid this 
case since KIP-74:
   
   ```java
   } else if (!hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) {
     // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
     // progress in such cases and don't need to report a `RecordTooLargeException`
     FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
   ```
   
   @hachikuji Do you remember if there is still a reason to return `null` here 
instead of the exception @ccding is proposing?
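
To illustrate the two behaviours being weighed, here is a simplified sketch of a size-prefixed read that either returns `null` or throws when the buffer holds fewer bytes than announced. It uses a fixed 4-byte length prefix instead of the varint that `DefaultRecord.readFrom` reads, and `SizedRead` plus the use of `IllegalStateException` are assumptions made for this example only.

```java
import java.nio.ByteBuffer;

final class SizedRead {
    // Lenient variant: signals an incomplete payload by returning null.
    static byte[] readOrNull(ByteBuffer buffer) {
        int size = buffer.getInt();
        if (size < 0 || buffer.remaining() < size) {
            return null;
        }
        byte[] payload = new byte[size];
        buffer.get(payload);
        return payload;
    }

    // Strict variant: treats an incomplete payload as corrupt input.
    static byte[] readOrThrow(ByteBuffer buffer) {
        int size = buffer.getInt();
        if (size < 0 || buffer.remaining() < size) {
            throw new IllegalStateException("Invalid record size: expected " + size
                + " bytes in record payload, but instead the buffer has only "
                + buffer.remaining() + " remaining bytes.");
        }
        byte[] payload = new byte[size];
        buffer.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        // Announces 4 payload bytes but carries only 1.
        ByteBuffer truncated = ByteBuffer.allocate(5).putInt(4).put((byte) 1);
        truncated.flip();
        System.out.println(readOrNull(truncated.duplicate()) == null);
    }
}
```

With the lenient variant every caller has to remember the `null` check; the strict variant fails at the point of the short read, which only makes sense if short reads are not expected in normal operation.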








[GitHub] [kafka] dajac merged pull request #11016: KAFKA-13058; AlterConsumerGroupOffsetsHandler does not handle partition errors correctly.

2021-07-20 Thread GitBox


dajac merged pull request #11016:
URL: https://github.com/apache/kafka/pull/11016


   






[GitHub] [kafka] dajac commented on a change in pull request #11016: KAFKA-13058; AlterConsumerGroupOffsetsHandler does not handle partition errors correctly.

2021-07-20 Thread GitBox


dajac commented on a change in pull request #11016:
URL: https://github.com/apache/kafka/pull/11016#discussion_r672231624



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
##
@@ -105,53 +118,96 @@ public String apiName() {
 Set groupIds,
 AbstractResponse abstractResponse
 ) {
+validateKeys(groupIds);
+
 final OffsetCommitResponse response = (OffsetCommitResponse) 
abstractResponse;
-Map> completed = new 
HashMap<>();
-Map failed = new HashMap<>();
-List unmapped = new ArrayList<>();
+final Set groupsToUnmap = new HashSet<>();
+final Set groupsToRetry = new HashSet<>();
+final Map partitionResults = new HashMap<>();
 
-Map partitions = new HashMap<>();
 for (OffsetCommitResponseTopic topic : response.data().topics()) {
 for (OffsetCommitResponsePartition partition : topic.partitions()) 
{
-TopicPartition tp = new TopicPartition(topic.name(), 
partition.partitionIndex());
+TopicPartition topicPartition = new 
TopicPartition(topic.name(), partition.partitionIndex());
 Errors error = Errors.forCode(partition.errorCode());
+
 if (error != Errors.NONE) {
-handleError(groupId, error, failed, unmapped);
+handleError(
+groupId,
+topicPartition,
+error,
+partitionResults,
+groupsToUnmap,
+groupsToRetry
+);
 } else {
-partitions.put(tp, error);
+partitionResults.put(topicPartition, error);
 }
 }
 }
-if (failed.isEmpty() && unmapped.isEmpty())
-completed.put(groupId, partitions);
 
-return new ApiResult<>(completed, failed, unmapped);
+if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+return new ApiResult<>(
+Collections.singletonMap(groupId, partitionResults),
+Collections.emptyMap(),
+Collections.emptyList()
+);
+} else {
+return new ApiResult<>(
+Collections.emptyMap(),
+Collections.emptyMap(),
+new ArrayList<>(groupsToUnmap)
+);
+}
 }
 
 private void handleError(
 CoordinatorKey groupId,
+TopicPartition topicPartition,
 Errors error,
-Map failed,
-List unmapped
+Map partitionResults,
+Set groupsToUnmap,
+Set groupsToRetry
 ) {
 switch (error) {
-case GROUP_AUTHORIZATION_FAILED:
-log.error("Received authorization failure for group {} in 
`OffsetCommit` response", groupId,
-error.exception());
-failed.put(groupId, error.exception());
-break;
+// If the coordinator is in the middle of loading, then we just 
need to retry.
 case COORDINATOR_LOAD_IN_PROGRESS:
+log.debug("OffsetCommit request for group id {} failed because 
the coordinator" +
+" is still in the process of loading state. Will retry.", 
groupId.idValue);
+groupsToRetry.add(groupId);
+break;
+
+// If the coordinator is not available, then we unmap and retry.
 case COORDINATOR_NOT_AVAILABLE:
 case NOT_COORDINATOR:
-log.debug("OffsetCommit request for group {} returned error 
{}. Will retry", groupId, error);
-unmapped.add(groupId);
+log.debug("OffsetCommit request for group id {} returned error 
{}. Will retry.",
+groupId.idValue, error);
+groupsToUnmap.add(groupId);
 break;
+
+// Group level errors.
+case INVALID_GROUP_ID:
+case REBALANCE_IN_PROGRESS:
+case INVALID_COMMIT_OFFSET_SIZE:

Review comment:
   I think it is. It basically indicates that we could not write the group 
metadata to the log, so it concerns the group. 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L448
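
The error grouping in the diff above can be summarised as a small classification function. This is a hedged sketch rather than the handler itself: `OffsetCommitErrorSketch`, `ErrorCode` and `Action` are hypothetical names, and because the diff is cut off at the review comment, what the handler does with group-level errors (here `REPORT_GROUP_ERROR`) and with any other error is an assumption.

```java
// Hypothetical sketch; the enum constants mirror the diff, the rest is illustrative.
final class OffsetCommitErrorSketch {

    enum ErrorCode {
        COORDINATOR_LOAD_IN_PROGRESS,
        COORDINATOR_NOT_AVAILABLE,
        NOT_COORDINATOR,
        INVALID_GROUP_ID,
        REBALANCE_IN_PROGRESS,
        INVALID_COMMIT_OFFSET_SIZE,
        UNKNOWN_SERVER_ERROR
    }

    enum Action { RETRY, UNMAP_AND_RETRY, REPORT_GROUP_ERROR, REPORT_PARTITION_ERROR }

    static Action classify(ErrorCode error) {
        switch (error) {
            // The coordinator is still loading its state: retry against the same node.
            case COORDINATOR_LOAD_IN_PROGRESS:
                return Action.RETRY;
            // The coordinator moved or is unavailable: look it up again, then retry.
            case COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR:
                return Action.UNMAP_AND_RETRY;
            // Errors that concern the whole group rather than a single partition.
            case INVALID_GROUP_ID:
            case REBALANCE_IN_PROGRESS:
            case INVALID_COMMIT_OFFSET_SIZE:
                return Action.REPORT_GROUP_ERROR;
            // Assumption: anything else is reported for the affected partition.
            default:
                return Action.REPORT_PARTITION_ERROR;
        }
    }
}
```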


[GitHub] [kafka] ijuma merged pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

2021-07-20 Thread GitBox


ijuma merged pull request #11036:
URL: https://github.com/apache/kafka/pull/11036


   






[GitHub] [kafka] mumrah commented on a change in pull request #11070: Validate the controllerListener config on startup

2021-07-20 Thread GitBox


mumrah commented on a change in pull request #11070:
URL: https://github.com/apache/kafka/pull/11070#discussion_r672407194



##
File path: core/src/main/scala/kafka/server/ControllerServer.scala
##
@@ -137,8 +137,14 @@ class ControllerServer(
 credentialProvider,
 apiVersionManager)
   socketServer.startup(startProcessingRequests = false, 
controlPlaneListener = None, config.controllerListeners)
-  socketServerFirstBoundPortFuture.complete(socketServer.boundPort(
-config.controllerListeners.head.listenerName))
+
+  if(config.controllerListeners.nonEmpty) {
+socketServerFirstBoundPortFuture.complete(socketServer.boundPort(
+  config.controllerListeners.head.listenerName))
+  } else {
+fatal("No controllerListener defined for controller")
+throw new IllegalArgumentException()

Review comment:
   We have `ConfigException` for such cases

##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1914,6 +1914,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean, dynamicConfigO
   "offsets.commit.required.acks must be greater or equal -1 and less or 
equal to offsets.topic.replication.factor")
 require(BrokerCompressionCodec.isValid(compressionType), "compression.type 
: " + compressionType + " is not valid." +
   " Valid options are " + 
BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
+require(!processRoles.contains(ControllerRole) || 
controllerListeners.nonEmpty,
+  s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty")

Review comment:
   How about "[...] cannot be empty if the server has the controller role"
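
For illustration, the `require(!a || b, message)` check above is equivalent to failing when the controller role is set and no controller listener is configured. The sketch below restates that in plain Java with the suggested message; `ControllerListenerCheckSketch` and its parameters are hypothetical, the role is modelled as the string `"controller"`, and `IllegalArgumentException` is used only to keep the example self-contained.

```java
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the startup validation; not KafkaConfig itself.
final class ControllerListenerCheckSketch {

    static final String CONTROLLER_LISTENER_NAMES_PROP = "controller.listener.names";

    // Rejects the configuration only when the controller role is present
    // and no controller listener has been configured.
    static void validate(Set<String> processRoles, List<String> controllerListeners) {
        if (processRoles.contains("controller") && controllerListeners.isEmpty()) {
            throw new IllegalArgumentException(CONTROLLER_LISTENER_NAMES_PROP +
                " cannot be empty if the server has the controller role");
        }
    }
}
```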

##
File path: core/src/main/scala/kafka/server/ControllerServer.scala
##
@@ -137,8 +137,14 @@ class ControllerServer(
 credentialProvider,
 apiVersionManager)
   socketServer.startup(startProcessingRequests = false, 
controlPlaneListener = None, config.controllerListeners)
-  socketServerFirstBoundPortFuture.complete(socketServer.boundPort(
-config.controllerListeners.head.listenerName))
+
+  if(config.controllerListeners.nonEmpty) {
+socketServerFirstBoundPortFuture.complete(socketServer.boundPort(
+  config.controllerListeners.head.listenerName))
+  } else {
+fatal("No controllerListener defined for controller")

Review comment:
   Instead of "controllerListener" let's log the full property name

##
File path: core/src/main/scala/kafka/server/ControllerServer.scala
##
@@ -137,8 +137,14 @@ class ControllerServer(
 credentialProvider,
 apiVersionManager)
   socketServer.startup(startProcessingRequests = false, 
controlPlaneListener = None, config.controllerListeners)
-  socketServerFirstBoundPortFuture.complete(socketServer.boundPort(
-config.controllerListeners.head.listenerName))
+
+  if(config.controllerListeners.nonEmpty) {

Review comment:
   nit: need space after `if` 








[GitHub] [kafka] satishd commented on pull request #11033: KAFKA-12988 Asynchronous API support for RemoteLogMetadataManager add/update methods.

2021-07-20 Thread GitBox


satishd commented on pull request #11033:
URL: https://github.com/apache/kafka/pull/11033#issuecomment-883064875


   @junrao This PR has been rebased on trunk. Please review and let me know your 
comments.  






[GitHub] [kafka] cmccabe commented on a change in pull request #11067: MINOR: log broker configs in KRaft mode

2021-07-20 Thread GitBox


cmccabe commented on a change in pull request #11067:
URL: https://github.com/apache/kafka/pull/11067#discussion_r672512638



##
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##
@@ -389,6 +389,9 @@ class BrokerServer(
   // a potentially lengthy recovery-from-unclean-shutdown operation here, 
if required.
   metadataListener.startPublishing(metadataPublisher).get()
 
+  // Log static broker configurations.
+  new KafkaConfig(config.originals(), true)

Review comment:
   > 1. Where are we logging this for the zk case?
   
   In the ZK case, it's logged from DynamicBrokerConfig.initialize (see above 
stack trace)
   
   > 2. Can we expose a method in KafkaConfig to write the configs to a logger 
or something? Seems a bit cleaner than this approach.
   
   Can we do this after 3.0? Due to the inheritance-based design here, there 
isn't a simple way to expose this without changing the base class. The comment 
does explain what the line does...

##
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##
@@ -389,6 +389,9 @@ class BrokerServer(
   // a potentially lengthy recovery-from-unclean-shutdown operation here, 
if required.
   metadataListener.startPublishing(metadataPublisher).get()
 
+  // Log static broker configurations.
+  new KafkaConfig(config.originals(), true)

Review comment:
   Filed https://issues.apache.org/jira/browse/KAFKA-13105








[GitHub] [kafka] jlprat closed pull request #11025: Scala 3 Compilation with 2.12 support

2021-07-20 Thread GitBox


jlprat closed pull request #11025:
URL: https://github.com/apache/kafka/pull/11025


   






[GitHub] [kafka] ccding commented on a change in pull request #11080: fix NPE when record==null in append

2021-07-20 Thread GitBox


ccding commented on a change in pull request #11080:
URL: https://github.com/apache/kafka/pull/11080#discussion_r672758944



##
File path: 
clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
##
@@ -293,7 +293,9 @@ public static DefaultRecord readFrom(ByteBuffer buffer,
  Long logAppendTime) {
 int sizeOfBodyInBytes = ByteUtils.readVarint(buffer);
 if (buffer.remaining() < sizeOfBodyInBytes)
-return null;
+throw new InvalidRecordException("Invalid record size: expected " 
+ sizeOfBodyInBytes +
+" bytes in record payload, but instead the buffer has only " + 
buffer.remaining() +
+" remaining bytes.");

Review comment:
   Are you referring to the case where we have not yet finished reading the request? 
I didn't see a retry path, but returning `null` will cause a NullPointerException at 
https://github.com/apache/kafka/blob/bfc57aa4ddcd719fc4a646c2ac09d4979c076455/core/src/main/scala/kafka/log/LogValidator.scala#L191
   
   What do you suggest I do here?








[GitHub] [kafka] ijuma commented on pull request #11078: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

2021-07-20 Thread GitBox


ijuma commented on pull request #11078:
URL: https://github.com/apache/kafka/pull/11078#issuecomment-882544244










[GitHub] [kafka] showuon edited a comment on pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty

2021-07-20 Thread GitBox


showuon edited a comment on pull request #11057:
URL: https://github.com/apache/kafka/pull/11057#issuecomment-882498141


   After testing, **I confirmed that this fix resolves the issue**. It might 
fetch the offset a little more eagerly than before, but it looks like we need 
those fetches to fix the stuck-stream issue. Thank you.






[GitHub] [kafka] showuon commented on pull request #11086: KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable error

2021-07-20 Thread GitBox


showuon commented on pull request #11086:
URL: https://github.com/apache/kafka/pull/11086#issuecomment-883022752










[GitHub] [kafka] JoeCqupt closed pull request #11088: MINOR: remove unnecessary judgment in method: assignReplicasToBrokersRackAware

2021-07-20 Thread GitBox


JoeCqupt closed pull request #11088:
URL: https://github.com/apache/kafka/pull/11088










[GitHub] [kafka] ijuma merged pull request #10811: KAFKA-12598: ConfigCommand should only support communication via ZooKeeper for a reduced set of cases

2021-07-20 Thread GitBox


ijuma merged pull request #10811:
URL: https://github.com/apache/kafka/pull/10811


   






[GitHub] [kafka] dielhennr commented on a change in pull request #10463: KAFKA-12670: support unclean.leader.election.enable in KRaft mode

2021-07-20 Thread GitBox


dielhennr commented on a change in pull request #10463:
URL: https://github.com/apache/kafka/pull/10463#discussion_r672693205



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
##
@@ -42,23 +42,38 @@
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.function.Function;
 
 import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;
+import static 
org.apache.kafka.common.config.TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG;
 
 
 public class ConfigurationControlManager {
+static final ConfigResource DEFAULT_NODE_RESOURCE = new 
ConfigResource(Type.BROKER, "");
+
 private final Logger log;
+private final int nodeId;
+private final ConfigResource currentNodeResource;
 private final SnapshotRegistry snapshotRegistry;
 private final Map configDefs;
+private final TimelineHashMap emptyMap;

Review comment:
   I don't think it can be. It needs to be a TimelineHashMap to work and 
needs to receive the snapshot registry in the constructor.








[GitHub] [kafka] mumrah commented on pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

2021-07-20 Thread GitBox


mumrah commented on pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#issuecomment-882589793


   Thanks @ijuma, I don't have any more questions or follow-ups 👍 






[GitHub] [kafka] jsancio commented on a change in pull request #11084: KAFKA-13100: Create a snapshot during leadership promotion

2021-07-20 Thread GitBox


jsancio commented on a change in pull request #11084:
URL: https://github.com/apache/kafka/pull/11084#discussion_r672716678



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -302,17 +302,13 @@ private void onUpdateLeaderHighWatermark(
 }
 
 private void updateListenersProgress(long highWatermark) {
-updateListenersProgress(listenerContexts, highWatermark);
-}
-
-private void updateListenersProgress(List 
listenerContexts, long highWatermark) {
 for (ListenerContext listenerContext : listenerContexts) {
 listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset 
-> {
 if (nextExpectedOffset < log.startOffset() && 
nextExpectedOffset < highWatermark) {
 SnapshotReader snapshot = 
latestSnapshot().orElseThrow(() -> new IllegalStateException(
 String.format(
 "Snapshot expected since next offset of %s is %s, 
log start offset is %s and high-watermark is %s",
-listenerContext.listener.getClass().getTypeName(),
+listenerContext.listenerName(),

Review comment:
   Note that all of the changes to the `raft` module are cosmetic, mainly to 
improve logging.








[GitHub] [kafka] jsancio commented on a change in pull request #11082: KAFKA-13104: Controller should notify raft client when it resigns

2021-07-20 Thread GitBox


jsancio commented on a change in pull request #11082:
URL: https://github.com/apache/kafka/pull/11082#discussion_r672645909



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -284,6 +284,7 @@ private Throwable handleEventException(String name,
 "Reverting to last committed offset {}.",
 this, exception.getClass().getSimpleName(), curClaimEpoch, deltaUs,
 lastCommittedOffset, exception);
+raftClient.resign(curClaimEpoch);

Review comment:
   Do we also need to resign and renounce in line 272 of this file?








[GitHub] [kafka] dielhennr commented on a change in pull request #11082: KAFKA-13104: Controller should notify raft client when it resigns

2021-07-20 Thread GitBox


dielhennr commented on a change in pull request #11082:
URL: https://github.com/apache/kafka/pull/11082#discussion_r672657134



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -284,6 +284,7 @@ private Throwable handleEventException(String name,
 "Reverting to last committed offset {}.",
 this, exception.getClass().getSimpleName(), curClaimEpoch, deltaUs,
 lastCommittedOffset, exception);
+raftClient.resign(curClaimEpoch);

Review comment:
   Forwarding write requests to the controller is a requirement for KRaft

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -284,6 +284,7 @@ private Throwable handleEventException(String name,
 "Reverting to last committed offset {}.",
 this, exception.getClass().getSimpleName(), curClaimEpoch, deltaUs,
 lastCommittedOffset, exception);
+raftClient.resign(curClaimEpoch);

Review comment:
   @jsancio For line 272 to execute, startProcessingTime must not be present. 
The only place I see an exception being thrown before startProcessingTime is 
defined is in the ControllerWriteEvent, and it is a NotControllerException. This 
means that resign/renounce should not be called on line 272, since the node is 
already not the controller.
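
The argument above can be sketched as a guard on whether processing ever started. This is a simplified model under stated assumptions, not `QuorumController`: `ResignDecisionSketch` and `resignCalls` are hypothetical, and the counter merely stands in for the `raftClient.resign(curClaimEpoch)` call added by the PR.

```java
import java.util.OptionalLong;

// Hypothetical model of the two exception paths discussed in this thread.
final class ResignDecisionSketch {

    // Stand-in for calls to raftClient.resign(curClaimEpoch).
    private int resignCalls = 0;

    void handleEventException(OptionalLong startProcessingTimeNs) {
        if (!startProcessingTimeNs.isPresent()) {
            // The event never started processing. Per the comment above, the only
            // exception seen on this path is NotControllerException, so the node
            // is already not the controller and there is nothing to resign from.
            return;
        }
        // The event failed while this node was acting as the active controller:
        // step down so that the raft client is notified.
        resignCalls++;
    }

    int resignCalls() {
        return resignCalls;
    }
}
```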








[GitHub] [kafka] rackom commented on pull request #10824: KAFKA-12718: SessionWindows are closed too early

2021-07-20 Thread GitBox


rackom commented on pull request #10824:
URL: https://github.com/apache/kafka/pull/10824#issuecomment-882348752


   Hello, is there any way to fix this in version 2.5.0? It is causing us 
issues with very short window + gap combinations (window: 10sec, grace: 5sec). 
Upgrading to a newer version is not an option for us at the moment, since it 
would have too many impacts on our solution.






[jira] [Created] (KAFKA-13106) Offsets deletion error

2021-07-20 Thread Robert Janda (Jira)
Robert Janda created KAFKA-13106:


 Summary: Offsets deletion error
 Key: KAFKA-13106
 URL: https://issues.apache.org/jira/browse/KAFKA-13106
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 2.7.0, 2.3.1
 Environment: Linux
Reporter: Robert Janda
 Fix For: 2.7.0


When I use:

kafka-consumer-groups.sh --bootstrap-server broker:9092 --delete-offsets --group myGroup --topic myTopic

I get an error:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/kafka/libs/slf4j-log4j12-1.7.26.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/kafka/libs/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See [http://www.slf4j.org/codes.html#multiple_bindings] for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Exception in thread "main" joptsimple.UnrecognizedOptionException: delete-offsets is not a recognized option
    at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
    at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
    at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
    at joptsimple.OptionParser.parse(OptionParser.java:396)
    at kafka.admin.ConsumerGroupCommand$ConsumerGroupCommandOptions.(ConsumerGroupCommand.scala:917)
    at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:46)
    at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)

When I use the Kafka protocol directly, based on:
[https://kafka.apache.org/protocol#The_Messages_OffsetDelete]

I get this in the Kafka log:

[2021-07-20 07:23:37,832] ERROR Closing socket for 10.1.1.20:9092-192.168.65.3:56050-0 because of error (kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Unknown API key 47
[2021-07-20 07:23:37,835] ERROR Exception while processing request from 10.1.1.20:9092-192.168.65.3:56050-0 (kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Unknown API key 47

Based on:
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets]

both of these should work.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon edited a comment on pull request #11086: KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable error

2021-07-20 Thread GitBox


showuon edited a comment on pull request #11086:
URL: https://github.com/apache/kafka/pull/11086#issuecomment-883108362


   @dajac, sorry, I might have misunderstood the ticket. So we only change 
`AlterConsumerGroupOffsetsHandler`, is that right? I mean, if we just 
want to change the handling of the `REBALANCE_IN_PROGRESS` error.






[GitHub] [kafka] showuon commented on a change in pull request #11086: KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable error

2021-07-20 Thread GitBox


showuon commented on a change in pull request #11086:
URL: https://github.com/apache/kafka/pull/11086#discussion_r672778548



##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -3184,8 +3197,111 @@ public void testDeleteConsumerGroupsRetryBackoff() 
throws Exception {
 }
 }
 
+// this test is testing retriable errors and non-retriable errors in the 
new broker
 @Test
-public void testDeleteConsumerGroupsWithOlderBroker() throws Exception {
+public void testDeleteConsumerGroupsWithRetriableAndNonretriableErrors() 
throws Exception {

Review comment:
   We used to have deleteConsumerGroup tests only for older brokers. 
This adds a test for newer brokers.








[GitHub] [kafka] ableegoldman commented on a change in pull request #11085: MINOR: reduce debug log spam and busy loop during shutdown

2021-07-20 Thread GitBox


ableegoldman commented on a change in pull request #11085:
URL: https://github.com/apache/kafka/pull/11085#discussion_r672760705



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -719,10 +719,10 @@ void runOnce() {
 
 final long pollLatency = pollPhase();
 
-// Shutdown hook could potentially be triggered and transit the thread state to PENDING_SHUTDOWN during #pollRequests().
-// The task manager internal states could be uninitialized if the state transition happens during #onPartitionsAssigned().

Review comment:
   This comment and the short-circuit `return` were a fix for an NPE from a 
year or two ago, but it turns out we actually broke this fix when we 
encapsulated everything into the `pollPhase` -- [the 
fix](https://issues.apache.org/jira/browse/KAFKA-8620) was to return in between 
returning from `poll()` and calling `addRecordsToTasks()`, since we could end 
up with uninitialized tasks/TaskManager state if the shutdown hook was 
triggered during the rebalance callback. 
   Luckily, at some point we happened to shore up the task management logic so 
that the rebalance callbacks will always proceed even if the thread has already 
been told to shut down, so we're not in any trouble here. This also means that 
technically, we don't even need to `return` here anymore -- but there's no real 
reason to continue through the loop, so I just updated the comment and left it 
as is.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -885,8 +885,8 @@ private long pollPhase() {
 records = pollRequests(pollTime);
 } else if (state == State.PENDING_SHUTDOWN) {
 // we are only here because there's rebalance in progress,
-// just poll with zero to complete it
-records = pollRequests(Duration.ZERO);
+// just long poll to give it enough time to complete it
+records = pollRequests(pollTime);

Review comment:
   This is the main fix, see the PR description for full context. I was 
actually wondering if we shouldn't go even further and call `poll(MAX_VALUE)` 
instead, since there's really no reason to return from poll when the thread is 
shutting down but a rebalance is still in progress. Thoughts?
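
   To illustrate the difference between polling with a zero timeout and a long poll while waiting for a rebalance to finish, here is a minimal, self-contained sketch. It does not use the Kafka consumer: a `BlockingQueue` stands in for the consumer, a delayed message stands in for the rebalance completing, and the class and method names are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only: a queue stands in for the consumer, and a
// delayed message stands in for the rebalance completing.
final class ShutdownPollSketch {

    // Polls until the completion signal arrives and returns how many poll
    // calls (loop iterations) were needed.
    static int pollUntilRebalanceCompletes(long pollTimeoutMs, long rebalanceDurationMs) {
        BlockingQueue<String> signals = new LinkedBlockingQueue<>();
        Thread rebalance = new Thread(() -> {
            try {
                Thread.sleep(rebalanceDurationMs);
                signals.put("rebalance-complete");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        rebalance.start();
        int iterations = 0;
        try {
            while (true) {
                iterations++;
                // A zero timeout returns immediately; a positive timeout parks
                // the thread until a signal arrives or the timeout expires.
                if (signals.poll(pollTimeoutMs, TimeUnit.MILLISECONDS) != null) {
                    return iterations;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("zero timeout: " + pollUntilRebalanceCompletes(0L, 50L) + " polls");
        System.out.println("long timeout: " + pollUntilRebalanceCompletes(5_000L, 50L) + " polls");
    }
}
```

   With a zero timeout the loop spins for as long as the simulated rebalance takes, which is the busy-loop pattern this PR is about; with a long timeout a single call is normally enough.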








[GitHub] [kafka] niket-goel commented on pull request #11070: Validate the controllerListener config on startup

2021-07-20 Thread GitBox


niket-goel commented on pull request #11070:
URL: https://github.com/apache/kafka/pull/11070#issuecomment-882782314


   This PR ends up failing a bunch of existing unit tests. Taking a look at the 
failures. Will update the PR.
   
   List of failed tests
   ```
   ConnectionQuotasTest. testListenerConnectionRateLimitWhenActualRateAboveLimit()
   SocketServerTest. testClientDisconnectionWithOutstandingReceivesProcessedUntilFailedSend()
   ControllerApisTest. testCreatePartitionsRequest()
   ControllerApisTest. testCreateTopics()
   ControllerApisTest. testDeleteTopicsById()
   ControllerApisTest. testDeleteTopicsByName()
   ControllerApisTest. testDeleteTopicsDisabled()
   ControllerApisTest. testFetchSentToKRaft()
   ControllerApisTest. testFetchSnapshotSentToKRaft()
   ControllerApisTest. testHandleLegacyAlterConfigsErrors()
   ControllerApisTest. testInvalidDeleteTopicsRequest()
   ControllerApisTest. testInvalidIncrementalAlterConfigsResources()
   ControllerApisTest. testNotAuthorizedToDeleteWithTopicExisting()
   ControllerApisTest. testNotAuthorizedToDeleteWithTopicNotExisting()
   ControllerApisTest. testNotControllerErrorPreventsDeletingTopics()
   ControllerApisTest. testUnauthorizedBeginQuorumEpoch()
   ControllerApisTest. testUnauthorizedBrokerRegistration()
   ControllerApisTest. testUnauthorizedDescribeQuorum()
   ControllerApisTest. testUnauthorizedEndQuorumEpoch()
   ControllerApisTest. testUnauthorizedFetch()
   ControllerApisTest. testUnauthorizedFetchSnapshot()
   ControllerApisTest. testUnauthorizedHandleAllocateProducerIds()
   ControllerApisTest. testUnauthorizedHandleAlterClientQuotas()
   ControllerApisTest. testUnauthorizedHandleAlterIsrRequest()
   ControllerApisTest. testUnauthorizedHandleAlterPartitionReassignments()
   ControllerApisTest. testUnauthorizedHandleBrokerHeartBeatRequest()
   ControllerApisTest. testUnauthorizedHandleIncrementalAlterConfigs()
   ControllerApisTest. testUnauthorizedHandleListPartitionReassignments()
   ControllerApisTest. testUnauthorizedHandleUnregisterBroker()
   ControllerApisTest. testUnauthorizedVote()
   KafkaRaftServerTest. testLoadMetaPropertiesWithInconsistentNodeId()
   KafkaRaftServerTest. testSuccessfulLoadMetaProperties()
   StorageToolTest. testConfigToLogDirectories()
   StorageToolTest. testConfigToLogDirectoriesWithMetaLogDir()
   StorageToolTest. testFormatWithInvalidClusterId()
   ```






[GitHub] [kafka] ijuma commented on pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

2021-07-20 Thread GitBox


ijuma commented on pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#issuecomment-882511896










[GitHub] [kafka] jlprat commented on a change in pull request #11083: KAFKA-13010: Retry getting tasks incase of rebalance for TaskMetadata tests

2021-07-20 Thread GitBox


jlprat commented on a change in pull request #11083:
URL: https://github.com/apache/kafka/pull/11083#discussion_r672944792



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
##
@@ -158,9 +158,13 @@ public void shouldReportCorrectEndOffsetInformation() {
 }
 }
 
-private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) {
+private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) throws InterruptedException {
+TestUtils.waitForCondition( () -> kafkaStreams

Review comment:
   Correct me if I'm wrong; I just want to better understand the case you 
are raising. You mean keeping the `TaskMetadata` used for this test and then 
comparing it with the `activeTasks`, right?
   But I don't really understand how this will prevent discrepancies if 
the task has been revoked in between calls. Wouldn't it then also be absent 
from `activeTasks`?
   
   Sorry to hijack this for this question and thanks in advance!








[GitHub] [kafka] rondagostino commented on pull request #11064: MINOR: enable reassign_partitions_test.py for kraft

2021-07-20 Thread GitBox


rondagostino commented on pull request #11064:
URL: https://github.com/apache/kafka/pull/11064#issuecomment-882573376










[GitHub] [kafka] ableegoldman commented on a change in pull request #11083: KAFKA-13010: Retry getting tasks incase of rebalance for TaskMetadata tests

2021-07-20 Thread GitBox


ableegoldman commented on a change in pull request #11083:
URL: https://github.com/apache/kafka/pull/11083#discussion_r672762836



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
##
@@ -158,9 +158,13 @@ public void shouldReportCorrectEndOffsetInformation() {
 }
 }
 
-private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) {
+private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) throws InterruptedException {
+TestUtils.waitForCondition( () -> kafkaStreams

Review comment:
   I'm sure the odds of this are super low, but to really make this 
airtight you'd need to have the `List` that you use for the test 
be the same one that you test in this condition, otherwise it's _possible_ for 
the one task to have been revoked in the split second between 
`waitForCondition()` and the new call to `metadataForLocalThreads()`. 
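
   One way to close that gap can be sketched as follows: capture the snapshot inside the condition and hand exactly that snapshot to the test. This is a sketch under stated assumptions, not the test's actual code: `waitForCondition` here is a hypothetical stand-in for the real test utility, and the `Supplier` stands in for whatever call returns the current active tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

final class SnapshotConditionSketch {

    // Hypothetical stand-in for a test utility that retries a condition until
    // it holds or the deadline passes.
    static void waitForCondition(BooleanSupplier condition, long maxWaitMs, String failureMessage) {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError(failureMessage);
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    // Returns the single task taken from the same snapshot that satisfied the
    // condition, so a later revocation cannot invalidate the result.
    static <T> T awaitSingleActiveTask(Supplier<List<T>> activeTasks, long maxWaitMs) {
        AtomicReference<List<T>> accepted = new AtomicReference<>();
        waitForCondition(() -> {
            // Copy once; the check and the later use see the same data.
            List<T> snapshot = new ArrayList<>(activeTasks.get());
            if (snapshot.size() != 1) {
                return false;
            }
            accepted.set(snapshot);
            return true;
        }, maxWaitMs, "Expected exactly one active task within " + maxWaitMs + " ms");
        return accepted.get().get(0);
    }
}
```

   Because the value returned is the one the condition accepted, a revocation that happens right after the wait no longer matters to the caller.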








[GitHub] [kafka] dielhennr closed pull request #11081: MINOR: Typo in RaftClient Javadoc

2021-07-20 Thread GitBox


dielhennr closed pull request #11081:
URL: https://github.com/apache/kafka/pull/11081


   






[GitHub] [kafka] ijuma commented on pull request #11064: MINOR: enable reassign_partitions_test.py for kraft

2021-07-20 Thread GitBox


ijuma commented on pull request #11064:
URL: https://github.com/apache/kafka/pull/11064#issuecomment-882580369


   Thanks for the explanation @rondagostino.






[GitHub] [kafka] showuon commented on pull request #10811: KAFKA-12598: ConfigCommand should only support communication via ZooKeeper for a reduced set of cases

2021-07-20 Thread GitBox


showuon commented on pull request #10811:
URL: https://github.com/apache/kafka/pull/10811#issuecomment-882996715


   @ijuma, thanks for the update. It looks better now! Also, thank you and 
@rondagostino for your patient review!
   
   For your question:
   > it looks to me that we don't have test coverage in ConfigCommandTest for 
the case where we try to update broker configs via zk when the brokers are up, 
is that right?
   
   I actually added a test for it:
   `shouldNotAllowAddBrokerQuotaConfigWhileBrokerUpUsingZookeeper` 
[here](https://github.com/apache/kafka/blob/da252c633d88ecec6f68200081ad94a3081e5f35/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala#L905)
   
   And this test covers updating multiple broker configs using ZooKeeper 
(when no brokers are up):
   `testDynamicBrokerConfigUpdateUsingZooKeeper` 
[here](https://github.com/apache/kafka/blob/da252c633d88ecec6f68200081ad94a3081e5f35/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala#L1245)
   
   Do you think we should add more tests for it?






[GitHub] [kafka] mjsax commented on a change in pull request #11075: MINOR: Move off deprecated APIs in StreamsResetter

2021-07-20 Thread GitBox


mjsax commented on a change in pull request #11075:
URL: https://github.com/apache/kafka/pull/11075#discussion_r672438866



##
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##
@@ -655,7 +655,7 @@ public void doDelete(final List<String> topicsToDelete,
  final Admin adminClient) {
 boolean hasDeleteErrors = false;
 final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete);
-final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.values();
+final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.topicNameValues();

Review comment:
   Thanks! Can we cross-link the KIP wiki page and the Jiras to make them 
easier to find?

##
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##
@@ -655,7 +655,7 @@ public void doDelete(final List<String> topicsToDelete,
  final Admin adminClient) {
 boolean hasDeleteErrors = false;
 final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete);
-final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.values();
+final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.topicNameValues();

Review comment:
   Thanks!
   
   It's also best to mention the KIP in the commit message. I guess @hachikuji 
just forgot to add it?








[GitHub] [kafka] cmccabe commented on a change in pull request #11084: KAFKA-13100: Create a snapshot during leadership promotion

2021-07-20 Thread GitBox


cmccabe commented on a change in pull request #11084:
URL: https://github.com/apache/kafka/pull/11084#discussion_r672715848



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -755,11 +766,22 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) {
 newEpoch + ", but we never renounced controller epoch " +
 curEpoch);
 }
-log.warn("Becoming active at controller epoch {}.", newEpoch);
+log.info(

Review comment:
   This can fit in two or three lines... let's try to avoid "exploded" 
function calls that look like function bodies.

##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
##
@@ -178,14 +178,18 @@ public Snapshot getSnapshot(long epoch) {
 /**
  * Creates a new snapshot at the given epoch.
  *
+ * If {@code epoch} already exists and it is the last snapshot then just return that snapshot.

Review comment:
   With this modification, the function should be renamed to something like 
`getOrCreateSnapshot`, right?
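
   To make the suggested semantics concrete, here is a minimal sketch of a get-or-create method over snapshots keyed by epoch. It is not the real `SnapshotRegistry`: the class, the `Snapshot` type, and the choice to reject epochs older than the last snapshot are all assumptions made for illustration.

```java
import java.util.TreeMap;

// Hypothetical, simplified registry; not the actual Kafka class.
final class SnapshotRegistrySketch {

    static final class Snapshot {
        final long epoch;

        Snapshot(long epoch) {
            this.epoch = epoch;
        }
    }

    // Snapshots ordered by epoch so the last entry is the newest one.
    private final TreeMap<Long, Snapshot> snapshots = new TreeMap<>();

    // Returns the last snapshot if it already has the requested epoch;
    // otherwise creates a new snapshot at that epoch.
    Snapshot getOrCreateSnapshot(long epoch) {
        if (!snapshots.isEmpty()) {
            Snapshot last = snapshots.lastEntry().getValue();
            if (last.epoch == epoch) {
                return last;
            }
            if (last.epoch > epoch) {
                // Assumption: snapshots may only be appended in epoch order.
                throw new IllegalArgumentException("Cannot create a snapshot at epoch " + epoch
                        + " because the last snapshot has epoch " + last.epoch);
            }
        }
        Snapshot created = new Snapshot(epoch);
        snapshots.put(epoch, created);
        return created;
    }

    int size() {
        return snapshots.size();
    }
}
```

   The name matters because a caller reading `createSnapshot` would reasonably expect a fresh object on every call, whereas this method can return an existing one.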

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -651,20 +661,21 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
 // If we are writing a new snapshot, then we need to keep that around;
 // otherwise, we should delete up to the current committed offset.
 snapshotRegistry.deleteSnapshotsUpTo(
-Math.min(offset, snapshotGeneratorManager.snapshotLastOffsetFromLog()));
+snapshotGeneratorManager.snapshotLastOffsetFromLog().orElse(offset)

Review comment:
   extra newline not needed here








[GitHub] [kafka] jlprat commented on pull request #11025: Scala 3 Compilation with 2.12 support

2021-07-20 Thread GitBox


jlprat commented on pull request #11025:
URL: https://github.com/apache/kafka/pull/11025#issuecomment-882487321


   I will close this PR because it is not really compiling with Scala 3: 
Gradle's Scala plugin somehow still attempts to compile with Scala 2.






[GitHub] [kafka] PhilHardwick commented on a change in pull request #10921: KAFKA-13096: Ensure queryable store providers is up to date after adding stream thread

2021-07-20 Thread GitBox


PhilHardwick commented on a change in pull request #10921:
URL: https://github.com/apache/kafka/pull/10921#discussion_r672966032



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1081,6 +1079,7 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) {
 } else {
 log.info("Successfully removed {} in {}ms", streamThread.getName(), time.milliseconds() - startMs);
 threads.remove(streamThread);
+queryableStoreProvider.removeStoreProvider(new StreamThreadStateStoreProvider(streamThread));

Review comment:
   Yeah, agreed - that's a better idea. I've changed it to a Map so that a 
provider can be removed by thread name. To be honest, I'd like to just pass a 
StreamThread into #addStoreProvider, but then it's hard to test because the 
StreamThreadStateStoreProvider would be instantiated inside #addStoreProvider, 
so a stub could not be injected (passing a StreamThread would also allow 
#removeStoreProvider to take a StreamThread, which would be more consistent). 








[jira] [Resolved] (KAFKA-13106) Offsets deletion error

2021-07-20 Thread Robert Janda (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Janda resolved KAFKA-13106.
--
Resolution: Fixed

Version 2.3.1 does not support it.

> Offsets deletion error
> --
>
> Key: KAFKA-13106
> URL: https://issues.apache.org/jira/browse/KAFKA-13106
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.3.1, 2.7.0
> Environment: Linux
>Reporter: Robert Janda
>Priority: Major
> Fix For: 2.7.0
>
>
> When I use:
> kafka-consumer-groups.sh --bootstrap-server broker:9092 --delete-offsets 
> --group myGroup --topic myTopic
>  
> I get an error:
>  
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/kafka/libs/slf4j-log4j12-1.7.26.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/kafka/libs/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See [http://www.slf4j.org/codes.html#multiple_bindings] for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> Exception in thread "main" joptsimple.UnrecognizedOptionException: 
> delete-offsets is not a recognized option
>     at 
> joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
>     at 
> joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
>     at 
> joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
>     at joptsimple.OptionParser.parse(OptionParser.java:396)
>     at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupCommandOptions.<init>(ConsumerGroupCommand.scala:917)
>     at 
> kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:46)
>     at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
>  
>  When I use the Kafka protocol based on:
>  [https://kafka.apache.org/protocol#The_Messages_OffsetDelete]
>  
>  I get this Kafka log:
>  
>  [2021-07-20 07:23:37,832] ERROR Closing socket for 
> 10.1.1.20:9092-192.168.65.3:56050-0 because of error (kafka.network.Processor)
> org.apache.kafka.common.errors.InvalidRequestException: Unknown API key 47
> [2021-07-20 07:23:37,835] ERROR Exception while processing request from 
> 10.1.1.20:9092-192.168.65.3:56050-0 (kafka.network.Processor)
> org.apache.kafka.common.errors.InvalidRequestException: Unknown API key 47
>  
>  
>  Based on:
>  
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets]
>  
>  Those things should be working.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cadonna commented on a change in pull request #11083: KAFKA-13010: Retry getting tasks incase of rebalance for TaskMetadata tests

2021-07-20 Thread GitBox


cadonna commented on a change in pull request #11083:
URL: https://github.com/apache/kafka/pull/11083#discussion_r672999766



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
##
@@ -158,9 +158,13 @@ public void shouldReportCorrectEndOffsetInformation() {
 }
 }
 
-private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) {
+private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) throws InterruptedException {
+TestUtils.waitForCondition( () -> kafkaStreams
+.metadataForLocalThreads()
+.stream()
+.mapToLong(t -> t.activeTasks().size())
+.sum() == 1, "only one task");

Review comment:
   Could you please add a more descriptive failure message?








[jira] [Created] (KAFKA-13107) KafkaServer.startup leaves server socket open if zk error is throws (ex. NodeExists)

2021-07-20 Thread Fuud (Jira)
Fuud created KAFKA-13107:


 Summary: KafkaServer.startup leaves server socket open if zk error 
is throws (ex. NodeExists)
 Key: KAFKA-13107
 URL: https://issues.apache.org/jira/browse/KAFKA-13107
 Project: Kafka
  Issue Type: Bug
Reporter: Fuud


kafka.network.Acceptor#serverChannel is not closed if KafkaServer.startup 
fails with a zk error.

This happens because the only place where serverChannel.close() is called is 
kafka/network/SocketServer.scala:640, and reaching it requires the Acceptor to 
be scheduled.

This is a regression: in 2.5 the server channel was closed as expected.
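
The general pattern that avoids this kind of leak can be sketched as follows. This is not Kafka's code and not a proposed patch: the class and method names are hypothetical, and the `Consumer` argument merely stands in for the later startup steps (for example, ZooKeeper registration) that may throw.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.util.function.Consumer;

// Hypothetical illustration of releasing a listening socket when a later
// startup step fails.
final class StartupCleanupSketch {

    // Opens and binds a listening channel, runs the remaining startup steps,
    // and closes the channel again if any of them fails.
    static ServerSocketChannel startup(Consumer<ServerSocketChannel> remainingSteps) {
        ServerSocketChannel serverChannel = null;
        try {
            serverChannel = ServerSocketChannel.open();
            // Port 0 lets the operating system pick a free port.
            serverChannel.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            remainingSteps.accept(serverChannel);
            return serverChannel;
        } catch (IOException | RuntimeException e) {
            closeQuietly(serverChannel);
            throw new IllegalStateException("startup failed; listening socket released", e);
        }
    }

    private static void closeQuietly(ServerSocketChannel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (IOException ignored) {
            // Nothing more can be done during cleanup.
        }
    }
}
```

The point of the sketch is that cleanup is tied to the failure path of startup itself, rather than to a thread that may never get scheduled.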

 





[jira] [Updated] (KAFKA-13107) KafkaServer.startup leaves server socket open if zk error is throws (ex. NodeExists)

2021-07-20 Thread Fuud (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fuud updated KAFKA-13107:
-
Description: 
kafka.network.Acceptor#serverChannel is not closed if KafkaServer.startup 
fails with a zk error.

This happens because the only place where serverChannel.close() is called is 
kafka/network/SocketServer.scala:640, and reaching it requires the Acceptor to 
be scheduled.

This is a regression: in 2.5 the server channel was closed as expected.

Reproduced in 2.8

 

  was:
kafka.network.Acceptor#serverChannel is not closed if KafkaServer.startup is 
failed with zk error.

Because the single point where serverChannel.close() is 
kafka/network/SocketServer.scala:640 but it requires Acceptor to be scheduled.

It is regression: in 2.5 server channel was closed as expected.

 


> KafkaServer.startup leaves server socket open if zk error is throws (ex. 
> NodeExists)
> 
>
> Key: KAFKA-13107
> URL: https://issues.apache.org/jira/browse/KAFKA-13107
> Project: Kafka
>  Issue Type: Bug
>Reporter: Fuud
>Priority: Major
>
> kafka.network.Acceptor#serverChannel is not closed if KafkaServer.startup 
> fails with a zk error.
> This happens because the only place where serverChannel.close() is called is 
> kafka/network/SocketServer.scala:640, and reaching it requires the Acceptor 
> to be scheduled.
> This is a regression: in 2.5 the server channel was closed as expected.
> Reproduced in 2.8
>  





[GitHub] [kafka] ableegoldman commented on a change in pull request #10921: KAFKA-13096: Ensure queryable store providers is up to date after adding stream thread

2021-07-20 Thread GitBox


ableegoldman commented on a change in pull request #10921:
URL: https://github.com/apache/kafka/pull/10921#discussion_r672764087



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1081,6 +1079,7 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) {
 } else {
 log.info("Successfully removed {} in {}ms", streamThread.getName(), time.milliseconds() - startMs);
 threads.remove(streamThread);
+queryableStoreProvider.removeStoreProvider(new StreamThreadStateStoreProvider(streamThread));

Review comment:
   Ah, yeah I guess it would have always had to handle `DEAD` threads since 
in the before-time (ie before we added the `add/removeStreamThread` APIs) it 
was always possible for a thread to just die when hit with an unexpected 
exception.
   
   That said, I feel a lot better about trimming a removed thread from the list 
explicitly. Don't want to build up a mass grave of mostly dead threads (well, 
dead thread store providers) that can never be garbage collected over the 
application's lifetime

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1081,6 +1079,7 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) {
 } else {
 log.info("Successfully removed {} in {}ms", streamThread.getName(), time.milliseconds() - startMs);
 threads.remove(streamThread);
+queryableStoreProvider.removeStoreProvider(new StreamThreadStateStoreProvider(streamThread));

Review comment:
   It seems a bit weird to have to create a `new 
StreamThreadStateStoreProvider(streamThread)` just to remove an existing 
`StreamThreadStateStoreProvider` from this -- can we maybe change it so that 
`#removeStoreProvider` accepts a `StreamThread` reference, or even just a 
`String streamThreadName`? And then store a map from name to 
`StreamThreadStateStoreProvider` or something -- WDYT?
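
   A rough sketch of that idea, with providers stored in a map keyed by thread name so that removal needs only the name. The class name and the generic provider type `P` are placeholders for illustration, not the actual Kafka Streams types.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry keyed by stream thread name; P stands in for the
// per-thread store provider type.
final class StoreProviderRegistrySketch<P> {

    private final Map<String, P> providersByThreadName = new ConcurrentHashMap<>();

    void addStoreProvider(String threadName, P provider) {
        providersByThreadName.put(threadName, provider);
    }

    // Removes the provider registered for the thread; returns false if there
    // was none, so callers can detect a double removal.
    boolean removeStoreProvider(String threadName) {
        return providersByThreadName.remove(threadName) != null;
    }

    int size() {
        return providersByThreadName.size();
    }
}
```

   Keying by name avoids constructing a throwaway provider just to locate the one to remove, and it does not depend on the provider implementing `equals`.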








[GitHub] [kafka] rhauch commented on pull request #10978: MINOR: Use time constant algorithms when comparing passwords or keys

2021-07-20 Thread GitBox


rhauch commented on pull request #10978:
URL: https://github.com/apache/kafka/pull/10978#issuecomment-882731540


   Thanks for the review, folks. @omkreddy, I renamed the new utility method as 
you suggested, and updated the JavaDocs as well. Hopefully the purpose and 
behavior are clearer now.
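
   For readers unfamiliar with the technique, a time-constant comparison can be sketched with the JDK's `MessageDigest.isEqual`. This is only an illustration: the class and method names below are hypothetical and are not the utility added in this PR.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

// Hypothetical helper; the name is an assumption, not the PR's utility.
final class ConstantTimeCompareSketch {

    // Compares two secrets without returning early at the first differing
    // byte, which is what String.equals or Arrays.equals may do.
    static boolean constantTimeEquals(String expected, String provided) {
        byte[] a = expected.getBytes(StandardCharsets.UTF_8);
        byte[] b = provided.getBytes(StandardCharsets.UTF_8);
        return MessageDigest.isEqual(a, b);
    }
}
```

   The aim is that the time taken does not reveal how many leading bytes of a guess were correct. Whether the input length is also hidden depends on the JDK version, so the sketch makes no claim about that.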






[GitHub] [kafka] dajac merged pull request #11016: KAFKA-13058; AlterConsumerGroupOffsetsHandler does not handle partition errors correctly.

2021-07-20 Thread GitBox


dajac merged pull request #11016:
URL: https://github.com/apache/kafka/pull/11016


   






[GitHub] [kafka] dajac commented on pull request #11016: KAFKA-13058; AlterConsumerGroupOffsetsHandler does not handle partition errors correctly.

2021-07-20 Thread GitBox


dajac commented on pull request #11016:
URL: https://github.com/apache/kafka/pull/11016#issuecomment-882493385










[GitHub] [kafka] wcarlson5 commented on pull request #11083: KAFKA-13010: retry for tasks

2021-07-20 Thread GitBox


wcarlson5 commented on pull request #11083:
URL: https://github.com/apache/kafka/pull/11083#issuecomment-882919484


   @ableegoldman @jlprat @cadonna Can I get a review?






[GitHub] [kafka] jolshan commented on a change in pull request #11075: MINOR: Move off deprecated APIs in StreamsResetter

2021-07-20 Thread GitBox


jolshan commented on a change in pull request #11075:
URL: https://github.com/apache/kafka/pull/11075#discussion_r672384414



##
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##
@@ -655,7 +655,7 @@ public void doDelete(final List<String> topicsToDelete,
  final Admin adminClient) {
 boolean hasDeleteErrors = false;
 final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete);
-final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.values();
+final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.topicNameValues();

Review comment:
   This is part of KIP-516

##
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##
@@ -655,7 +655,7 @@ public void doDelete(final List<String> topicsToDelete,
  final Admin adminClient) {
 boolean hasDeleteErrors = false;
 final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete);
-final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.values();
+final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.topicNameValues();

Review comment:
   I believe the jira was under the kip's main jira, but I can make the 
link clearer.








[GitHub] [kafka] ableegoldman commented on pull request #11085: MINOR: reduce debug log spam and busy loop during shutdown

2021-07-20 Thread GitBox


ableegoldman commented on pull request #11085:
URL: https://github.com/apache/kafka/pull/11085#issuecomment-883003472


   call for review @wcarlson5 @lct45 @guozhangwang @vvcephei @cadonna 






[GitHub] [kafka] showuon commented on pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty

2021-07-20 Thread GitBox


showuon commented on pull request #11057:
URL: https://github.com/apache/kafka/pull/11057#issuecomment-882498141


   After testing, **I confirmed that this fix resolves the issue**. It may 
fetch the end offset more eagerly than before, but it looks like we need those 
fetches to fix the stuck-stream issue. Thank you.






[GitHub] [kafka] cadonna commented on a change in pull request #11083: KAFKA-13010: Retry getting tasks incase of rebalance for TaskMetadata tests

2021-07-20 Thread GitBox


cadonna commented on a change in pull request #11083:
URL: https://github.com/apache/kafka/pull/11083#discussion_r672999766



##
File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
##
@@ -158,9 +158,13 @@ public void shouldReportCorrectEndOffsetInformation() {
         }
     }
 
-    private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) {
+    private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) throws InterruptedException {
+        TestUtils.waitForCondition( () -> kafkaStreams
+                .metadataForLocalThreads()
+                .stream()
+                .mapToLong(t -> t.activeTasks().size())
+                .sum() == 1, "only one task");

Review comment:
   Could you please add a more descriptive failure message?
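
One way to read this request: the failure message should state both the expectation and what was actually observed. The sketch below is only an illustration under stated assumptions. `WaitForConditionSketch`, its `waitForCondition` method, and `describeActiveTasks` are hypothetical stand-ins written for this example; they are not Kafka's `TestUtils`, and the wording of the message is invented.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

final class WaitForConditionSketch {

    // Hypothetical polling helper; this is NOT Kafka's TestUtils.waitForCondition.
    static void waitForCondition(BooleanSupplier condition,
                                 long maxWaitMs,
                                 Supplier<String> failureDetails) {
        final long deadline = System.currentTimeMillis() + maxWaitMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) {
                // The details are built lazily so they describe the state at failure time.
                throw new IllegalStateException(
                    "Condition not met within " + maxWaitMs + " ms: " + failureDetails.get());
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting: " + failureDetails.get(), e);
            }
        }
    }

    // A message that names the expectation and the observed value (wording is illustrative).
    static String describeActiveTasks(long observed) {
        return "expected exactly 1 active task across all local stream threads, but found " + observed;
    }

    public static void main(String[] args) {
        final long observed = 2; // pretend two active tasks were reported
        try {
            waitForCondition(() -> observed == 1, 50, () -> describeActiveTasks(observed));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Compared with a bare "only one task", a message of this shape tells the reader of a failed build what the test was waiting for and how far off the observed state was.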








[GitHub] [kafka] rajinisivaram commented on a change in pull request #11016: KAFKA-13058; AlterConsumerGroupOffsetsHandler does not handle partition errors correctly.

2021-07-20 Thread GitBox


rajinisivaram commented on a change in pull request #11016:
URL: https://github.com/apache/kafka/pull/11016#discussion_r672241651



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
##
@@ -105,53 +118,96 @@ public String apiName() {
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
+        validateKeys(groupIds);
+
         final OffsetCommitResponse response = (OffsetCommitResponse) abstractResponse;
-        Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
+        final Map<TopicPartition, Errors> partitionResults = new HashMap<>();
 
-        Map<TopicPartition, Errors> partitions = new HashMap<>();
         for (OffsetCommitResponseTopic topic : response.data().topics()) {
             for (OffsetCommitResponsePartition partition : topic.partitions()) {
-                TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
+                TopicPartition topicPartition = new TopicPartition(topic.name(), partition.partitionIndex());
                 Errors error = Errors.forCode(partition.errorCode());
+
                 if (error != Errors.NONE) {
-                    handleError(groupId, error, failed, unmapped);
+                    handleError(
+                        groupId,
+                        topicPartition,
+                        error,
+                        partitionResults,
+                        groupsToUnmap,
+                        groupsToRetry
+                    );
                 } else {
-                    partitions.put(tp, error);
+                    partitionResults.put(topicPartition, error);
                 }
             }
         }
-        if (failed.isEmpty() && unmapped.isEmpty())
-            completed.put(groupId, partitions);
 
-        return new ApiResult<>(completed, failed, unmapped);
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                Collections.singletonMap(groupId, partitionResults),
+                Collections.emptyMap(),
+                Collections.emptyList()
+            );
+        } else {
+            return new ApiResult<>(
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                new ArrayList<>(groupsToUnmap)
+            );
+        }
     }
 
     private void handleError(
         CoordinatorKey groupId,
+        TopicPartition topicPartition,
         Errors error,
-        Map<CoordinatorKey, Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Map<TopicPartition, Errors> partitionResults,
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
     ) {
         switch (error) {
-            case GROUP_AUTHORIZATION_FAILED:
-                log.error("Received authorization failure for group {} in `OffsetCommit` response", groupId,
-                    error.exception());
-                failed.put(groupId, error.exception());
-                break;
+            // If the coordinator is in the middle of loading, then we just need to retry.
             case COORDINATOR_LOAD_IN_PROGRESS:
+                log.debug("OffsetCommit request for group id {} failed because the coordinator" +
+                    " is still in the process of loading state. Will retry.", groupId.idValue);
+                groupsToRetry.add(groupId);
+                break;
+
+            // If the coordinator is not available, then we unmap and retry.
             case COORDINATOR_NOT_AVAILABLE:
             case NOT_COORDINATOR:
-                log.debug("OffsetCommit request for group {} returned error {}. Will retry", groupId, error);
-                unmapped.add(groupId);
+                log.debug("OffsetCommit request for group id {} returned error {}. Will retry.",
+                    groupId.idValue, error);
+                groupsToUnmap.add(groupId);
                 break;
+
+            // Group level errors.
+            case INVALID_GROUP_ID:
+            case REBALANCE_IN_PROGRESS:
+            case INVALID_COMMIT_OFFSET_SIZE:
+            case GROUP_AUTHORIZATION_FAILED:
+                log.debug("OffsetCommit request for group id {} failed due to error {}.",
+                    groupId.idValue, error);
+                partitionResults.put(topicPartition, error);

Review comment:
   Yes, we can open a JIRA to do it later.








[GitHub] [kafka] showuon commented on pull request #10884: MINOR: replace deprecated exactly_once_beta into exactly_once_v2

2021-07-20 Thread GitBox


showuon commented on pull request #10884:
URL: https://github.com/apache/kafka/pull/10884#issuecomment-882474964


   @cadonna, sorry, it turns out the version `LATEST_3_0` cannot be tested 
yet. I removed it to make the test work. Thank you. 
   
   ```
   SESSION REPORT (ALL TESTS)
   ducktape version: 0.8.1
   session_id:       2021-07-19--014
   run time:         3 minutes 0.207 seconds
   tests run:        4
   passed:           4
   failed:           0
   ignored:          0

   test_id:    kafkatest.tests.streams.streams_broker_compatibility_test.StreamsBrokerCompatibility.test_compatible_brokers_eos_v2_enabled.broker_version=2.5.1
   status:     PASS
   run time:   45.659 seconds

   test_id:    kafkatest.tests.streams.streams_broker_compatibility_test.StreamsBrokerCompatibility.test_compatible_brokers_eos_v2_enabled.broker_version=2.6.2
   status:     PASS
   run time:   42.687 seconds

   test_id:    kafkatest.tests.streams.streams_broker_compatibility_test.StreamsBrokerCompatibility.test_compatible_brokers_eos_v2_enabled.broker_version=2.7.1
   status:     PASS
   run time:   45.178 seconds

   test_id:    kafkatest.tests.streams.streams_broker_compatibility_test.StreamsBrokerCompatibility.test_compatible_brokers_eos_v2_enabled.broker_version=2.8.0
   status:     PASS
   run time:   46.120 seconds
   ```






[GitHub] [kafka] ijuma commented on pull request #11064: MINOR: enable reassign_partitions_test.py for kraft

2021-07-20 Thread GitBox


ijuma commented on pull request #11064:
URL: https://github.com/apache/kafka/pull/11064#issuecomment-882580369


   Thanks for the explanation @rondagostino.






[GitHub] [kafka] junrao merged pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.

2021-07-20 Thread GitBox


junrao merged pull request #10579:
URL: https://github.com/apache/kafka/pull/10579


   






[GitHub] [kafka] ccding commented on pull request #11080: fix NPE when record==null in append

2021-07-20 Thread GitBox


ccding commented on pull request #11080:
URL: https://github.com/apache/kafka/pull/11080#issuecomment-882800476


   This PR is ready for review






[GitHub] [kafka] dajac commented on a change in pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

2021-07-20 Thread GitBox


dajac commented on a change in pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#discussion_r672245627



##
File path: core/src/main/scala/kafka/log/LogConfig.scala
##
@@ -479,4 +501,31 @@ object LogConfig {
     logProps.put(MessageDownConversionEnableProp, kafkaConfig.logMessageDownConversionEnable: java.lang.Boolean)
     logProps
   }
+
+  def shouldIgnoreMessageFormatVersion(interBrokerProtocolVersion: ApiVersion): Boolean =
+    interBrokerProtocolVersion >= KAFKA_3_0_IV1

Review comment:
   Yeah, I do agree with you. I don't think that anyone already uses 
`3.0-IV1` anyway.

##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -403,27 +411,49 @@ class LogManager(logDirs: Seq[File],
    *  Start the background threads to flush logs and do log cleanup
    */
   def startup(topicNames: Set[String]): Unit = {
-    startupWithConfigOverrides(fetchTopicConfigOverrides(topicNames))
+    // ensure consistency between default config and overrides
+    val defaultConfig = currentDefaultConfig
+    startupWithConfigOverrides(defaultConfig, fetchTopicConfigOverrides(defaultConfig, topicNames))
   }
 
   // visible for testing
-  private[log] def fetchTopicConfigOverrides(topicNames: Set[String]): Map[String, LogConfig] = {
+  @nowarn("cat=deprecation")
+  private[log] def fetchTopicConfigOverrides(defaultConfig: LogConfig, topicNames: Set[String]): Map[String, LogConfig] = {
     val topicConfigOverrides = mutable.Map[String, LogConfig]()
-    val defaultProps = currentDefaultConfig.originals()
+    val defaultProps = defaultConfig.originals()
     topicNames.foreach { topicName =>
-      val overrides = configRepository.topicConfig(topicName)
+      var overrides = configRepository.topicConfig(topicName)

Review comment:
   For my own education, could `Properties` be unmodifiable somehow? 
   
   I pointed that out because the scaladoc of the `ConfigRepository` trait 
states the following: `@return a copy of the configuration for the given 
resource`. It seems that the two main implementations respect this; however, 
`MockConfigRepository` does not. Therefore, I tend to agree with you.








[GitHub] [kafka] ijuma merged pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

2021-07-20 Thread GitBox


ijuma merged pull request #11036:
URL: https://github.com/apache/kafka/pull/11036


   






[GitHub] [kafka] cmccabe merged pull request #11067: MINOR: log broker configs in KRaft mode

2021-07-20 Thread GitBox


cmccabe merged pull request #11067:
URL: https://github.com/apache/kafka/pull/11067


   






[GitHub] [kafka] dajac commented on pull request #11086: KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable error

2021-07-20 Thread GitBox


dajac commented on pull request #11086:
URL: https://github.com/apache/kafka/pull/11086#issuecomment-883106256










[GitHub] [kafka] ijuma merged pull request #11078: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

2021-07-20 Thread GitBox


ijuma merged pull request #11078:
URL: https://github.com/apache/kafka/pull/11078


   






[GitHub] [kafka] ijuma commented on a change in pull request #11067: MINOR: log broker configs in KRaft mode

2021-07-20 Thread GitBox


ijuma commented on a change in pull request #11067:
URL: https://github.com/apache/kafka/pull/11067#discussion_r672525835



##
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##
@@ -389,6 +389,9 @@ class BrokerServer(
       // a potentially lengthy recovery-from-unclean-shutdown operation here, if required.
       metadataListener.startPublishing(metadataPublisher).get()
 
+      // Log static broker configurations.
+      new KafkaConfig(config.originals(), true)

Review comment:
   Can we file a JIRA?

##
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##
@@ -389,6 +389,9 @@ class BrokerServer(
       // a potentially lengthy recovery-from-unclean-shutdown operation here, if required.
       metadataListener.startPublishing(metadataPublisher).get()
 
+      // Log static broker configurations.
+      new KafkaConfig(config.originals(), true)

Review comment:
   I also learned that the ZK path logs dynamic configs too now. So, we can 
have a JIRA about logging dynamic configs for KRaft and mention there the 
potential clean-up.








[GitHub] [kafka] cmccabe merged pull request #11064: MINOR: enable reassign_partitions_test.py for kraft

2021-07-20 Thread GitBox


cmccabe merged pull request #11064:
URL: https://github.com/apache/kafka/pull/11064


   






[GitHub] [kafka] ijuma commented on a change in pull request #11080: fix NPE when record==null in append

2021-07-20 Thread GitBox


ijuma commented on a change in pull request #11080:
URL: https://github.com/apache/kafka/pull/11080#discussion_r672610426



##
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
##
@@ -293,7 +293,9 @@ public static DefaultRecord readFrom(ByteBuffer buffer,
                                          Long logAppendTime) {
         int sizeOfBodyInBytes = ByteUtils.readVarint(buffer);
         if (buffer.remaining() < sizeOfBodyInBytes)
-            return null;
+            throw new InvalidRecordException("Invalid record size: expected " + sizeOfBodyInBytes +
+                " bytes in record payload, but instead the buffer has only " + buffer.remaining() +
+                " remaining bytes.");

Review comment:
   Is this really an exceptional case? Don't we do reads where we don't 
know exactly where the read ends and hence will trigger this path?

##
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
##
@@ -293,7 +293,9 @@ public static DefaultRecord readFrom(ByteBuffer buffer,
                                          Long logAppendTime) {
         int sizeOfBodyInBytes = ByteUtils.readVarint(buffer);
         if (buffer.remaining() < sizeOfBodyInBytes)
-            return null;
+            throw new InvalidRecordException("Invalid record size: expected " + sizeOfBodyInBytes +
+                " bytes in record payload, but instead the buffer has only " + buffer.remaining() +
+                " remaining bytes.");

Review comment:
   I think the intent here was to cover the case where an incomplete record 
is returned by the broker. However, we have broker logic to try and avoid this 
case since KIP-74:
   
   ```scala
   } else if (!hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) {
     // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
     // progress in such cases and don't need to report a `RecordTooLargeException`
     FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
   ```
   @hachikuji Do you remember if there is still a reason to return `null` here 
instead of the exception @ccding is proposing?
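
The two behaviours being weighed here (return `null` for a short buffer versus throw) can be contrasted in a small standalone sketch. Everything in it is an assumption made for illustration: `RecordReadSketch` and its nested `InvalidRecordException` are hypothetical stand-ins rather than Kafka classes, and the body size is read as a fixed 4-byte `int` instead of the varint that `DefaultRecord.readFrom` uses.

```java
import java.nio.ByteBuffer;

final class RecordReadSketch {

    // Hypothetical stand-in for an "invalid record" exception type.
    static final class InvalidRecordException extends RuntimeException {
        InvalidRecordException(String message) {
            super(message);
        }
    }

    // Reads the 4-byte size prefix (assumption: the caller guarantees at least 4 bytes remain).
    private static int readSize(ByteBuffer buffer) {
        int sizeOfBody = buffer.getInt();
        if (sizeOfBody < 0)
            throw new InvalidRecordException("Invalid record size: " + sizeOfBody);
        return sizeOfBody;
    }

    // Lenient variant: a short buffer means "incomplete record", signalled by null.
    static byte[] readOrNull(ByteBuffer buffer) {
        int sizeOfBody = readSize(buffer);
        if (buffer.remaining() < sizeOfBody)
            return null;
        byte[] body = new byte[sizeOfBody];
        buffer.get(body);
        return body;
    }

    // Strict variant: a short buffer is reported as an error with both sizes in the message.
    static byte[] readOrThrow(ByteBuffer buffer) {
        int sizeOfBody = readSize(buffer);
        if (buffer.remaining() < sizeOfBody)
            throw new InvalidRecordException("Invalid record size: expected " + sizeOfBody +
                " bytes in record payload, but instead the buffer has only " + buffer.remaining() +
                " remaining bytes.");
        byte[] body = new byte[sizeOfBody];
        buffer.get(body);
        return body;
    }

    public static void main(String[] args) {
        ByteBuffer truncated = ByteBuffer.allocate(6);
        truncated.putInt(3).put(new byte[] {1, 2}); // declares 3 bytes, carries only 2
        truncated.flip();
        System.out.println(readOrNull(truncated.duplicate()) == null);
        try {
            readOrThrow(truncated.duplicate());
        } catch (InvalidRecordException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With the lenient variant every caller has to remember the `null` check; with the strict one, a truncated read surfaces immediately, which only works if truncated reads really are exceptional on that code path.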








[GitHub] [kafka] ijuma commented on a change in pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

2021-07-20 Thread GitBox


ijuma commented on a change in pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#discussion_r672253775



##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -403,27 +411,49 @@ class LogManager(logDirs: Seq[File],
    *  Start the background threads to flush logs and do log cleanup
    */
   def startup(topicNames: Set[String]): Unit = {
-    startupWithConfigOverrides(fetchTopicConfigOverrides(topicNames))
+    // ensure consistency between default config and overrides
+    val defaultConfig = currentDefaultConfig
+    startupWithConfigOverrides(defaultConfig, fetchTopicConfigOverrides(defaultConfig, topicNames))
   }
 
   // visible for testing
-  private[log] def fetchTopicConfigOverrides(topicNames: Set[String]): Map[String, LogConfig] = {
+  @nowarn("cat=deprecation")
+  private[log] def fetchTopicConfigOverrides(defaultConfig: LogConfig, topicNames: Set[String]): Map[String, LogConfig] = {
     val topicConfigOverrides = mutable.Map[String, LogConfig]()
-    val defaultProps = currentDefaultConfig.originals()
+    val defaultProps = defaultConfig.originals()
     topicNames.foreach { topicName =>
-      val overrides = configRepository.topicConfig(topicName)
+      var overrides = configRepository.topicConfig(topicName)

Review comment:
   Oh, interesting, so it does specify that a copy is returned. I had 
missed that. Since the mock config repository does not implement it correctly, 
as you found, I think it's better to handle this via a separate PR (it's 
possible that unrelated tests may fail as a result and may need fixing).

##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -403,27 +411,49 @@ class LogManager(logDirs: Seq[File],
    *  Start the background threads to flush logs and do log cleanup
    */
   def startup(topicNames: Set[String]): Unit = {
-    startupWithConfigOverrides(fetchTopicConfigOverrides(topicNames))
+    // ensure consistency between default config and overrides
+    val defaultConfig = currentDefaultConfig
+    startupWithConfigOverrides(defaultConfig, fetchTopicConfigOverrides(defaultConfig, topicNames))
   }
 
   // visible for testing
-  private[log] def fetchTopicConfigOverrides(topicNames: Set[String]): Map[String, LogConfig] = {
+  @nowarn("cat=deprecation")
+  private[log] def fetchTopicConfigOverrides(defaultConfig: LogConfig, topicNames: Set[String]): Map[String, LogConfig] = {
     val topicConfigOverrides = mutable.Map[String, LogConfig]()
-    val defaultProps = currentDefaultConfig.originals()
+    val defaultProps = defaultConfig.originals()
     topicNames.foreach { topicName =>
-      val overrides = configRepository.topicConfig(topicName)
+      var overrides = configRepository.topicConfig(topicName)

Review comment:
   Regarding unmodifiable `Properties`, there is no built-in way. My 
concern was something like the `MockConfigRepository` issue you found, where 
copies are not made on the assumption that the caller will not mutate the 
result (even though there is no simple way to prevent that, which is another 
reason not to use `Properties`).
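
The copy-on-read contract under discussion can be sketched in plain Java. This is a minimal illustration, not Kafka's `ConfigRepository` or `MockConfigRepository`: the class `ConfigRepositorySketch` and its method names are hypothetical. It hands out a fresh `Properties` on every call, so a caller can mutate the result without touching the stored state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

final class ConfigRepositorySketch {

    // Stored per-topic configuration; never handed out directly.
    private final Map<String, Properties> topicConfigs = new HashMap<>();

    void setTopicConfig(String topic, String key, String value) {
        topicConfigs.computeIfAbsent(topic, t -> new Properties()).setProperty(key, value);
    }

    // Returns a copy of the configuration, so callers may freely mutate the result.
    Properties topicConfig(String topic) {
        Properties copy = new Properties();
        Properties stored = topicConfigs.get(topic);
        if (stored != null)
            copy.putAll(stored);
        return copy;
    }

    public static void main(String[] args) {
        ConfigRepositorySketch repository = new ConfigRepositorySketch();
        repository.setTopicConfig("orders", "retention.ms", "1000");

        Properties overrides = repository.topicConfig("orders");
        overrides.setProperty("retention.ms", "1"); // mutates only the caller's copy

        System.out.println(repository.topicConfig("orders").getProperty("retention.ms"));
    }
}
```

Because `Properties` cannot be made read-only with a built-in wrapper, copying at the boundary is the simple way to make caller-side mutation harmless; an implementation that returns its internal instance silently breaks that expectation.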








[GitHub] [kafka] mumrah commented on pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

2021-07-20 Thread GitBox


mumrah commented on pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#issuecomment-882589793


   Thanks @ijuma, I don't have any more questions or follow-ups 👍 






[GitHub] [kafka] dajac commented on a change in pull request #11016: KAFKA-13058; AlterConsumerGroupOffsetsHandler does not handle partition errors correctly.

2021-07-20 Thread GitBox


dajac commented on a change in pull request #11016:
URL: https://github.com/apache/kafka/pull/11016#discussion_r672231624



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
##
@@ -105,53 +118,96 @@ public String apiName() {
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
+        validateKeys(groupIds);
+
         final OffsetCommitResponse response = (OffsetCommitResponse) abstractResponse;
-        Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
+        final Map<TopicPartition, Errors> partitionResults = new HashMap<>();
 
-        Map<TopicPartition, Errors> partitions = new HashMap<>();
         for (OffsetCommitResponseTopic topic : response.data().topics()) {
             for (OffsetCommitResponsePartition partition : topic.partitions()) {
-                TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
+                TopicPartition topicPartition = new TopicPartition(topic.name(), partition.partitionIndex());
                 Errors error = Errors.forCode(partition.errorCode());
+
                 if (error != Errors.NONE) {
-                    handleError(groupId, error, failed, unmapped);
+                    handleError(
+                        groupId,
+                        topicPartition,
+                        error,
+                        partitionResults,
+                        groupsToUnmap,
+                        groupsToRetry
+                    );
                 } else {
-                    partitions.put(tp, error);
+                    partitionResults.put(topicPartition, error);
                 }
             }
         }
-        if (failed.isEmpty() && unmapped.isEmpty())
-            completed.put(groupId, partitions);
 
-        return new ApiResult<>(completed, failed, unmapped);
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                Collections.singletonMap(groupId, partitionResults),
+                Collections.emptyMap(),
+                Collections.emptyList()
+            );
+        } else {
+            return new ApiResult<>(
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                new ArrayList<>(groupsToUnmap)
+            );
+        }
     }
 
     private void handleError(
         CoordinatorKey groupId,
+        TopicPartition topicPartition,
         Errors error,
-        Map<CoordinatorKey, Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Map<TopicPartition, Errors> partitionResults,
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
     ) {
         switch (error) {
-            case GROUP_AUTHORIZATION_FAILED:
-                log.error("Received authorization failure for group {} in `OffsetCommit` response", groupId,
-                    error.exception());
-                failed.put(groupId, error.exception());
-                break;
+            // If the coordinator is in the middle of loading, then we just need to retry.
             case COORDINATOR_LOAD_IN_PROGRESS:
+                log.debug("OffsetCommit request for group id {} failed because the coordinator" +
+                    " is still in the process of loading state. Will retry.", groupId.idValue);
+                groupsToRetry.add(groupId);
+                break;
+
+            // If the coordinator is not available, then we unmap and retry.
             case COORDINATOR_NOT_AVAILABLE:
             case NOT_COORDINATOR:
-                log.debug("OffsetCommit request for group {} returned error {}. Will retry", groupId, error);
-                unmapped.add(groupId);
+                log.debug("OffsetCommit request for group id {} returned error {}. Will retry.",
+                    groupId.idValue, error);
+                groupsToUnmap.add(groupId);
                 break;
+
+            // Group level errors.
+            case INVALID_GROUP_ID:
+            case REBALANCE_IN_PROGRESS:
+            case INVALID_COMMIT_OFFSET_SIZE:

Review comment:
   I think it is. It basically indicates that we could not write the group 
metadata to the log, so it concerns the group. 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L448
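
The three buckets used in the diff above (retry, unmap and retry, report as a result) can be summarised as a small classification function. This is a sketch under stated assumptions, not the handler itself: `OffsetCommitErrorSketch`, its `ErrorCode` and `Action` enums, and the `default` branch are all hypothetical, since the quoted hunk does not show how the remaining errors are treated.

```java
final class OffsetCommitErrorSketch {

    // Subset of error names taken from the diff; the enum itself is a stand-in for Kafka's Errors.
    enum ErrorCode {
        COORDINATOR_LOAD_IN_PROGRESS,
        COORDINATOR_NOT_AVAILABLE,
        NOT_COORDINATOR,
        INVALID_GROUP_ID,
        REBALANCE_IN_PROGRESS,
        INVALID_COMMIT_OFFSET_SIZE,
        GROUP_AUTHORIZATION_FAILED,
        OTHER
    }

    // Hypothetical names for the three outcomes seen in the diff.
    enum Action { RETRY, UNMAP_AND_RETRY, REPORT_FOR_PARTITION }

    static Action classify(ErrorCode error) {
        switch (error) {
            // The coordinator is still loading: keep the mapping and retry.
            case COORDINATOR_LOAD_IN_PROGRESS:
                return Action.RETRY;

            // The coordinator moved or is unavailable: look it up again, then retry.
            case COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR:
                return Action.UNMAP_AND_RETRY;

            // Group level errors are recorded against the partition being processed.
            case INVALID_GROUP_ID:
            case REBALANCE_IN_PROGRESS:
            case INVALID_COMMIT_OFFSET_SIZE:
            case GROUP_AUTHORIZATION_FAILED:
                return Action.REPORT_FOR_PARTITION;

            // Assumption for this sketch only: anything else is also reported per partition.
            default:
                return Action.REPORT_FOR_PARTITION;
        }
    }

    public static void main(String[] args) {
        for (ErrorCode error : ErrorCode.values())
            System.out.println(error + " -> " + classify(error));
    }
}
```

Making an error such as `REBALANCE_IN_PROGRESS` retriable would, in this picture, amount to moving its `case` label from the third bucket to the first.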

##
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
##
@@ -105,53 +118,96 @@ public String apiName() {
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
+        validateKeys(groupIds);
+
         final OffsetCommitResponse response = (OffsetCommitResponse) abstractResponse;
-        Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+final Set groupsToUnmap 

[GitHub] [kafka] mumrah commented on a change in pull request #11070: Validate the controllerListener config on startup

2021-07-20 Thread GitBox


mumrah commented on a change in pull request #11070:
URL: https://github.com/apache/kafka/pull/11070#discussion_r672407194



##
File path: core/src/main/scala/kafka/server/ControllerServer.scala
##
@@ -137,8 +137,14 @@ class ControllerServer(
         credentialProvider,
         apiVersionManager)
       socketServer.startup(startProcessingRequests = false, controlPlaneListener = None, config.controllerListeners)
-      socketServerFirstBoundPortFuture.complete(socketServer.boundPort(
-        config.controllerListeners.head.listenerName))
+
+      if(config.controllerListeners.nonEmpty) {
+        socketServerFirstBoundPortFuture.complete(socketServer.boundPort(
+          config.controllerListeners.head.listenerName))
+      } else {
+        fatal("No controllerListener defined for controller")
+        throw new IllegalArgumentException()

Review comment:
   We have `ConfigException` for such cases.

##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1914,6 +1914,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
       "offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor")
     require(BrokerCompressionCodec.isValid(compressionType), "compression.type : " + compressionType + " is not valid." +
       " Valid options are " + BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
+    require(!processRoles.contains(ControllerRole) || controllerListeners.nonEmpty,
+      s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty")

Review comment:
   How about "[...] cannot be empty if the server has the controller role"
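
The check and the suggested wording can be sketched in plain Java. The names here are assumptions made for illustration: `ControllerListenerValidationSketch` and its nested `ConfigException` are hypothetical stand-ins (the real check is the Scala `require` in the diff), the role is modelled as the string `"controller"`, and `controller.listener.names` is assumed to be the value of `KafkaConfig.ControllerListenerNamesProp`.

```java
import java.util.Collections;
import java.util.List;
import java.util.Set;

final class ControllerListenerValidationSketch {

    // Hypothetical stand-in for a configuration exception type.
    static final class ConfigException extends RuntimeException {
        ConfigException(String message) {
            super(message);
        }
    }

    // Assumed property name; see the lead-in.
    static final String CONTROLLER_LISTENER_NAMES_PROP = "controller.listener.names";

    // Fails fast at configuration time instead of later during server startup.
    static void validate(Set<String> processRoles, List<String> controllerListeners) {
        if (processRoles.contains("controller") && controllerListeners.isEmpty()) {
            throw new ConfigException(CONTROLLER_LISTENER_NAMES_PROP +
                " cannot be empty if the server has the controller role");
        }
    }

    public static void main(String[] args) {
        // A broker-only process may leave the controller listeners empty.
        validate(Collections.singleton("broker"), Collections.<String>emptyList());
        try {
            validate(Collections.singleton("controller"), Collections.<String>emptyList());
        } catch (ConfigException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Stating the condition in the message ("if the server has the controller role") tells an operator why the property is required in their deployment, and using the full property name makes the message searchable in the configuration reference.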

##
File path: core/src/main/scala/kafka/server/ControllerServer.scala
##
@@ -137,8 +137,14 @@ class ControllerServer(
         credentialProvider,
         apiVersionManager)
       socketServer.startup(startProcessingRequests = false, controlPlaneListener = None, config.controllerListeners)
-      socketServerFirstBoundPortFuture.complete(socketServer.boundPort(
-        config.controllerListeners.head.listenerName))
+
+      if(config.controllerListeners.nonEmpty) {
+        socketServerFirstBoundPortFuture.complete(socketServer.boundPort(
+          config.controllerListeners.head.listenerName))
+      } else {
+        fatal("No controllerListener defined for controller")

Review comment:
   Instead of "controllerListener" let's log the full property name

##
File path: core/src/main/scala/kafka/server/ControllerServer.scala
##
@@ -137,8 +137,14 @@ class ControllerServer(
         credentialProvider,
         apiVersionManager)
       socketServer.startup(startProcessingRequests = false, controlPlaneListener = None, config.controllerListeners)
-      socketServerFirstBoundPortFuture.complete(socketServer.boundPort(
-        config.controllerListeners.head.listenerName))
+
+      if(config.controllerListeners.nonEmpty) {

Review comment:
   nit: need space after `if` 








[GitHub] [kafka] satishd commented on pull request #11033: KAFKA-12988 Asynchronous API support for RemoteLogMetadataManager add/update methods.

2021-07-20 Thread GitBox


satishd commented on pull request #11033:
URL: https://github.com/apache/kafka/pull/11033#issuecomment-883064875


   @junrao This PR has been rebased on trunk. Please review and let me know 
your comments.  






[GitHub] [kafka] cmccabe commented on a change in pull request #11067: MINOR: log broker configs in KRaft mode

2021-07-20 Thread GitBox


cmccabe commented on a change in pull request #11067:
URL: https://github.com/apache/kafka/pull/11067#discussion_r672512638



##
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##
@@ -389,6 +389,9 @@ class BrokerServer(
   // a potentially lengthy recovery-from-unclean-shutdown operation here, 
if required.
   metadataListener.startPublishing(metadataPublisher).get()
 
+  // Log static broker configurations.
+  new KafkaConfig(config.originals(), true)

Review comment:
   > 1. Where are we logging this for the zk case?
   
   It's logged from DynamicBrokerConfig.initialize (see above stack trace)
   
   > 2. Can we expose a method in KafkaConfig to write the configs to a logger 
or something? Seems a bit cleaner than this approach.
   
   Can we do this after 3.0? Due to the inheritance-based design here, there 
isn't a simple way to expose this without changing the base class. The comment 
does explain what the line does...

##
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##
@@ -389,6 +389,9 @@ class BrokerServer(
   // a potentially lengthy recovery-from-unclean-shutdown operation here, 
if required.
   metadataListener.startPublishing(metadataPublisher).get()
 
+  // Log static broker configurations.
+  new KafkaConfig(config.originals(), true)

Review comment:
   > 1. Where are we logging this for the zk case?
   
   In the ZK case, it's logged from DynamicBrokerConfig.initialize (see above 
stack trace)
   
   > 2. Can we expose a method in KafkaConfig to write the configs to a logger 
or something? Seems a bit cleaner than this approach.
   
   Can we do this after 3.0? Due to the inheritance-based design here, there 
isn't a simple way to expose this without changing the base class. The comment 
does explain what the line does...
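
   The constraint described above can be sketched in isolation. The classes 
below are hypothetical stand-ins, not Kafka's actual `KafkaConfig` or its base 
class: they only assume a base class that logs its values from the constructor 
when a `doLog` flag is set, so the only way to log an existing config is to 
construct a throwaway copy from its originals. The `sink` parameter is an 
assumption added so the output can be captured.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical base class: logging is only available from the constructor.
class SketchBaseConfig {
    private final Map<String, String> originals;

    SketchBaseConfig(Map<String, String> originals, boolean doLog, Consumer<String> sink) {
        // Sorted defensive copy so the logged lines have a stable order.
        this.originals = new TreeMap<>(originals);
        if (doLog) {
            this.originals.forEach((key, value) -> sink.accept(key + " = " + value));
        }
    }

    Map<String, String> originals() {
        return new TreeMap<>(originals);
    }
}

// Hypothetical subclass: it has no logging method of its own to call later.
class SketchBrokerConfig extends SketchBaseConfig {
    SketchBrokerConfig(Map<String, String> originals, boolean doLog, Consumer<String> sink) {
        super(originals, doLog, sink);
    }
}

class ConfigLoggingSketch {
    // Builds a config without logging, then logs it by constructing a second,
    // discarded instance with doLog = true.
    static List<String> logStaticConfigs(Map<String, String> props) {
        List<String> lines = new ArrayList<>();
        SketchBrokerConfig config = new SketchBrokerConfig(props, false, lines::add);
        new SketchBrokerConfig(config.originals(), true, lines::add);
        return lines;
    }

    public static void main(String[] args) {
        logStaticConfigs(Map.of("node.id", "1", "process.roles", "broker"))
            .forEach(System.out::println);
    }
}
```

   Exposing a dedicated logging method would mean adding it to the base class 
in this sketch, which is the kind of change the comment proposes deferring.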

##
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##
@@ -389,6 +389,9 @@ class BrokerServer(
   // a potentially lengthy recovery-from-unclean-shutdown operation here, 
if required.
   metadataListener.startPublishing(metadataPublisher).get()
 
+  // Log static broker configurations.
+  new KafkaConfig(config.originals(), true)

Review comment:
   Filed https://issues.apache.org/jira/browse/KAFKA-13105








[GitHub] [kafka] jlprat closed pull request #11025: Scala 3 Compilation with 2.12 support

2021-07-20 Thread GitBox


jlprat closed pull request #11025:
URL: https://github.com/apache/kafka/pull/11025


   






[GitHub] [kafka] ccding commented on a change in pull request #11080: fix NPE when record==null in append

2021-07-20 Thread GitBox


ccding commented on a change in pull request #11080:
URL: https://github.com/apache/kafka/pull/11080#discussion_r672758944



##
File path: 
clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
##
@@ -293,7 +293,9 @@ public static DefaultRecord readFrom(ByteBuffer buffer,
  Long logAppendTime) {
 int sizeOfBodyInBytes = ByteUtils.readVarint(buffer);
 if (buffer.remaining() < sizeOfBodyInBytes)
-return null;
+throw new InvalidRecordException("Invalid record size: expected " + sizeOfBodyInBytes +
+" bytes in record payload, but instead the buffer has only " + buffer.remaining() +
+" remaining bytes.");

Review comment:
   Are you referring to the case where we have not yet finished reading the 
request? I didn't see a retry path, and returning null here causes a 
NullPointerException at 
https://github.com/apache/kafka/blob/bfc57aa4ddcd719fc4a646c2ac09d4979c076455/core/src/main/scala/kafka/log/LogValidator.scala#L191
   
   What do you suggest I do here?
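
   For illustration, here is a minimal, self-contained sketch of the change 
in the hunk above: fail fast with a descriptive exception when the buffer holds 
fewer bytes than the declared body size, instead of returning null for a caller 
to dereference later. The names `SketchInvalidRecordException` and 
`LengthPrefixedReader` are hypothetical, and the fixed 4-byte length prefix is 
a simplification (the real code reads a varint).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for Kafka's InvalidRecordException.
class SketchInvalidRecordException extends RuntimeException {
    SketchInvalidRecordException(String message) {
        super(message);
    }
}

class LengthPrefixedReader {
    // Reads a 4-byte length prefix followed by that many payload bytes.
    // Assumes at least 4 bytes are available for the prefix itself.
    static byte[] readBody(ByteBuffer buffer) {
        int sizeOfBodyInBytes = buffer.getInt();
        if (sizeOfBodyInBytes < 0 || buffer.remaining() < sizeOfBodyInBytes) {
            // Throw instead of returning null so the failure is reported here.
            throw new SketchInvalidRecordException("Invalid record size: expected " + sizeOfBodyInBytes
                + " bytes in record payload, but instead the buffer has only " + buffer.remaining()
                + " remaining bytes.");
        }
        byte[] body = new byte[sizeOfBodyInBytes];
        buffer.get(body);
        return body;
    }

    public static void main(String[] args) {
        // Declares 10 payload bytes but only supplies 3.
        ByteBuffer truncated = ByteBuffer.allocate(7);
        truncated.putInt(10).put("abc".getBytes(StandardCharsets.US_ASCII));
        truncated.flip();
        try {
            readBody(truncated);
        } catch (SketchInvalidRecordException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

   Whether throwing is right for a request that is still being read is the 
open question in the comment; the sketch only shows the fail-fast variant.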








[GitHub] [kafka] showuon edited a comment on pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty

2021-07-20 Thread GitBox


showuon edited a comment on pull request #11057:
URL: https://github.com/apache/kafka/pull/11057#issuecomment-882498141


   After testing, **I confirmed that this fix resolves the issue**. It may 
fetch the offset a little more eagerly than before, but it looks like we need 
those fetches to fix the stuck-stream issue. Thank you.






[GitHub] [kafka] ijuma commented on pull request #11078: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

2021-07-20 Thread GitBox


ijuma commented on pull request #11078:
URL: https://github.com/apache/kafka/pull/11078#issuecomment-882544244










[GitHub] [kafka] ijuma merged pull request #10811: KAFKA-12598: ConfigCommand should only support communication via ZooKeeper for a reduced set of cases

2021-07-20 Thread GitBox


ijuma merged pull request #10811:
URL: https://github.com/apache/kafka/pull/10811


   






[GitHub] [kafka] showuon commented on pull request #11086: KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable error

2021-07-20 Thread GitBox


showuon commented on pull request #11086:
URL: https://github.com/apache/kafka/pull/11086#issuecomment-883022752










[GitHub] [kafka] JoeCqupt closed pull request #11088: MINOR: remove unnecessary judgment in method: assignReplicasToBrokersRackAware

2021-07-20 Thread GitBox


JoeCqupt closed pull request #11088:
URL: https://github.com/apache/kafka/pull/11088










[GitHub] [kafka] dielhennr commented on a change in pull request #10463: KAFKA-12670: support unclean.leader.election.enable in KRaft mode

2021-07-20 Thread GitBox


dielhennr commented on a change in pull request #10463:
URL: https://github.com/apache/kafka/pull/10463#discussion_r672693205



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
##
@@ -42,23 +42,38 @@
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.function.Function;
 
 import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;
+import static 
org.apache.kafka.common.config.TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG;
 
 
 public class ConfigurationControlManager {
+static final ConfigResource DEFAULT_NODE_RESOURCE = new 
ConfigResource(Type.BROKER, "");
+
 private final Logger log;
+private final int nodeId;
+private final ConfigResource currentNodeResource;
 private final SnapshotRegistry snapshotRegistry;
 private final Map configDefs;
+private final TimelineHashMap emptyMap;

Review comment:
   I don't think it can be. It needs to be a TimelineHashMap to work and 
needs to receive the snapshot registry in the constructor.








[GitHub] [kafka] ijuma commented on pull request #10811: KAFKA-12598: remove zookeeper support on configCommand except security config

2021-07-20 Thread GitBox


ijuma commented on pull request #10811:
URL: https://github.com/apache/kafka/pull/10811#issuecomment-882984058









