[GitHub] [kafka] dajac commented on a change in pull request #10234: MINOR; Clean up LeaderAndIsrResponse construction in `ReplicaManager#becomeLeaderOrFollower`

2021-03-02 Thread GitBox


dajac commented on a change in pull request #10234:
URL: https://github.com/apache/kafka/pull/10234#discussion_r586188217



##
File path: clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
##
@@ -145,24 +145,22 @@ public LeaderAndIsrResponse getErrorResponse(int throttleTimeMs, Throwable e) {
                     .setErrorCode(error.code()));
             }
             responseData.setPartitionErrors(partitions);
-            return new LeaderAndIsrResponse(responseData, version());
-        }
-
-        List<LeaderAndIsrTopicError> topics = new ArrayList<>(data.topicStates().size());
-        Map<String, Uuid> topicIds = topicIds();
-        for (LeaderAndIsrTopicState topicState : data.topicStates()) {
-            LeaderAndIsrTopicError topicError = new LeaderAndIsrTopicError();
-            topicError.setTopicId(topicIds.get(topicState.topicName()));
-            List<LeaderAndIsrPartitionError> partitions = new ArrayList<>(topicState.partitionStates().size());
-            for (LeaderAndIsrPartitionState partition : topicState.partitionStates()) {
-                partitions.add(new LeaderAndIsrPartitionError()
+        } else {
+            Map<String, Uuid> topicIds = topicIds();
+            for (LeaderAndIsrTopicState topicState : data.topicStates()) {
+                List<LeaderAndIsrPartitionError> partitions = new ArrayList<>(
+                    topicState.partitionStates().size());
+                for (LeaderAndIsrPartitionState partition : topicState.partitionStates()) {
+                    partitions.add(new LeaderAndIsrPartitionError()
                         .setPartitionIndex(partition.partitionIndex())
                         .setErrorCode(error.code()));
+                }
+                responseData.topics().add(new LeaderAndIsrTopicError()
+                    .setTopicId(topicIds.get(topicState.topicName()))

Review comment:
   Good catch. I missed this one.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-03-02 Thread GitBox


chia7712 commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r586131042



##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -154,17 +162,164 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data,
+      request.context.apiVersion,
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator)).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    // Check if topic deletion is enabled at all.
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    // The first step is to load up the names and IDs that have been provided by the
+    // request.  This is a bit messy because we support multiple ways of referring to
+    // topics (both by name and by id) and because we need to check for duplicates or
+    // other invalid inputs.
+    val responses = new util.ArrayList[DeletableTopicResult]
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error.code).
+        setErrorMessage(error.message))
+    }
+    val providedNames = new util.HashSet[String]
+    val duplicateProvidedNames = new util.HashSet[String]
+    val providedIds = new util.HashSet[Uuid]
+    val duplicateProvidedIds = new util.HashSet[Uuid]
+    def addProvidedName(name: String): Unit = {
+      if (duplicateProvidedNames.contains(name) || !providedNames.add(name)) {
+        duplicateProvidedNames.add(name)
+        providedNames.remove(name)
+      }
+    }
+    request.topicNames.forEach(addProvidedName)
+    request.topics.forEach {
+      topic => if (topic.name == null) {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+            "Neither topic name nor id were specified."))
+        } else if (duplicateProvidedIds.contains(topic.topicId) || !providedIds.add(topic.topicId)) {
+          duplicateProvidedIds.add(topic.topicId)
+          providedIds.remove(topic.topicId)
+        }
+      } else {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          addProvidedName(topic.name)
+        } else {
+          appendResponse(topic.name, topic.topicId, new ApiError(INVALID_REQUEST,
+            "You may not specify both topic name and topic id."))
+        }
+      }
+    }
+    // Create error responses for duplicates.
+    duplicateProvidedNames.forEach(name => appendResponse(name, ZERO_UUID,
+      new ApiError(INVALID_REQUEST, "Duplicate topic name.")))
+    duplicateProvidedIds.forEach(id => appendResponse(null, id,
+      new ApiError(INVALID_REQUEST, "Duplicate topic id.")))
+    // At this point we have all the valid names and IDs that have been provided.
+    // However, the Authorizer needs topic names as inputs, not topic IDs.  So
+    // we need to resolve all IDs to names.
+    val toAuthenticate = new util.HashSet[String]
+    toAuthenticate.addAll(providedNames)
+    val idToName = new util.HashMap[Uuid, String]
+    controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) =>
+      if (nameOrError.isError) {
+        appendResponse(null, id, nameOrError.error())
+      } else {
+        toAuthenticate.add(nameOrError.result())
+        idToName.put(id, nameOrError.result())
+      }
+    }
+    // Get the list of deletable topics (those we can delete) and the list of describeable
+    // topics.  If a topic can't be deleted or described, we have to act like it doesn't
+    // exist, even when it does.
+    val topicsToAuthenticate = toAuthenticate.asScala
+    val (describeable, deletable) = if (hasClusterAuth) {
+      (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
+    } else {
+

[GitHub] [kafka] satishd commented on pull request #10173: KAFKA-9548 Added SPIs and public classes/interfaces introduced in KIP-405 for tiered storage feature in Kafka.

2021-03-02 Thread GitBox


satishd commented on pull request #10173:
URL: https://github.com/apache/kafka/pull/10173#issuecomment-789442937


   @junrao Thanks for the comment; added all public classes to the javadoc 
section in build.gradle. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #10173: KAFKA-9548 Added SPIs and public classes/interfaces introduced in KIP-405 for tiered storage feature in Kafka.

2021-03-02 Thread GitBox


satishd commented on a change in pull request #10173:
URL: https://github.com/apache/kafka/pull/10173#discussion_r586119132



##
File path: clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
##
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This interface provides storing and fetching remote log segment metadata with strongly consistent semantics.
+ * 
+ * This class can be plugged in to a Kafka cluster by adding the implementation class as the
+ * remote.log.metadata.manager.class.name property value. There is an inbuilt implementation backed by
+ * topic storage in the local cluster. This is used as the default implementation if
+ * remote.log.metadata.manager.class.name is not configured.
+ * 
+ * 
+ * The remote.log.metadata.manager.class.path property is the class path of the RemoteLogStorageManager
+ * implementation. If specified, the RemoteLogStorageManager implementation and its dependent libraries will be loaded
+ * by a dedicated classloader which searches this class path before the Kafka broker class path. The syntax of this
+ * parameter is the same as the standard Java class path string.
+ * 
+ * 
+ * The remote.log.metadata.manager.listener.name property is the listener name of the local broker to which
+ * the RemoteLogMetadataManager implementation should connect if needed. When this is configured, all other
+ * required properties can be passed as properties with the prefix 'remote.log.metadata.manager.listener.'
+ * 
+ * "cluster.id", "broker.id" and all other properties prefixed with "remote.log.metadata." are passed when
+ * {@link #configure(Map)} is invoked on this instance.
+ * 
+ */
+@InterfaceStability.Evolving
+public interface RemoteLogMetadataManager extends Configurable, Closeable {

Review comment:
   This method is renamed to `putRemotePartitionDeleteMetadata`, which was 
added in the current interface. 
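
   For readers following along, a minimal sketch of how the properties described 
in the javadoc above could be wired up. This is only an illustration under 
stated assumptions: `com.example.MyRemoteLogMetadataManager` and the property 
values are hypothetical, not part of the PR.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical broker-side wiring for the plugin properties described in
   // the javadoc; the implementation class and paths are made-up examples.
   public class RemoteLogMetadataConfigSketch {
       public static void main(String[] args) {
           Map<String, Object> brokerProps = new HashMap<>();
           // Custom implementation; the topic-based default is used when unset.
           brokerProps.put("remote.log.metadata.manager.class.name",
               "com.example.MyRemoteLogMetadataManager");
           // Optional dedicated class path, searched before the broker class path.
           brokerProps.put("remote.log.metadata.manager.class.path",
               "/opt/plugins/rlmm/*");
           // Listener used to reach the local broker; properties under the
           // "remote.log.metadata." prefix are passed through to configure(Map).
           brokerProps.put("remote.log.metadata.manager.listener.name", "PLAINTEXT");
           brokerProps.forEach((k, v) -> System.out.println(k + "=" + v));
       }
   }
   ```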





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-10340) Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever

2021-03-02 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17294278#comment-17294278
 ] 

A. Sophie Blee-Goldman edited comment on KAFKA-10340 at 3/3/21, 4:58 AM:
-

Temporarily removed 2.6.2 from the Fix Version so I can proceed with the 2.6.2 
release, since it's already been cherrypicked to 2.6. This should be added back 
once the ticket is resolved.


was (Author: ableegoldman):
Temporarily removed 2.6.2 from the Fix Version so I can proceed with the 2.6.2 
release, this should be added back once the ticket is resolved

> Source connectors should report error when trying to produce records to 
> non-existent topics instead of hanging forever
> --
>
> Key: KAFKA-10340
> URL: https://issues.apache.org/jira/browse/KAFKA-10340
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.1, 2.7.0, 2.6.1, 2.8.0
>Reporter: Arjun Satish
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.0.0, 2.7.1
>
>
> Currently, a source connector will blindly attempt to write a record to a 
> Kafka topic. When the topic does not exist, its creation is controlled by the 
> {{auto.create.topics.enable}} config on the brokers. When auto.create is 
> disabled, the producer.send() call on the Connect worker will hang 
> indefinitely (due to the "infinite retries" configuration for said producer). 
> In setups where this config is usually disabled, the source connector simply 
> appears to hang and not produce any output.
> It is desirable to either log an info or an error message (or inform the user 
> somehow) that the connector is simply stuck waiting for the destination topic 
> to be created. When the worker has permissions to inspect the broker 
> settings, it can use the {{listTopics}} and {{describeConfigs}} APIs in 
> AdminClient to check whether the topic exists and whether the broker can 
> auto-create topics ({{auto.create.topics.enable}}), and throw an error if 
> neither holds.
> With the recently merged 
> [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics],
>  this becomes an even more specific corner case: when topic creation settings 
> are enabled, the worker should handle the case where topic creation is 
> disabled, {{auto.create.topics.enable}} is set to false, and the topic does 
> not exist.
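
A rough illustration of the AdminClient pre-flight check suggested above; the 
bootstrap address, topic name, and broker id are assumptions for illustration, 
not part of the ticket:

{code:java}
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

// Sketch: fail fast instead of letting producer.send() retry forever when the
// topic is missing and brokers will not auto-create it.
public class TopicPreflightCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            Set<String> topics = admin.listTopics().names().get();
            boolean topicExists = topics.contains("my-source-topic");
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
            Config cfg = admin.describeConfigs(Collections.singleton(broker))
                .all().get().get(broker);
            boolean autoCreate = Boolean.parseBoolean(
                cfg.get("auto.create.topics.enable").value());
            if (!topicExists && !autoCreate) {
                throw new IllegalStateException("Topic does not exist and " +
                    "brokers will not auto-create it; writes would hang.");
            }
        }
    }
}
{code}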



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji opened a new pull request #10252: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received

2021-03-02 Thread GitBox


hachikuji opened a new pull request #10252:
URL: https://github.com/apache/kafka/pull/10252


   This patch implements additional handling logic for `RemoveTopic` records:
   
   - Update `MetadataPartitions` to ensure addition of deleted partitions to 
`localRemoved` set
   - Ensure topic configs are removed from `ConfigRepository`
   - Propagate deleted partitions to `GroupCoordinator` so that corresponding 
offset commits can be removed
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10340) Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever

2021-03-02 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17294278#comment-17294278
 ] 

A. Sophie Blee-Goldman commented on KAFKA-10340:


Temporarily removed 2.6.2 from the Fix Version so I can proceed with the 2.6.2 
release, this should be added back once the ticket is resolved

> Source connectors should report error when trying to produce records to 
> non-existent topics instead of hanging forever
> --
>
> Key: KAFKA-10340
> URL: https://issues.apache.org/jira/browse/KAFKA-10340
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.1, 2.7.0, 2.6.1, 2.8.0
>Reporter: Arjun Satish
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.0.0, 2.7.1
>
>
> Currently, a source connector will blindly attempt to write a record to a 
> Kafka topic. When the topic does not exist, its creation is controlled by the 
> {{auto.create.topics.enable}} config on the brokers. When auto.create is 
> disabled, the producer.send() call on the Connect worker will hang 
> indefinitely (due to the "infinite retries" configuration for said producer). 
> In setups where this config is usually disabled, the source connector simply 
> appears to hang and not produce any output.
> It is desirable to either log an info or an error message (or inform the user 
> somehow) that the connector is simply stuck waiting for the destination topic 
> to be created. When the worker has permissions to inspect the broker 
> settings, it can use the {{listTopics}} and {{describeConfigs}} APIs in 
> AdminClient to check whether the topic exists and whether the broker can 
> auto-create topics ({{auto.create.topics.enable}}), and throw an error if 
> neither holds.
> With the recently merged 
> [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics],
>  this becomes an even more specific corner case: when topic creation settings 
> are enabled, the worker should handle the case where topic creation is 
> disabled, {{auto.create.topics.enable}} is set to false, and the topic does 
> not exist.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10340) Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever

2021-03-02 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-10340:
---
Fix Version/s: (was: 2.6.2)

> Source connectors should report error when trying to produce records to 
> non-existent topics instead of hanging forever
> --
>
> Key: KAFKA-10340
> URL: https://issues.apache.org/jira/browse/KAFKA-10340
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.1, 2.7.0, 2.6.1, 2.8.0
>Reporter: Arjun Satish
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.0.0, 2.7.1
>
>
> Currently, a source connector will blindly attempt to write a record to a 
> Kafka topic. When the topic does not exist, its creation is controlled by the 
> {{auto.create.topics.enable}} config on the brokers. When auto.create is 
> disabled, the producer.send() call on the Connect worker will hang 
> indefinitely (due to the "infinite retries" configuration for said producer). 
> In setups where this config is usually disabled, the source connector simply 
> appears to hang and not produce any output.
> It is desirable to either log an info or an error message (or inform the user 
> somehow) that the connector is simply stuck waiting for the destination topic 
> to be created. When the worker has permissions to inspect the broker 
> settings, it can use the {{listTopics}} and {{describeConfigs}} APIs in 
> AdminClient to check whether the topic exists and whether the broker can 
> auto-create topics ({{auto.create.topics.enable}}), and throw an error if 
> neither holds.
> With the recently merged 
> [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics],
>  this becomes an even more specific corner case: when topic creation settings 
> are enabled, the worker should handle the case where topic creation is 
> disabled, {{auto.create.topics.enable}} is set to false, and the topic does 
> not exist.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12400) Upgrade jetty to fix CVE-2020-27223

2021-03-02 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-12400.
---
Resolution: Fixed

Issue resolved by pull request 10245
[https://github.com/apache/kafka/pull/10245]

> Upgrade jetty to fix CVE-2020-27223
> ---
>
> Key: KAFKA-12400
> URL: https://issues.apache.org/jira/browse/KAFKA-12400
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dongjin Lee
>Assignee: Dongjin Lee
>Priority: Major
> Fix For: 2.7.1, 2.6.2, 2.8.0
>
>
> h3. CVE-2020-27223 Detail
> In Eclipse Jetty 9.4.6.v20170531 to 9.4.36.v20210114 (inclusive), 10.0.0, and 
> 11.0.0 when Jetty handles a request containing multiple Accept headers with a 
> large number of quality (i.e. q) parameters, the server may enter a denial of 
> service (DoS) state due to high CPU usage processing those quality values, 
> resulting in minutes of CPU time exhausted processing those quality values.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] omkreddy closed pull request #10245: KAFKA-12400: Upgrade jetty to fix CVE-2020-27223

2021-03-02 Thread GitBox


omkreddy closed pull request #10245:
URL: https://github.com/apache/kafka/pull/10245


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 merged pull request #10250: MINOR: Fix null exception in coordinator log

2021-03-02 Thread GitBox


chia7712 merged pull request #10250:
URL: https://github.com/apache/kafka/pull/10250


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #10250: MINOR: Fix null exception in coordinator log

2021-03-02 Thread GitBox


chia7712 commented on pull request #10250:
URL: https://github.com/apache/kafka/pull/10250#issuecomment-789420972


   >  do you want to merge this? We should also cherrypick it all the way back 
to 2.6
   
   Sure, will merge and backport this patch.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10250: MINOR: Fix null exception in coordinator log

2021-03-02 Thread GitBox


ableegoldman commented on pull request #10250:
URL: https://github.com/apache/kafka/pull/10250#issuecomment-789420651


   @chia7712 do you want to merge this? We should also cherrypick it all the 
way back to 2.6 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10250: MINOR: Fix null exception in coordinator log

2021-03-02 Thread GitBox


ableegoldman commented on pull request #10250:
URL: https://github.com/apache/kafka/pull/10250#issuecomment-789420529


   One unrelated test failure which is known to be flaky: 
`TransactionsBounceTest.testWithGroupMetadata`



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10251: MINOR: add missing docs for record-e2e-latency metrics

2021-03-02 Thread GitBox


ableegoldman commented on a change in pull request #10251:
URL: https://github.com/apache/kafka/pull/10251#discussion_r586100402



##
File path: docs/ops.html
##
@@ -2356,6 +2356,26 @@ 

[jira] [Resolved] (KAFKA-12389) Upgrade of netty-codec due to CVE-2021-21290

2021-03-02 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-12389.
---
Fix Version/s: 2.8.0
   2.6.2
   2.7.1
   Resolution: Fixed

Issue resolved by pull request 10235
[https://github.com/apache/kafka/pull/10235]

> Upgrade of netty-codec due to CVE-2021-21290
> 
>
> Key: KAFKA-12389
> URL: https://issues.apache.org/jira/browse/KAFKA-12389
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 2.7.0
>Reporter: Dominique Mongelli
>Assignee: Dongjin Lee
>Priority: Major
> Fix For: 2.7.1, 2.6.2, 2.8.0
>
>
> Our security tool raised the following security flaw on kafka 2.7: 
> [https://nvd.nist.gov/vuln/detail/CVE-2021-21290]
> It is a vulnerability related to jar *netty-codec-4.1.51.Final.jar*.
> Looking at source code, the netty-codec in trunk and 2.7.0 branches are still 
> vulnerable.
> Based on netty issue tracker, the vulnerability is fixed in 4.1.59.Final: 
> https://github.com/netty/netty/security/advisories/GHSA-5mcr-gq6c-3hq2



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dongjinleekr commented on pull request #10235: KAFKA-12389: Upgrade of netty-codec due to CVE-2021-21290

2021-03-02 Thread GitBox


dongjinleekr commented on pull request #10235:
URL: https://github.com/apache/kafka/pull/10235#issuecomment-789416305


   @ableegoldman FYI. :smiley: 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10251: MINOR: add missing docs for record-e2e-latency metrics

2021-03-02 Thread GitBox


ableegoldman commented on pull request #10251:
URL: https://github.com/apache/kafka/pull/10251#issuecomment-789415834


   call for review @cadonna @vvcephei @guozhangwang -- in theory we should 
cherrypick this back to 2.8 at least, but I can prepare a separate PR against 
`kafka-site` if we don't get this merged in time (not worth blocking 2.8 over 
obviously)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman opened a new pull request #10251: MINOR: add missing docs for record-e2e-latency metrics

2021-03-02 Thread GitBox


ableegoldman opened a new pull request #10251:
URL: https://github.com/apache/kafka/pull/10251


   I missed updating the documentation for these metrics since I didn't notice 
we had Streams metrics docs outside of the usual Streams docs.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy closed pull request #10235: KAFKA-12389: Upgrade of netty-codec due to CVE-2021-21290

2021-03-02 Thread GitBox


omkreddy closed pull request #10235:
URL: https://github.com/apache/kafka/pull/10235


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #10193: MINOR: correct the error message of validating uint32

2021-03-02 Thread GitBox


chia7712 commented on a change in pull request #10193:
URL: https://github.com/apache/kafka/pull/10193#discussion_r586082468



##
File path: clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
##
@@ -320,7 +320,7 @@ public Long validate(Object item) {
             if (item instanceof Long)
                 return (Long) item;
             else
-                throw new SchemaException(item + " is not a Long.");
+                throw new SchemaException(item + " is not an a Long (encoding an unsigned integer).");

Review comment:
   > There is a typo here, "an a".
   
   My bad :(
   
   > Also, I think it reads a bit weird. Not clear that "encoding an unsigned 
integer" in brackets means when reading the message.
   
   I will update it in #10248. @ijuma, thanks for your reviews.
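
   For reference, a standalone sketch of one possible rewording; the exact text 
is to be settled in #10248, so the message below is only an assumption:

   ```java
   import org.apache.kafka.common.protocol.types.SchemaException;

   // Sketch of the uint32 validation with a clearer message; the wording is a
   // hypothetical suggestion, not the committed change.
   final class Uint32ValidationSketch {
       static Long validate(Object item) {
           if (item instanceof Long) {
               return (Long) item;
           }
           throw new SchemaException(item + " is not a Long (Kafka represents uint32 values as Long).");
       }
   }
   ```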





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #10240: KAFKA-12381: only return leader not available for internal topic creation

2021-03-02 Thread GitBox


abbccdda commented on pull request #10240:
URL: https://github.com/apache/kafka/pull/10240#issuecomment-789380164


   After offline sync with @hachikuji, we decided that performing the invalid 
replication factor check on the forwarding broker would be redundant. We will 
remove that logic to ensure we don't accidentally return a wrong error code to 
the client due to staleness of the metadata cache.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on pull request #10250: MINOR: Fix null exception in coordinator log

2021-03-02 Thread GitBox


dengziming commented on pull request #10250:
URL: https://github.com/apache/kafka/pull/10250#issuecomment-789374413


   @ableegoldman PTAL.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming opened a new pull request #10250: MINOR: Fix null exception in coordinator log

2021-03-02 Thread GitBox


dengziming opened a new pull request #10250:
URL: https://github.com/apache/kafka/pull/10250


   *More detailed description of your change*
   Found that `fatalException` is always null when calling `log.info("xxx", 
fatalException)`; we should assign it a value first.
   
   
   *Summary of testing strategy (including rationale)*
   Tested locally.
   
   from
   ```
   [2021-03-03 10:18:06,203] INFO FindCoordinator request hit fatal exception (org.apache.kafka.clients.consumer.internals.AbstractCoordinatorTest$DummyCoordinator:260)
   ```
   to
   
   ```
   [2021-03-03 10:17:37,123] INFO FindCoordinator request hit fatal exception (org.apache.kafka.clients.consumer.internals.AbstractCoordinatorTest$DummyCoordinator:260)
   org.apache.kafka.common.errors.AuthenticationException: Authentication failed
   ```
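
   The bug pattern being fixed, reduced to a self-contained sketch (names are 
illustrative, not the actual coordinator test code):

   ```java
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   // Illustration: logging the exception variable before it is assigned means
   // the log line never carries the stack trace.
   public class LogBeforeAssignSketch {
       private static final Logger log = LoggerFactory.getLogger(LogBeforeAssignSketch.class);

       public static void main(String[] args) {
           RuntimeException fatalException = null;
           // Buggy order: fatalException is still null, so no throwable is logged.
           log.info("FindCoordinator request hit fatal exception", fatalException);
           // Fixed order: assign first, then log, and the stack trace appears.
           fatalException = new RuntimeException("Authentication failed");
           log.info("FindCoordinator request hit fatal exception", fatalException);
       }
   }
   ```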
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming closed pull request #10247: MINOR: Fix log format in AbstractCoordinator

2021-03-02 Thread GitBox


dengziming closed pull request #10247:
URL: https://github.com/apache/kafka/pull/10247


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on a change in pull request #10247: MINOR: Fix log format in AbstractCoordinator

2021-03-02 Thread GitBox


dengziming commented on a change in pull request #10247:
URL: https://github.com/apache/kafka/pull/10247#discussion_r586046025



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -860,7 +860,7 @@ public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
 
         @Override
         public void onFailure(RuntimeException e, RequestFuture<Void> future) {
-            log.debug("FindCoordinator request failed due to {}", e);
+            log.debug("FindCoordinator request failed due to {}", e.getMessage());

Review comment:
   Thank you, #10232 LGTM; will close this PR.
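
   For context on why the original line printed oddly, a minimal sketch of the 
SLF4J semantics involved (assuming SLF4J 1.6.0+ behavior):

   ```java
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   // Since SLF4J 1.6.0, a trailing Throwable is treated as the throwable
   // rather than as a placeholder argument.
   public class ThrowablePlaceholderSketch {
       private static final Logger log = LoggerFactory.getLogger(ThrowablePlaceholderSketch.class);

       public static void main(String[] args) {
           RuntimeException e = new RuntimeException("boom");
           // Prints "... due to {}" literally, plus the stack trace.
           log.debug("FindCoordinator request failed due to {}", e);
           // The change above: the placeholder is filled with the message text.
           log.debug("FindCoordinator request failed due to {}", e.getMessage());
           // Alternative: drop the placeholder and log the full stack trace.
           log.debug("FindCoordinator request failed", e);
       }
   }
   ```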





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10251) Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata

2021-03-02 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17294210#comment-17294210
 ] 

Luke Chen commented on KAFKA-10251:
---

Interesting. Let me check it. Thanks.

> Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata
> -
>
> Key: KAFKA-10251
> URL: https://issues.apache.org/jira/browse/KAFKA-10251
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: A. Sophie Blee-Goldman
>Assignee: Luke Chen
>Priority: Major
>
> h3. Stacktrace
> org.scalatest.exceptions.TestFailedException: Consumed 0 records before 
> timeout instead of the expected 200 records at 
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) 
> at 
> org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) 
> at org.scalatest.Assertions.fail(Assertions.scala:1091) at 
> org.scalatest.Assertions.fail$(Assertions.scala:1087) at 
> org.scalatest.Assertions$.fail(Assertions.scala:1389) at 
> kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:842) at 
> kafka.api.TransactionsBounceTest.testWithGroupMetadata(TransactionsBounceTest.scala:109)
>  
>  
> The logs are pretty much just this on repeat:
> {code:java}
> [2020-07-08 23:41:04,034] ERROR Error when sending message to topic 
> output-topic with key: 9955, value: 9955 with error: 
> (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) 
> org.apache.kafka.common.KafkaException: Failing batch since transaction was 
> aborted at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) 
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at 
> java.lang.Thread.run(Thread.java:748) [2020-07-08 23:41:04,034] ERROR Error 
> when sending message to topic output-topic with key: 9959, value: 9959 with 
> error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) 
> org.apache.kafka.common.KafkaException: Failing batch since transaction was 
> aborted at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) 
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at 
> java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rondagostino commented on pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-03-02 Thread GitBox


rondagostino commented on pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#issuecomment-789350896


   ```
   [2021-03-02T22:42:17.438Z] [ant:checkstyle] [ERROR] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-10184/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:76:8: Unused import - java.util.Set. [UnusedImports]
   ```
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dongjinleekr commented on pull request #10245: KAFKA-12400: Upgrade jetty to fix CVE-2020-27223

2021-03-02 Thread GitBox


dongjinleekr commented on pull request #10245:
URL: https://github.com/apache/kafka/pull/10245#issuecomment-789347793


   @ableegoldman I think this should be a blocker, since it is a security 
vulnerability.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10215: KAFKA-12375: don't reuse thread.id until a thread has fully shut down

2021-03-02 Thread GitBox


ableegoldman commented on pull request #10215:
URL: https://github.com/apache/kafka/pull/10215#issuecomment-789326289


   Merged to trunk and cherrypicked to 2.8 cc @vvcephei 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman merged pull request #10215: KAFKA-12375: don't reuse thread.id until a thread has fully shut down

2021-03-02 Thread GitBox


ableegoldman merged pull request #10215:
URL: https://github.com/apache/kafka/pull/10215


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10215: KAFKA-12375: don't reuse thread.id until a thread has fully shut down

2021-03-02 Thread GitBox


ableegoldman commented on pull request #10215:
URL: https://github.com/apache/kafka/pull/10215#issuecomment-789323390


   One unrelated failure: `TransactionsBounceTest.testWithGroupId()` (known to 
be flaky, see KAFKA-10251)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Reopened] (KAFKA-10251) Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata

2021-03-02 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reopened KAFKA-10251:


Looks like it's still failing – saw at least one test failure on a build which 
was kicked off since merging this PR. 
[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10215/5/testReport/junit/kafka.api/TransactionsBounceTest/Build___JDK_8___testWithGroupId__/?cloudbees-analytics-link=scm-reporting%2Ftests%2Ffailed]

 

Reopening the ticket for further investigation

> Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata
> -
>
> Key: KAFKA-10251
> URL: https://issues.apache.org/jira/browse/KAFKA-10251
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: A. Sophie Blee-Goldman
>Assignee: Luke Chen
>Priority: Major
>
> h3. Stacktrace
> org.scalatest.exceptions.TestFailedException: Consumed 0 records before 
> timeout instead of the expected 200 records at 
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) 
> at 
> org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) 
> at org.scalatest.Assertions.fail(Assertions.scala:1091) at 
> org.scalatest.Assertions.fail$(Assertions.scala:1087) at 
> org.scalatest.Assertions$.fail(Assertions.scala:1389) at 
> kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:842) at 
> kafka.api.TransactionsBounceTest.testWithGroupMetadata(TransactionsBounceTest.scala:109)
>  
>  
> The logs are pretty much just this on repeat:
> {code:java}
> [2020-07-08 23:41:04,034] ERROR Error when sending message to topic 
> output-topic with key: 9955, value: 9955 with error: 
> (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) 
> org.apache.kafka.common.KafkaException: Failing batch since transaction was 
> aborted at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) 
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at 
> java.lang.Thread.run(Thread.java:748) [2020-07-08 23:41:04,034] ERROR Error 
> when sending message to topic output-topic with key: 9959, value: 9959 with 
> error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) 
> org.apache.kafka.common.KafkaException: Failing batch since transaction was 
> aborted at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) 
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at 
> java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12254) MirrorMaker 2.0 creates destination topic with default configs

2021-03-02 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-12254:
---
Fix Version/s: (was: 3.0.0)

> MirrorMaker 2.0 creates destination topic with default configs
> --
>
> Key: KAFKA-12254
> URL: https://issues.apache.org/jira/browse/KAFKA-12254
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0, 2.5.0, 2.4.1, 2.6.0, 2.5.1, 2.7.0, 2.6.1, 2.8.0
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Blocker
> Fix For: 2.8.0
>
>
> `MirrorSourceConnector` implements the logic for replicating data, 
> configurations, and other metadata between the source and destination 
> clusters. This includes the tasks below:
>  # `refreshTopicPartitions` for syncing topics / partitions from source to 
> destination.
>  # `syncTopicConfigs` for syncing topic configurations from source to 
> destination.
> A limitation is that `computeAndCreateTopicPartitions` creates topics with 
> default configurations on the destination cluster. A separate async task 
> `syncTopicConfigs` is responsible for syncing the topic configs. Before that 
> sync happens, topic configurations could be out of sync between the two 
> clusters.
> In the worst case, this could lead to data loss eg. when we have a compacted 
> topic being mirrored between clusters which is incorrectly created with the 
> default configuration of `cleanup.policy = delete` on the destination before 
> the configurations are sync'd via `syncTopicConfigs`.
> Here is an example of the divergence:
> Source Topic:
> ```
> Topic: foobar PartitionCount: 1 ReplicationFactor: 1 Configs: 
> cleanup.policy=compact,segment.bytes=1073741824
> ```
> Destination Topic:
> ```
> Topic: A.foobar PartitionCount: 1 ReplicationFactor: 1 Configs: 
> segment.bytes=1073741824
> ```
> A safer approach is to ensure that the right configurations are set on the 
> destination cluster before data is replicated to it.
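
The safer approach described above amounts to creating the destination topic 
with the source configs applied atomically; a hedged illustration using the 
AdminClient (topic name, configs, and bootstrap address are examples only):

{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

// Sketch: create the mirrored topic with the source cluster's configs up
// front, instead of creating it with defaults and syncing configs later.
public class CreateTopicWithConfigs {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "dest-broker:9092");
        try (Admin admin = Admin.create(props)) {
            NewTopic topic = new NewTopic("A.foobar", 1, (short) 1)
                // In MirrorMaker these would be fetched from the source cluster.
                .configs(Map.of("cleanup.policy", "compact",
                                "segment.bytes", "1073741824"));
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
{code}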



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on pull request #10245: KAFKA-12400: Upgrade jetty to fix CVE-2020-27223

2021-03-02 Thread GitBox


ableegoldman commented on pull request #10245:
URL: https://github.com/apache/kafka/pull/10245#issuecomment-789260883


   What's the status here? Is this a blocker for the 2.6.2 release?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-03-02 Thread GitBox


cmccabe commented on pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#issuecomment-789256987


   Thanks for the reviews! I reworked the authentication, validation, and 
de-duplication code a lot.  The new logic should take into account the issues 
pointed out here. I resolved a few comment threads since they refer to code 
that was refactored; please take another look if you get a chance.
   
   To clarify a bit, `RemoveTopicRecord` should imply some other effects:
   * All topic configs for the affected topic should be deleted
   * We should delete all the partitions of the deleted topic
   * We should remove the topic from `brokersToIsrs`
   
   The fact that it wasn't doing these things was a bug... it's fixed now.  
This should also allow the ducktape test to work (cc @rondagostino )
   
   We also have a JIRA to follow up on the broker side: 
https://issues.apache.org/jira/browse/KAFKA-12403



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-03-02 Thread GitBox


hachikuji commented on pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#issuecomment-789254543


   @rondagostino I believe that error will be fixed by 
https://issues.apache.org/jira/browse/KAFKA-12403.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-03-02 Thread GitBox


cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r585942960



##
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -809,6 +824,27 @@ private QuorumController(LogContext logContext,
                 () -> replicationControl.unregisterBroker(brokerId));
     }
 
+    @Override
+    public CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIds(Collection<String> names) {
+        if (names.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap());
+        return appendReadEvent("findTopicIds",
+            () -> replicationControl.findTopicIds(lastCommittedOffset, names));
+    }
+
+    @Override
+    public CompletableFuture<Map<Uuid, ResultOrError<String>>> findTopicNames(Collection<Uuid> ids) {
+        if (ids.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap());
+        return appendReadEvent("findTopicNames",
+            () -> replicationControl.findTopicNames(lastCommittedOffset, ids));
+    }
+
+    @Override
+    public CompletableFuture<Map<Uuid, ApiError>> deleteTopics(Collection<Uuid> ids) {
+        if (ids.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap());
+        return appendWriteEvent("deleteTopics",
+            () -> replicationControl.deleteTopics(ids));

Review comment:
   Yes, it should be implicit based on the DeleteTopic record. I will fix 
the controller to do the right thing here.  We'll also need to have the broker 
do that too.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-03-02 Thread GitBox


cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r585942730



##
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -349,6 +357,16 @@ public void replay(PartitionChangeRecord record) {
         log.debug("Applied ISR change record: {}", record.toString());
     }
 
+    public void replay(RemoveTopicRecord record) {
+        TopicControlInfo topic = topics.remove(record.topicId());
+        if (topic == null) {
+            throw new RuntimeException("Can't find topic with ID " + record.topicId() +
+                " to remove.");
+        }
+        topicsByName.remove(topic.name);

Review comment:
   Sorry, you're right: we need to remove this from `brokersToIsrs`.  Fixed.
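
   Roughly the shape of the extra cleanup being described; a sketch against the 
excerpt above, where `removeTopicEntries` is an assumed helper name rather than 
the exact committed API:

   ```java
   // Sketch of replay(RemoveTopicRecord) with the brokersToIsrs cleanup; the
   // last call is illustrative, not the exact committed code.
   public void replay(RemoveTopicRecord record) {
       TopicControlInfo topic = topics.remove(record.topicId());
       if (topic == null) {
           throw new RuntimeException("Can't find topic with ID " + record.topicId() +
               " to remove.");
       }
       topicsByName.remove(topic.name);
       // Drop the topic's partitions from the broker -> ISR index so heartbeat
       // handling never sees a topic ID that is no longer in the topics map.
       brokersToIsrs.removeTopicEntries(record.topicId());
   }
   ```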





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-3988) KafkaConfigBackingStore assumes configs will be stored as schemaless maps

2021-03-02 Thread Ewen Cheslack-Postava (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17294009#comment-17294009
 ] 

Ewen Cheslack-Postava commented on KAFKA-3988:
--

[~pachima...@gmail.com] It's marked WONTFIX because we concluded it didn't make 
much sense to fix: as mentioned above, KIP-174 gets rid of the ability to even 
configure the internal converters. You should remove the internal.converter 
settings that specify schemas.enable=true.

> KafkaConfigBackingStore assumes configs will be stored as schemaless maps
> -
>
> Key: KAFKA-3988
> URL: https://issues.apache.org/jira/browse/KAFKA-3988
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
>Priority: Major
>   Original Estimate: 4h
>  Remaining Estimate: 4h
>
> If you use an internal key/value converter that drops schema information (as 
> is the default in the config files we provide since we use JsonConverter with 
> schemas.enable=false), the schemas we use that are structs get converted to 
> maps since we don't know the structure to decode them to. Because our tests 
> run with these settings, we haven't validated that the code works if schemas 
> are preserved.
> When they are preserved, we'll hit an error message like this
> {quote}
> [2016-07-25 07:36:34,828] ERROR Found connector configuration 
> (connector-test-mysql-jdbc) in wrong format: class 
> org.apache.kafka.connect.data.Struct 
> (org.apache.kafka.connect.storage.KafkaConfigBackingStore:498)
> {quote}
> because the code currently checks that it is working with a map. We should 
> actually be checking for either a Struct or a Map. This same problem probably 
> affects a couple of other types of data in the same class as Connector 
> configs, Task configs, Connect task lists, and target states are all Structs.
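
The fix described amounts to accepting either representation when reading back 
stored values; a minimal sketch (the helper name is illustrative):

{code:java}
import java.util.Map;
import org.apache.kafka.connect.data.Struct;

// Sketch: accept both the schemaless (Map) and schema-preserving (Struct)
// forms of a stored config value, as the issue suggests.
final class StoredConfigFormatCheck {
    static boolean isSupportedFormat(Object value) {
        return value instanceof Map || value instanceof Struct;
    }
}
{code}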



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao commented on a change in pull request #10173: KAFKA-9548 Added SPIs and public classes/interfaces introduced in KIP-405 for tiered storage feature in Kafka.

2021-03-02 Thread GitBox


junrao commented on a change in pull request #10173:
URL: https://github.com/apache/kafka/pull/10173#discussion_r585934097



##
File path: clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
##
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This interface provides storing and fetching remote log segment metadata with strongly consistent semantics.
+ * 
+ * This class can be plugged in to a Kafka cluster by adding the implementation class as the
+ * remote.log.metadata.manager.class.name property value. There is an inbuilt implementation backed by
+ * topic storage in the local cluster. This is used as the default implementation if
+ * remote.log.metadata.manager.class.name is not configured.
+ * 
+ * 
+ * The remote.log.metadata.manager.class.path property is the class path of the RemoteLogStorageManager
+ * implementation. If specified, the RemoteLogStorageManager implementation and its dependent libraries will be loaded
+ * by a dedicated classloader which searches this class path before the Kafka broker class path. The syntax of this
+ * parameter is the same as the standard Java class path string.
+ * 
+ * 
+ * The remote.log.metadata.manager.listener.name property is the listener name of the local broker to which
+ * the RemoteLogMetadataManager implementation should connect if needed. When this is configured, all other
+ * required properties can be passed as properties with the prefix 'remote.log.metadata.manager.listener.'
+ * 
+ * "cluster.id", "broker.id" and all other properties prefixed with "remote.log.metadata." are passed when
+ * {@link #configure(Map)} is invoked on this instance.
+ * 
+ */
+@InterfaceStability.Evolving
+public interface RemoteLogMetadataManager extends Configurable, Closeable {

Review comment:
   The KIP has the following method, which is missing from the PR:
   
   `void updateRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata)`

##
File path: clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
##
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.io.Closeable;
+import java.io.InputStream;
+
+/**
+ * This interface provides the lifecycle of remote log segments that includes copy, fetch, and delete from remote
+ * storage.
+ * 
+ * Each upload or copy of a segment is initiated with {@link RemoteLogSegmentMetadata} containing {@link RemoteLogSegmentId}
+ * which is universally unique even for the same topic partition and offsets.
+ * 
+ * {@link RemoteLogSegmentMetadata} is stored in {@link RemoteLogMetadataManager} before and after copy/delete operations on
+ * {@link RemoteStorageManager} with the respective {@link RemoteLogSegmentState}. {@link RemoteLogMetadataManager} is
+ * responsible for storing and fetching metadata about the remote 

[GitHub] [kafka] rondagostino commented on pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-03-02 Thread GitBox


rondagostino commented on pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#issuecomment-789243745


   The test below, if added as `tests/kafkatest/sanity_checks/test_delete_topic.py`, 
fails for the Raft cases on this PR branch as of this moment because the broker 
fails to shut down.  The following appears in the controller log:
   
   ```
   [2021-03-02 21:41:13,354] INFO [Controller 1] Unfenced broker 1 has requested and been granted a controlled shutdown. (org.apache.kafka.controller.BrokerHeartbeatManager)
   [2021-03-02 21:41:13,355] WARN [Controller 1] org.apache.kafka.controller.QuorumController@3fa533f1: failed with unknown server exception RuntimeException at epoch 1 in 802 us.  Reverting to last committed offset 5. (org.apache.kafka.controller.QuorumController)
   java.lang.RuntimeException: Topic ID VnD54LHq2t3qq_m1WLasZg existed in isrMembers, but not in the topics map.
           at org.apache.kafka.controller.ReplicationControlManager.handleNodeDeactivated(ReplicationControlManager.java:752)
           at org.apache.kafka.controller.ReplicationControlManager.processBrokerHeartbeat(ReplicationControlManager.java:931)
           at org.apache.kafka.controller.QuorumController$1.generateRecordsAndResult(QuorumController.java:911)
           at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:419)
           at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121)
           at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200)
           at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173)
           at java.lang.Thread.run(Thread.java:748)
   ```
   
   Maybe add this system test to this PR as 
`tests/kafkatest/sanity_checks/test_delete_topic.py`?
   
   ```
   # Licensed to the Apache Software Foundation (ASF) under one or more
   # contributor license agreements.  See the NOTICE file distributed with
   # this work for additional information regarding copyright ownership.
   # The ASF licenses this file to You under the Apache License, Version 2.0
   # (the "License"); you may not use this file except in compliance with
   # the License.  You may obtain a copy of the License at
   #
   #    http://www.apache.org/licenses/LICENSE-2.0
   #
   # Unless required by applicable law or agreed to in writing, software
   # distributed under the License is distributed on an "AS IS" BASIS,
   # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   # See the License for the specific language governing permissions and
   # limitations under the License.


   from ducktape.mark import matrix
   from ducktape.mark.resource import cluster
   from ducktape.tests.test import Test

   from kafkatest.services.kafka import KafkaService, quorum
   from kafkatest.services.zookeeper import ZookeeperService
   import time


   class TestDeleteTopic(Test):
       """Sanity checks that we can create and delete a topic and then shutdown."""

       def __init__(self, test_context):
           super(TestDeleteTopic, self).__init__(test_context)

           self.topic = "test_topic"
           self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
           self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
                                     topics={self.topic: {"partitions": 1, "replication-factor": 1}},
                                     controller_num_nodes_override=1)

       def setUp(self):
           if self.zk:
               self.zk.start()

       @cluster(num_nodes=2)
       @matrix(metadata_quorum=quorum.all)
       def test_delete_topic(self, metadata_quorum):
           """
           Test that we can create and delete a topic and then shutdown
           """
           self.kafka.start()
           self.kafka.delete_topic(self.topic)
           time.sleep(10)  # give it a bit to take effect
           self.kafka.stop()  # explicit stop so that failure to stop fails the test
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-3988) KafkaConfigBackingStore assumes configs will be stored as schemaless maps

2021-03-02 Thread Laxman Pachimadla (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17294004#comment-17294004
 ] 

Laxman Pachimadla commented on KAFKA-3988:
--

[~ewencp] Hi Ewen, do you have any update on this? We recently posted an issue at 
[https://github.com/awslabs/kinesis-kafka-connector/issues/46]. Please take a look 
and suggest a solution.

> KafkaConfigBackingStore assumes configs will be stored as schemaless maps
> -
>
> Key: KAFKA-3988
> URL: https://issues.apache.org/jira/browse/KAFKA-3988
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
>Priority: Major
>   Original Estimate: 4h
>  Remaining Estimate: 4h
>
> If you use an internal key/value converter that drops schema information (as 
> is the default in the config files we provide since we use JsonConverter with 
> schemas.enable=false), the schemas we use that are structs get converted to 
> maps since we don't know the structure to decode them to. Because our tests 
> run with these settings, we haven't validated that the code works if schemas 
> are preserved.
> When they are preserved, we'll hit an error message like this
> {quote}
> [2016-07-25 07:36:34,828] ERROR Found connector configuration 
> (connector-test-mysql-jdbc) in wrong format: class 
> org.apache.kafka.connect.data.Struct 
> (org.apache.kafka.connect.storage.KafkaConfigBackingStore:498)
> {quote}
> because the code currently checks that it is working with a map. We should 
> actually be checking for either a Struct or a Map. This same problem probably 
> affects a couple of other types of data in the same class, as Connector 
> configs, Task configs, Connect task lists, and target states are all Structs.
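
A minimal sketch of the Struct-or-Map check the description calls for (Scala used
for illustration; the real KafkaConfigBackingStore is Java):

{code:scala}
import java.util
import org.apache.kafka.connect.data.Struct

// Schemaless converters (e.g. JsonConverter with schemas.enable=false) deserialize
// these records to a Map; schema-preserving converters produce a Struct instead.
def isConnectorConfig(value: Any): Boolean = value match {
  case _: util.Map[_, _] => true
  case _: Struct => true
  case _ => false // anything else is still the "wrong format" error case
}
{code}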



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rondagostino commented on a change in pull request #10227: KAFKA-12382: add a README for KIP-500

2021-03-02 Thread GitBox


rondagostino commented on a change in pull request #10227:
URL: https://github.com/apache/kafka/pull/10227#discussion_r585924508



##
File path: KIP-500.md
##
@@ -0,0 +1,131 @@
+KIP-500 Early Access Release
+
+
+# Introduction
+It is now possible to run Apache Kafka without Apache ZooKeeper!  We call this 
mode [self-managed 
mode](https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum).
  It is currently *EARLY ACCESS AND SHOULD NOT BE USED IN PRODUCTION*, but it 
is available for testing in the Kafka 2.8 release.
+
+When the Kafka cluster is in self-managed mode, it does not store its metadata 
in ZooKeeper.  In fact, you do not have to run ZooKeeper at all, because it 
stores its metadata in a Raft quorum of controller nodes.
+
+Self-managed mode has many benefits-- some obvious, and some not so obvious.  
Clearly, it is nice to manage and configure one service rather than two 
services.  In addition, you can now run a single process Kafka cluster.  Most 
important of all, self-managed mode is more scalable.  We expect to be able to 
[support many more topics and 
partitions](https://www.confluent.io/kafka-summit-san-francisco-2019/kafka-needs-no-keeper/)
 in this mode.
+
+# Quickstart
+
+## Warning
+Self-managed mode in Kafka 2.8 is provided for testing only, *NOT* for 
production.  We do not yet support upgrading existing ZooKeeper-based Kafka 
clusters into this mode.  In fact, when Kafka 3.0 is released, it may not even 
be possible to upgrade your self-managed clusters from 2.8 to 3.0 without 
downtime.  There may be bugs, including serious ones.  You should *assume that 
your data could be lost at any time* if you try the early access release of 
KIP-500.
+
+## Generate a cluster ID
+The first step is to generate an ID for your new cluster, using the 
kafka-storage tool:
+
+
+$ ./bin/kafka-storage.sh random-uuid
+xtzWWN4bTjitpL3kfd9s5g
+
+
+## Format Storage Directories
+The next step is to format your storage directories.  If you are running in 
single-node mode, you can do this with one command:
+
+
+$ ./bin/kafka-storage.sh format -t xtzWWN4bTjitpL3kfd9s5g -c 
./config/raft-combined.properties

Review comment:
   > Do we even need a prefix at all though? We already have 
server.properties=ZK, adding new broker.properties, controller.properties and 
combined.properties (or controller+broker.properties) seems to me to be 
self-describing and both backwards and forwards compatible
   
   I tend to agree with this suggestion.  I think using 
`{broker,controller,combined}.properties` for names would be a good way to go 
here.
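   
   Concretely, that suggestion would mean shipping files along these lines in
   place of `raft-combined.properties` (names as proposed above; contents not
   shown):
   
   ```
   config/broker.properties      # broker-only node
   config/controller.properties  # controller-only node
   config/combined.properties    # combined broker+controller node
   ```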





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12377) Flaky Test SaslAuthenticatorTest#testSslClientAuthRequiredForSaslSslListener

2021-03-02 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293992#comment-17293992
 ] 

Matthias J. Sax commented on KAFKA-12377:
-

Failed again.

> Flaky Test SaslAuthenticatorTest#testSslClientAuthRequiredForSaslSslListener
> 
>
> Key: KAFKA-12377
> URL: https://issues.apache.org/jira/browse/KAFKA-12377
> Project: Kafka
>  Issue Type: Test
>  Components: core, security, unit tests
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> {quote}org.opentest4j.AssertionFailedError: expected:  
> but was:  at 
> org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at 
> org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62) at 
> org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182) at 
> org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177) at 
> org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1124) at 
> org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:111)
>  at 
> org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.createAndCheckClientConnectionFailure(SaslAuthenticatorTest.java:2187)
>  at 
> org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.createAndCheckSslAuthenticationFailure(SaslAuthenticatorTest.java:2210)
>  at 
> org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.verifySslClientAuthForSaslSslListener(SaslAuthenticatorTest.java:1846)
>  at 
> org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.testSslClientAuthRequiredForSaslSslListener(SaslAuthenticatorTest.java:1800){quote}
> STDOUT
> {quote}[2021-02-26 07:18:57,220] ERROR Extensions provided in login context 
> without a token 
> (org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule:318) 
> java.io.IOException: Extensions provided in login context without a token at 
> org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handle(OAuthBearerUnsecuredLoginCallbackHandler.java:165)
>  at 
> org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316)
>  at 
> org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
> [...]
> Caused by: 
> org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerConfigException:
>  Extensions provided in login context without a token at 
> org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handleTokenCallback(OAuthBearerUnsecuredLoginCallbackHandler.java:192)
>  at 
> org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handle(OAuthBearerUnsecuredLoginCallbackHandler.java:163)
>  ... 116 more{quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12177) Retention is not idempotent

2021-03-02 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-12177.
-
Fix Version/s: 3.0.0
 Assignee: Lucas Bradstreet
   Resolution: Fixed

merged the PR to trunk

> Retention is not idempotent
> ---
>
> Key: KAFKA-12177
> URL: https://issues.apache.org/jira/browse/KAFKA-12177
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Bradstreet
>Assignee: Lucas Bradstreet
>Priority: Minor
> Fix For: 3.0.0
>
>
> Kafka today applies retention in the following order:
>  # Time
>  # Size
>  # Log start offset
> Today it is possible for a segment with offsets less than the log start 
> offset to contain data that is not deletable due to time retention. This 
> means that it's possible for log start offset retention to unblock further 
> deletions as a result of time based retention. Note that this does require a 
> case where the max timestamp for each segment increases, decreases and then 
> increases again. Even so it would be nice to make retention idempotent by 
> applying log start offset retention first, followed by size and time. This 
> would also be potentially cheaper to perform, as neither log start offset nor 
> size retention requires the maxTimestamp for a segment to be loaded from disk 
> after a broker restart.
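
A sketch of the proposed idempotent ordering (the shape only; the three thunks
stand in for the actual retention checks and are not the real Log API):

{code:scala}
// Log-start-offset retention runs first: it needs no max timestamp from disk and
// can unblock time-based deletion of earlier segments. Size next, time last.
// Scala evaluates the operands left to right, so the order is preserved.
def deleteOldSegments(byLogStartOffset: () => Int,
                      bySize: () => Int,
                      byTime: () => Int): Int =
  byLogStartOffset() + bySize() + byTime()
{code}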



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao merged pull request #10216: KAFKA-12177: apply log start offset retention before time and size based retention

2021-03-02 Thread GitBox


junrao merged pull request #10216:
URL: https://github.com/apache/kafka/pull/10216


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12238) Implement DescribeProducers API

2021-03-02 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-12238:

Fix Version/s: 2.8.0

> Implement DescribeProducers API
> ---
>
> Key: KAFKA-12238
> URL: https://issues.apache.org/jira/browse/KAFKA-12238
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.8.0
>
>
> Implement the changes described in KIP-664 for the `DescribeProducers` API. 
> This is only the server-side implementation and not the changes to `Admin`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12267) Implement DescribeTransactions API

2021-03-02 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-12267:

Fix Version/s: 3.0.0

> Implement DescribeTransactions API
> --
>
> Key: KAFKA-12267
> URL: https://issues.apache.org/jira/browse/KAFKA-12267
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 3.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12369) Implement ListTransactions API

2021-03-02 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-12369.
-
Fix Version/s: 3.0.0
   Resolution: Fixed

> Implement ListTransactions API
> --
>
> Key: KAFKA-12369
> URL: https://issues.apache.org/jira/browse/KAFKA-12369
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 3.0.0
>
>
> This tracks the implementation of the `ListTransactions` API documented by 
> KIP-664: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions.
>  This API is similar to `ListGroups` for consumer groups.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji merged pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-03-02 Thread GitBox


hachikuji merged pull request #10206:
URL: https://github.com/apache/kafka/pull/10206


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10141: KAFKA-12329; kafka-reassign-partitions command should give a better error message when a topic does not exist

2021-03-02 Thread GitBox


ijuma commented on a change in pull request #10141:
URL: https://github.com/apache/kafka/pull/10141#discussion_r585901281



##
File path: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
##
@@ -812,6 +812,26 @@ object ReassignPartitionsCommand extends Logging {
 proposedAssignments
   }
 
+  private def describeTopics(adminClient: Admin,
+ topics: Set[String])
+ : Map[String, TopicDescription] = {
+adminClient.describeTopics(topics.asJava).values.asScala.map {
+  case (topicName, topicDescriptionFuture) =>
+try {
+  topicName -> topicDescriptionFuture.get
+}
+catch {
+  case t: ExecutionException =>
+if 
(classOf[UnknownTopicOrPartitionException].isInstance(t.getCause)) {

Review comment:
   Also, you can have the `if` in the `case t` line and then a second 
`case` for the rethrow case.
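   
   Something like this shape (a sketch only; the body of the first case is
   elided because the quoted diff is truncated above):
   
   ```
   try {
     topicName -> topicDescriptionFuture.get
   } catch {
     case e: ExecutionException if e.getCause.isInstanceOf[UnknownTopicOrPartitionException] =>
       ??? // handle the unknown-topic case as the PR intends
     case e: ExecutionException =>
       throw e.getCause // rethrow everything else
   }
   ```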





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10141: KAFKA-12329; kafka-reassign-partitions command should give a better error message when a topic does not exist

2021-03-02 Thread GitBox


ijuma commented on a change in pull request #10141:
URL: https://github.com/apache/kafka/pull/10141#discussion_r585900837



##
File path: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
##
@@ -812,6 +812,26 @@ object ReassignPartitionsCommand extends Logging {
 proposedAssignments
   }
 
+  private def describeTopics(adminClient: Admin,
+ topics: Set[String])
+ : Map[String, TopicDescription] = {
+adminClient.describeTopics(topics.asJava).values.asScala.map {
+  case (topicName, topicDescriptionFuture) =>
+try {
+  topicName -> topicDescriptionFuture.get
+}
+catch {
+  case t: ExecutionException =>
+if 
(classOf[UnknownTopicOrPartitionException].isInstance(t.getCause)) {

Review comment:
   Hmm, why not write this as `t.getCause.isInstanceOf[UnknownTopicOrPartitionException]`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10193: MINOR: correct the error message of validating uint32

2021-03-02 Thread GitBox


ijuma commented on a change in pull request #10193:
URL: https://github.com/apache/kafka/pull/10193#discussion_r585899128



##
File path: 
clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
##
@@ -320,7 +320,7 @@ public Long validate(Object item) {
 if (item instanceof Long)
 return (Long) item;
 else
-throw new SchemaException(item + " is not a Long.");
+throw new SchemaException(item + " is not an a Long (encoding 
an unsigned integer).");

Review comment:
   Also, I think it reads a bit weird. It's not clear what "encoding an unsigned 
integer" in brackets means when reading the message.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10193: MINOR: correct the error message of validating uint32

2021-03-02 Thread GitBox


ijuma commented on a change in pull request #10193:
URL: https://github.com/apache/kafka/pull/10193#discussion_r585897929



##
File path: 
clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
##
@@ -320,7 +320,7 @@ public Long validate(Object item) {
 if (item instanceof Long)
 return (Long) item;
 else
-throw new SchemaException(item + " is not a Long.");
+throw new SchemaException(item + " is not an a Long (encoding 
an unsigned integer).");

Review comment:
   There is a typo here, "an a".





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-03-02 Thread GitBox


jolshan commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r585895209



##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
   requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+  request.context.apiVersion(),
+  authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+  names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, 
names)(n => n),
+  names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, 
names)(n => n))
+requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+  val responseData = new DeleteTopicsResponseData().
+setResponses(new DeletableTopicResultCollection(responses.iterator())).
+setThrottleTimeMs(throttleTimeMs)
+  new DeleteTopicsResponse(responseData)
+})
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+   apiVersion: Int,
+   hasClusterAuth: Boolean,
+   getDescribableTopics: Iterable[String] => Set[String],
+   getDeletableTopics: Iterable[String] => Set[String]): 
util.List[DeletableTopicResult] = {
+if (!config.deleteTopicEnable) {
+  if (apiVersion < 3) {
+throw new InvalidRequestException("Topic deletion is disabled.")
+  } else {
+throw new TopicDeletionDisabledException()
+  }
+}
+val responses = new util.ArrayList[DeletableTopicResult]
+val duplicatedTopicNames = new util.HashSet[String]
+val topicNamesToResolve = new util.HashSet[String]
+val topicIdsToResolve = new util.HashSet[Uuid]
+val duplicatedTopicIds = new util.HashSet[Uuid]
+
+def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+  responses.add(new DeletableTopicResult().
+setName(name).
+setTopicId(id).
+setErrorCode(error.error().code()).
+setErrorMessage(error.message()))
+}
+
+def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+  if (duplicatedTopicNames.contains(name) || 
!topicNamesToResolve.add(name)) {
+appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, 
"Duplicate topic name."))
+topicNamesToResolve.remove(name)
+duplicatedTopicNames.add(name)
+  }
+}
+
+def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+  if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate 
topic ID."))
+topicIdsToResolve.remove(id)
+duplicatedTopicIds.add(id)
+  }
+}
+
+
request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve)
+
+request.topics().iterator().asScala.foreach {
+  topic => if (topic.name() == null) {
+if (topic.topicId.equals(ZERO_UUID)) {
+  appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+"Neither topic name nor id were specified."))
+} else {
+  maybeAppendToIdsToResolve(topic.topicId())
+}
+  } else {
+if (topic.topicId().equals(ZERO_UUID)) {
+  maybeAppendToTopicNamesToResolve(topic.name())
+} else {
+  appendResponse(topic.name(), topic.topicId(), new 
ApiError(INVALID_REQUEST,
+"You may not specify both topic name and topic id."))
+}
+  }
+}
+
+val idToName = new util.HashMap[Uuid, String]
+
+def maybeAppendToIdToName(id: Uuid, name: String): Unit = {
+  if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) {
+  appendResponse(name, id, new ApiError(INVALID_REQUEST,
+  "The same topic was specified by name and by id."))
+  idToName.remove(id)
+  duplicatedTopicIds.add(id)
+  }
+}
+controller.findTopicIds(topicNamesToResolve).get().asScala.foreach {
+  case (name, idOrError) => if (idOrError.isError) {
+appendResponse(name, ZERO_UUID, idOrError.error())
+  } else {
+maybeAppendToIdToName(idOrError.result(), name)
+  }
+}
+controller.findTopicNames(topicIdsToResolve).get().asScala.foreach {
+  case (id, nameOrError) => if (nameOrError.isError) {
+appendResponse(null, id, nameOrError.error())
+  } else {
+maybeAppendToIdToName(id, nameOrError.result())
+  }
+}
+
+if (!hasClusterAuth) {
+  val authorizedDescribeTopics = 
getDescribableTopics(idToName.values().asScala)
+  val authorizedDeleteTopics = 
getDeletableTopics(idToName.values().asScala)
+  val iterator = idToName.entrySet().iterator()
+  while (iterator.hasNext) {
+val entry = iterator.next()
+val topicName = 

[GitHub] [kafka] ijuma commented on a change in pull request #10203: MINOR: Prepare for Gradle 7.0

2021-03-02 Thread GitBox


ijuma commented on a change in pull request #10203:
URL: https://github.com/apache/kafka/pull/10203#discussion_r585894119



##
File path: build.gradle
##
@@ -1586,15 +1605,17 @@ project(':streams:test-utils') {
   archivesBaseName = "kafka-streams-test-utils"
 
   dependencies {
-compile project(':streams')
-compile project(':clients')
+implementation project(':streams')

Review comment:
   Yes, agreed. This is even more important since this module is public API 
and it exposes classes from streams and clients in said API (e.g. 
`Deserializer` and `TopologyException`).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-03-02 Thread GitBox


cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r585891911



##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
   requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+  request.context.apiVersion(),
+  authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+  names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, 
names)(n => n),
+  names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, 
names)(n => n))
+requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+  val responseData = new DeleteTopicsResponseData().
+setResponses(new DeletableTopicResultCollection(responses.iterator())).
+setThrottleTimeMs(throttleTimeMs)
+  new DeleteTopicsResponse(responseData)
+})
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+   apiVersion: Int,
+   hasClusterAuth: Boolean,
+   getDescribableTopics: Iterable[String] => Set[String],
+   getDeletableTopics: Iterable[String] => Set[String]): 
util.List[DeletableTopicResult] = {
+if (!config.deleteTopicEnable) {
+  if (apiVersion < 3) {
+throw new InvalidRequestException("Topic deletion is disabled.")
+  } else {
+throw new TopicDeletionDisabledException()
+  }
+}
+val responses = new util.ArrayList[DeletableTopicResult]
+val duplicatedTopicNames = new util.HashSet[String]
+val topicNamesToResolve = new util.HashSet[String]
+val topicIdsToResolve = new util.HashSet[Uuid]
+val duplicatedTopicIds = new util.HashSet[Uuid]
+
+def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+  responses.add(new DeletableTopicResult().
+setName(name).
+setTopicId(id).
+setErrorCode(error.error().code()).
+setErrorMessage(error.message()))
+}
+
+def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+  if (duplicatedTopicNames.contains(name) || 
!topicNamesToResolve.add(name)) {
+appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, 
"Duplicate topic name."))
+topicNamesToResolve.remove(name)
+duplicatedTopicNames.add(name)
+  }
+}
+
+def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+  if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate 
topic ID."))
+topicIdsToResolve.remove(id)
+duplicatedTopicIds.add(id)
+  }
+}
+
+
request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve)
+
+request.topics().iterator().asScala.foreach {
+  topic => if (topic.name() == null) {
+if (topic.topicId.equals(ZERO_UUID)) {
+  appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+"Neither topic name nor id were specified."))
+} else {
+  maybeAppendToIdsToResolve(topic.topicId())
+}
+  } else {
+if (topic.topicId().equals(ZERO_UUID)) {
+  maybeAppendToTopicNamesToResolve(topic.name())
+} else {
+  appendResponse(topic.name(), topic.topicId(), new 
ApiError(INVALID_REQUEST,
+"You may not specify both topic name and topic id."))
+}
+  }
+}
+
+val idToName = new util.HashMap[Uuid, String]
+
+def maybeAppendToIdToName(id: Uuid, name: String): Unit = {
+  if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) {
+  appendResponse(name, id, new ApiError(INVALID_REQUEST,
+  "The same topic was specified by name and by id."))
+  idToName.remove(id)
+  duplicatedTopicIds.add(id)
+  }
+}
+controller.findTopicIds(topicNamesToResolve).get().asScala.foreach {
+  case (name, idOrError) => if (idOrError.isError) {
+appendResponse(name, ZERO_UUID, idOrError.error())
+  } else {
+maybeAppendToIdToName(idOrError.result(), name)
+  }
+}
+controller.findTopicNames(topicIdsToResolve).get().asScala.foreach {
+  case (id, nameOrError) => if (nameOrError.isError) {
+appendResponse(null, id, nameOrError.error())
+  } else {
+maybeAppendToIdToName(id, nameOrError.result())
+  }
+}
+
+if (!hasClusterAuth) {
+  val authorizedDescribeTopics = 
getDescribableTopics(idToName.values().asScala)
+  val authorizedDeleteTopics = 
getDeletableTopics(idToName.values().asScala)
+  val iterator = idToName.entrySet().iterator()
+  while (iterator.hasNext) {
+val entry = iterator.next()
+val topicName = 

[GitHub] [kafka] ijuma commented on a change in pull request #10203: MINOR: Prepare for Gradle 7.0

2021-03-02 Thread GitBox


ijuma commented on a change in pull request #10203:
URL: https://github.com/apache/kafka/pull/10203#discussion_r585891145



##
File path: release.py
##
@@ -631,7 +631,7 @@ def select_gpg_key():
 contents = f.read()
 if not user_ok("Going to build and upload mvn artifacts based on these 
settings:\n" + contents + '\nOK (y/n)?: '):
 fail("Retry again later")
-cmd("Building and uploading archives", "./gradlewAll uploadArchives", 
cwd=kafka_dir, env=jdk8_env, shell=True)
+cmd("Building and uploading archives", "./gradlewAll publish", cwd=kafka_dir, 
env=jdk8_env, shell=True)

Review comment:
   Good catch. I had searched for this, but somehow missed it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino opened a new pull request #10249: MINOR: disable round_trip_fault_test system tests for Raft quorums

2021-03-02 Thread GitBox


rondagostino opened a new pull request #10249:
URL: https://github.com/apache/kafka/pull/10249


   The KIP-500 early access release will not support creating a partition with 
a manual partition assignment that includes a broker that is not currently 
online.  This patch disables system tests for Raft-based metadata quorums where 
the test depends on this functionality to pass.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12403) Broker handling of delete topic events

2021-03-02 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-12403:

Labels: kip-500  (was: )

> Broker handling of delete topic events
> --
>
> Key: KAFKA-12403
> URL: https://issues.apache.org/jira/browse/KAFKA-12403
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>  Labels: kip-500
>
> This issue tracks completion of metadata listener support for the topic 
> deletion event. When a topic is deleted, the broker needs to stop replicas, 
> delete log data, and remove cached topic configurations.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman merged pull request #10163: KAFKA-10357: Extract setup of changelog from Streams partition assignor

2021-03-02 Thread GitBox


ableegoldman merged pull request #10163:
URL: https://github.com/apache/kafka/pull/10163


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10163: KAFKA-10357: Extract setup of changelog from Streams partition assignor

2021-03-02 Thread GitBox


ableegoldman commented on pull request #10163:
URL: https://github.com/apache/kafka/pull/10163#issuecomment-789174791


   Java 11 failed with the unrelated flaky 
`kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch`, and Java 15 
failed with `Execution failed for task ':streams:test-utils:unitTest'` (this PR 
does not touch test-utils, so that failure should be unrelated; I ran the tests 
locally to verify).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12403) Broker handling of delete topic events

2021-03-02 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-12403:
---

 Summary: Broker handling of delete topic events
 Key: KAFKA-12403
 URL: https://issues.apache.org/jira/browse/KAFKA-12403
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson


This issue tracks completion of metadata listener support for the topic 
deletion event. When a topic is deleted, the broker needs to stop replicas, 
delete log data, and remove cached topic configurations.
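
A hypothetical sketch of that broker-side handling (all names here are
illustrative placeholders, not the actual metadata listener API):

{code:scala}
def onTopicDelete(topic: String): Unit = {
  replicaManager.stopReplicas(topic)  // 1. stop the replicas for the deleted topic
  logManager.asyncDelete(topic)       // 2. delete the log data from disk
  configRepository.remove(topic)      // 3. drop any cached topic configurations
}
{code}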



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12254) MirrorMaker 2.0 creates destination topic with default configs

2021-03-02 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-12254:
-
Fix Version/s: 2.8.0

> MirrorMaker 2.0 creates destination topic with default configs
> --
>
> Key: KAFKA-12254
> URL: https://issues.apache.org/jira/browse/KAFKA-12254
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0, 2.5.0, 2.4.1, 2.6.0, 2.5.1, 2.7.0, 2.6.1, 2.8.0
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Blocker
> Fix For: 3.0.0, 2.8.0
>
>
> `MirrorSourceConnector` implements the logic for replicating data, 
> configurations, and other metadata between the source and destination 
> clusters. This includes the tasks below:
>  # `refreshTopicPartitions` for syncing topics / partitions from source to 
> destination.
>  # `syncTopicConfigs` for syncing topic configurations from source to 
> destination.
> A limitation is that `computeAndCreateTopicPartitions` creates topics with 
> default configurations on the destination cluster. A separate async task 
> `syncTopicConfigs` is responsible for syncing the topic configs. Before that 
> sync happens, topic configurations could be out of sync between the two 
> clusters.
> In the worst case, this could lead to data loss eg. when we have a compacted 
> topic being mirrored between clusters which is incorrectly created with the 
> default configuration of `cleanup.policy = delete` on the destination before 
> the configurations are sync'd via `syncTopicConfigs`.
> Here is an example of the divergence:
> Source Topic:
> ```
> Topic: foobar PartitionCount: 1 ReplicationFactor: 1 Configs: 
> cleanup.policy=compact,segment.bytes=1073741824
> ```
> Destination Topic:
> ```
> Topic: A.foobar PartitionCount: 1 ReplicationFactor: 1 Configs: 
> segment.bytes=1073741824
> ```
> A safer approach is to ensure that the right configurations are set on the 
> destination cluster before data is replicated to it.
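
A sketch of that safer approach (illustrative, not the actual MirrorSourceConnector
code): read the source topic's configs and apply them at creation time, so that
e.g. cleanup.policy=compact survives creation.

{code:scala}
import java.util.Collections
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.admin.{Admin, NewTopic}

def createWithSourceConfigs(destAdmin: Admin, topic: String, partitions: Int,
                            replicationFactor: Short, sourceConfigs: Map[String, String]): Unit = {
  val newTopic = new NewTopic(topic, partitions, replicationFactor)
    .configs(sourceConfigs.asJava) // apply source configs before any data is replicated
  destAdmin.createTopics(Collections.singleton(newTopic)).all().get()
}
{code}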



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #10163: KAFKA-10357: Extract setup of changelog from Streams partition assignor

2021-03-02 Thread GitBox


ableegoldman commented on a change in pull request #10163:
URL: https://github.com/apache/kafka/pull/10163#discussion_r585863461



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogTopics.java
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.UNKNOWN;
+
+public class ChangelogTopics {
+
+private final InternalTopicManager internalTopicManager;
+private final Map topicGroups;
+private final Map> tasksForTopicGroup;
+private final Map> changelogPartitionsForTask 
= new HashMap<>();
+private final Map> 
preExistingChangelogPartitionsForTask = new HashMap<>();
+private final Set 
preExistingNonSourceTopicBasedChangelogPartitions = new HashSet<>();
+private final Set sourceTopicBasedChangelogTopics = new 
HashSet<>();
+private final Set sourceTopicBasedChangelogTopicPartitions 
= new HashSet<>();
+private final Logger log;
+
+public ChangelogTopics(final InternalTopicManager internalTopicManager,
+   final Map topicGroups,
+   final Map> tasksForTopicGroup,
+   final String logPrefix) {
+this.internalTopicManager = internalTopicManager;
+this.topicGroups = topicGroups;
+this.tasksForTopicGroup = tasksForTopicGroup;
+final LogContext logContext = new LogContext(logPrefix);
+log = logContext.logger(getClass());
+}
+
+public void setup() {

Review comment:
   Sounds good  





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10251) Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata

2021-03-02 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293953#comment-17293953
 ] 

A. Sophie Blee-Goldman commented on KAFKA-10251:


Merged [https://github.com/apache/kafka/pull/10228], please reopen this ticket 
if failures are seen on builds kicked off after 11:47am PST (March 2nd)

> Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata
> -
>
> Key: KAFKA-10251
> URL: https://issues.apache.org/jira/browse/KAFKA-10251
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: A. Sophie Blee-Goldman
>Assignee: Luke Chen
>Priority: Major
>
> h3. Stacktrace
> org.scalatest.exceptions.TestFailedException: Consumed 0 records before 
> timeout instead of the expected 200 records at 
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) 
> at 
> org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) 
> at org.scalatest.Assertions.fail(Assertions.scala:1091) at 
> org.scalatest.Assertions.fail$(Assertions.scala:1087) at 
> org.scalatest.Assertions$.fail(Assertions.scala:1389) at 
> kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:842) at 
> kafka.api.TransactionsBounceTest.testWithGroupMetadata(TransactionsBounceTest.scala:109)
>  
>  
> The logs are pretty much just this on repeat:
> {code:java}
> [2020-07-08 23:41:04,034] ERROR Error when sending message to topic 
> output-topic with key: 9955, value: 9955 with error: 
> (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) 
> org.apache.kafka.common.KafkaException: Failing batch since transaction was 
> aborted at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) 
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at 
> java.lang.Thread.run(Thread.java:748) [2020-07-08 23:41:04,034] ERROR Error 
> when sending message to topic output-topic with key: 9959, value: 9959 with 
> error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) 
> org.apache.kafka.common.KafkaException: Failing batch since transaction was 
> aborted at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) 
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at 
> java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman merged pull request #10228: KAFKA-10251: increase timeout for consumeing records

2021-03-02 Thread GitBox


ableegoldman merged pull request #10228:
URL: https://github.com/apache/kafka/pull/10228


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10228: KAFKA-10251: increase timeout for consumeing records

2021-03-02 Thread GitBox


ableegoldman commented on pull request #10228:
URL: https://github.com/apache/kafka/pull/10228#issuecomment-789166076


   Since this test is failing pretty regularly, I think we should go ahead and 
merge this now. A 30s timeout is pretty standard and if there is a real issue, 
just bumping the timeout from 15s to 30s should not cover it up. We can reopen 
the ticket if new failures are seen



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10247: MINOR: Fix log format in AbstractCoordinator

2021-03-02 Thread GitBox


ableegoldman commented on a change in pull request #10247:
URL: https://github.com/apache/kafka/pull/10247#discussion_r585856635



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -860,7 +860,7 @@ public void onSuccess(ClientResponse resp, 
RequestFuture future) {
 
 @Override
 public void onFailure(RuntimeException e, RequestFuture future) {
-log.debug("FindCoordinator request failed due to {}", e);
+log.debug("FindCoordinator request failed due to {}", 
e.getMessage());

Review comment:
   @guozhangwang has a PR open in which he fixes this on the side. See 
https://github.com/apache/kafka/pull/10232/files#r584478802





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10228: KAFKA-10251: increase timeout for consumeing records

2021-03-02 Thread GitBox


ableegoldman commented on pull request #10228:
URL: https://github.com/apache/kafka/pull/10228#issuecomment-789162142


   Three unrelated failures:
   ```
   Build / JDK 15 / 
org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining
   Build / JDK 15 / 
kafka.api.AuthorizerIntegrationTest.testFetchFollowerRequest()
   Build / JDK 15 / 
kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch()
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] sknop commented on pull request #9950: KAFKA-12170: Fix for Connect Cast SMT to correctly transform a Byte array into a string

2021-03-02 Thread GitBox


sknop commented on pull request #9950:
URL: https://github.com/apache/kafka/pull/9950#issuecomment-789161208


   Yes of course, totally agree.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #10237: MINOR: fix failing system test delegation_token_test

2021-03-02 Thread GitBox


rondagostino commented on a change in pull request #10237:
URL: https://github.com/apache/kafka/pull/10237#discussion_r585829658



##
File path: tests/kafkatest/services/security/security_config.py
##
@@ -374,7 +374,8 @@ def interbroker_sasl_mechanism(self):
 def enabled_sasl_mechanisms(self):
 sasl_mechanisms = []
 if self.is_sasl(self.security_protocol):
-sasl_mechanisms += [self.client_sasl_mechanism]
+# .csv is supported so be sure to account for that possibility
+sasl_mechanisms += self.client_sasl_mechanism.strip().split(',')

Review comment:
   > Not sure if we should bother in this PR, but the usages of 
client_sasl_mechanism could stand to be cleaned up
   
   I agree it needs to be cleaned up.  Given we are past code freeze for 2.8, 
I've opened https://issues.apache.org/jira/browse/KAFKA-12402 for this and we 
can address it another time.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12402) client_sasl_mechanism should be an explicit list instead of a .csv string

2021-03-02 Thread Ron Dagostino (Jira)
Ron Dagostino created KAFKA-12402:
-

 Summary: client_sasl_mechanism should be an explicit list instead 
of a .csv string
 Key: KAFKA-12402
 URL: https://issues.apache.org/jira/browse/KAFKA-12402
 Project: Kafka
  Issue Type: Improvement
  Components: system tests
Reporter: Ron Dagostino


The SecurityConfig and the KafkaService system test classes both accept a 
client_sasl_mechanism parameter.  This is typically a single value (e.g. 
PLAIN), but DelegationTokenTest sets self.kafka.client_sasl_mechanism = 
'GSSAPI,SCRAM-SHA-256'.  If we need to support a list of mechanisms then the 
parameter should be an explicit list instead of a .csv string.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12319) Flaky test ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()

2021-03-02 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293926#comment-17293926
 ] 

Matthias J. Sax commented on KAFKA-12319:
-

Failed again.

> Flaky test 
> ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
> -
>
> Key: KAFKA-12319
> URL: https://issues.apache.org/jira/browse/KAFKA-12319
> Project: Kafka
>  Issue Type: Test
>Reporter: Justine Olshan
>Priority: Major
>  Labels: flaky-test
>
> I've seen this test fail a few times locally. But recently I saw it fail on a 
> PR build on Jenkins.
>  
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10041/7/testReport/junit/kafka.network/ConnectionQuotasTest/Build___JDK_11___testListenerConnectionRateLimitWhenActualRateAboveLimit__/]
> h3. Error Message
> java.util.concurrent.ExecutionException: org.opentest4j.AssertionFailedError: 
> Expected rate (30 +- 7), but got 37.436825357209706 (600 connections / 16.027 
> sec) ==> expected: <30.0> but was: <37.436825357209706>
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12394) Consider topic id existence and authorization errors

2021-03-02 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-12394.
-
Fix Version/s: 2.8.0
   Resolution: Fixed

> Consider topic id existence and authorization errors
> 
>
> Key: KAFKA-12394
> URL: https://issues.apache.org/jira/browse/KAFKA-12394
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
> Fix For: 2.8.0
>
>
> We have historically had logic in the api layer to avoid leaking the 
> existence or non-existence of topics to clients which are not authorized to 
> describe them. The way we have done this is to always authorize the topic 
> name first before checking existence.
> Topic ids make this more difficult because the resource (ie the topic name) 
> has to be derived. This means we have to check existence of the topic first. 
> If the topic does not exist, then our hands are tied and we have to return 
> UNKNOWN_TOPIC_ID. If the topic does exist, then we need to check if the 
> client is authorized to describe it. The question comes then what we should 
> do if the client is not authorized?
> The current behavior is to return UNKNOWN_TOPIC_ID. The downside is that this 
> is misleading and forces the client to retry even though they are doomed to 
> hit the same error. However, the client should generally handle this by 
> requesting Metadata using the topic name that they are interested in, which 
> would give them a chance to see the topic authorization error. Basically the 
> fact that you need describe permission in the first place to discover the 
> topic id makes this an unlikely scenario.
> There is an argument to be made for TOPIC_AUTHORIZATION_FAILED as well. 
> Basically we could take the stance that we do not care about leaking the 
> existence of topic IDs since they do not reveal anything about the underlying 
> topic. Additionally, there is little likelihood of a user discovering a valid 
> UUID by accident or even through brute force. The benefit of this is that 
> users get a clear error for cases where a topic Id may have been discovered 
> through some external means. For example, an administrator finds a topic ID 
> in the logging and attempts to delete it using the new `deleteTopicsWithIds` 
> Admin API.
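
To make the resolved behavior concrete, a hedged sketch of the check order described above (the types and names here are illustrative, not the actual KafkaApis code):
{code:java}
import java.util.Map;
import java.util.Set;

class TopicIdAuthorizationSketch {
    enum ApiError { NONE, UNKNOWN_TOPIC_ID, TOPIC_AUTHORIZATION_FAILED }

    // Existence must be checked first, because the resource name has to be
    // derived from the topic id; only then can DESCRIBE be authorized.
    static ApiError check(Map<String, String> topicIdToName,
                          Set<String> describableTopics,
                          String topicId) {
        String name = topicIdToName.get(topicId);
        if (name == null)
            return ApiError.UNKNOWN_TOPIC_ID;            // nothing to authorize against
        if (!describableTopics.contains(name))
            return ApiError.TOPIC_AUTHORIZATION_FAILED;  // the stance adopted by this fix
        return ApiError.NONE;
    }
}
{code}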



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10788) Streamlining Tests in CachingInMemoryKeyValueStoreTest

2021-03-02 Thread Rohit Deshpande (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293907#comment-17293907
 ] 

Rohit Deshpande commented on KAFKA-10788:
-

Thanks [~cadonna], that makes sense to me. I will post the update here.

> Streamlining Tests in CachingInMemoryKeyValueStoreTest
> --
>
> Key: KAFKA-10788
> URL: https://issues.apache.org/jira/browse/KAFKA-10788
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Sagar Rao
>Assignee: Rohit Deshpande
>Priority: Major
>  Labels: newbie
>
> While reviewing, kIP-614, it was decided that tests for 
> [CachingInMemoryKeyValueStoreTest.java|https://github.com/apache/kafka/pull/9508/files/899b79781d3412658293b918dce16709121accf1#diff-fdfe70d8fa0798642f0ed54785624aa9850d5d86afff2285acdf12f2775c3588]
>  need to be streamlined to use mocked underlyingStore.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji merged pull request #10223: KAFKA-12394: Return `TOPIC_AUTHORIZATION_FAILED` in delete topic response if no describe permission

2021-03-02 Thread GitBox


hachikuji merged pull request #10223:
URL: https://github.com/apache/kafka/pull/10223


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vanhoale commented on pull request #10239: KAFKA-12372: Enhance TimestampCoverter Connect transformation to handle multiple timestamp or date fields

2021-03-02 Thread GitBox


vanhoale commented on pull request #10239:
URL: https://github.com/apache/kafka/pull/10239#issuecomment-789110876


   I ran my tests, checkstyle, and spotbugs successfully on my local machine, but 
I'm not sure why some checks failed here.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-03-02 Thread GitBox


hachikuji commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r585800691



##
File path: 
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
##
@@ -1813,17 +1813,25 @@ class AuthorizerIntegrationTest extends BaseRequestTest 
{
 producer.beginTransaction()
 producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, 
"1".getBytes)).get
 
+def assertListTransactionResult(
+  expectedTransactionalIds: Set[String]
+): Unit = {
+  val listTransactionsRequest = new ListTransactionsRequest.Builder(new 
ListTransactionsRequestData()).build()
+  val listTransactionsResponse = 
connectAndReceive[ListTransactionsResponse](listTransactionsRequest)
+  assertEquals(Errors.NONE, 
Errors.forCode(listTransactionsResponse.data.errorCode))
+  assertEquals(expectedTransactionalIds, 
listTransactionsResponse.data.transactionStates.asScala.map(_.transactionalId).toSet)
+}
+
 // First verify that we can list the transaction
-val listTransactionsRequest = new ListTransactionsRequest.Builder(new 
ListTransactionsRequestData()).build()
-val authorizedResponse = 
connectAndReceive[ListTransactionsResponse](listTransactionsRequest)
-assertEquals(Errors.NONE, 
Errors.forCode(authorizedResponse.data.errorCode))
-assertEquals(Set(transactionalId), 
authorizedResponse.data.transactionStates.asScala.map(_.transactionalId).toSet)
+assertListTransactionResult(expectedTransactionalIds = 
Set(transactionalId))
 
 // Now revoke authorization and verify that the transaction is no longer 
listable
 removeAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WildcardHost, WRITE, ALLOW)), transactionalIdResource)
-val unauthorizedResponse = 
connectAndReceive[ListTransactionsResponse](listTransactionsRequest)
-assertEquals(Errors.NONE, 
Errors.forCode(unauthorizedResponse.data.errorCode))
-assertEquals(Set(), 
unauthorizedResponse.data.transactionStates.asScala.map(_.transactionalId).toSet)
+assertListTransactionResult(expectedTransactionalIds = Set())
+
+// The minimum permission needed is `Describe`
+addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WildcardHost, WRITE, ALLOW)), transactionalIdResource)

Review comment:
   Ugh, yes.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-10251) Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata

2021-03-02 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293885#comment-17293885
 ] 

Matthias J. Sax edited comment on KAFKA-10251 at 3/2/21, 6:11 PM:
--

Two more.


was (Author: mjsax):
One more.

> Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata
> -
>
> Key: KAFKA-10251
> URL: https://issues.apache.org/jira/browse/KAFKA-10251
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: A. Sophie Blee-Goldman
>Assignee: Luke Chen
>Priority: Major
>
> h3. Stacktrace
> org.scalatest.exceptions.TestFailedException: Consumed 0 records before 
> timeout instead of the expected 200 records at 
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) 
> at 
> org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) 
> at org.scalatest.Assertions.fail(Assertions.scala:1091) at 
> org.scalatest.Assertions.fail$(Assertions.scala:1087) at 
> org.scalatest.Assertions$.fail(Assertions.scala:1389) at 
> kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:842) at 
> kafka.api.TransactionsBounceTest.testWithGroupMetadata(TransactionsBounceTest.scala:109)
>  
>  
> The logs are pretty much just this on repeat:
> {code:java}
> [2020-07-08 23:41:04,034] ERROR Error when sending message to topic 
> output-topic with key: 9955, value: 9955 with error: 
> (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) 
> org.apache.kafka.common.KafkaException: Failing batch since transaction was 
> aborted at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) 
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at 
> java.lang.Thread.run(Thread.java:748) [2020-07-08 23:41:04,034] ERROR Error 
> when sending message to topic output-topic with key: 9959, value: 9959 with 
> error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) 
> org.apache.kafka.common.KafkaException: Failing batch since transaction was 
> aborted at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) 
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at 
> java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12401) Flaky Test FeatureCommandTest#testUpgradeAllFeaturesSuccess

2021-03-02 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-12401:
---

 Summary: Flaky Test 
FeatureCommandTest#testUpgradeAllFeaturesSuccess
 Key: KAFKA-12401
 URL: https://issues.apache.org/jira/browse/KAFKA-12401
 Project: Kafka
  Issue Type: Test
  Components: admin, unit tests
Reporter: Matthias J. Sax


{quote}kafka.admin.UpdateFeaturesException: 2 feature updates failed! at 
kafka.admin.FeatureApis.maybeApplyFeatureUpdates(FeatureCommand.scala:289) at 
kafka.admin.FeatureApis.upgradeAllFeatures(FeatureCommand.scala:191) at 
kafka.admin.FeatureCommandTest.$anonfun$testUpgradeAllFeaturesSuccess$3(FeatureCommandTest.scala:134){quote}
STDOUT
{quote}[Add] Feature: feature_1 ExistingFinalizedMaxVersion: - NewFinalizedMaxVersion: 3 Result: OK
[Add] Feature: feature_2 ExistingFinalizedMaxVersion: - NewFinalizedMaxVersion: 5 Result: OK
[Add] Feature: feature_1 ExistingFinalizedMaxVersion: - NewFinalizedMaxVersion: 3 Result: OK
[Add] Feature: feature_2 ExistingFinalizedMaxVersion: - NewFinalizedMaxVersion: 5 Result: OK{quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8003) Flaky Test TransactionsTest #testFencingOnTransactionExpiration

2021-03-02 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293889#comment-17293889
 ] 

Matthias J. Sax commented on KAFKA-8003:


A third test method:
{quote}org.opentest4j.AssertionFailedError: Consumed 0 records before timeout 
instead of the expected 1000 records at 
org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:39) at 
org.junit.jupiter.api.Assertions.fail(Assertions.java:117) at 
kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:852) at 
kafka.utils.TestUtils$.consumeRecords(TestUtils.scala:1476) at 
kafka.api.TransactionsTest.testMultipleMarkersOneLeader(TransactionsTest.scala:588){quote}
STDOUT
{quote}[2021-03-02 10:31:53,233] ERROR [ZooKeeperClient Kafka server] Auth 
failed. (kafka.zookeeper.ZooKeeperClient:74) [2021-03-02 10:31:53,315] WARN 
SASL configuration failed: javax.security.auth.login.LoginException: No JAAS 
configuration section named 'Client' was found in specified JAAS configuration 
file: '/tmp/kafka5762171763596437663.tmp'. Will continue connection to 
Zookeeper server without SASL authentication, if Zookeeper server allows it. 
(org.apache.zookeeper.ClientCnxn:1094){quote}
Seems to be the same issue as above? 

> Flaky Test TransactionsTest #testFencingOnTransactionExpiration
> ---
>
> Key: KAFKA-8003
> URL: https://issues.apache.org/jira/browse/KAFKA-8003
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.2.0
>Reporter: Matthias J. Sax
>Assignee: Jason Gustafson
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.2.3
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/34/]
> {quote}java.lang.AssertionError: expected:<1> but was:<0> at 
> org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.failNotEquals(Assert.java:834) at 
> org.junit.Assert.assertEquals(Assert.java:645) at 
> org.junit.Assert.assertEquals(Assert.java:631) at 
> kafka.api.TransactionsTest.testFencingOnTransactionExpiration(TransactionsTest.scala:510){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rhauch commented on pull request #9950: KAFKA-12170: Fix for Connect Cast SMT to correctly transform a Byte array into a string

2021-03-02 Thread GitBox


rhauch commented on pull request #9950:
URL: https://github.com/apache/kafka/pull/9950#issuecomment-789100759


   If we don't clarify the documentation, then I think users will be very 
confused. Can you take a stab at improving/expanding the documentation a bit to 
clarify the input and output types?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10251) Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata

2021-03-02 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293885#comment-17293885
 ] 

Matthias J. Sax commented on KAFKA-10251:
-

One more.

> Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata
> -
>
> Key: KAFKA-10251
> URL: https://issues.apache.org/jira/browse/KAFKA-10251
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: A. Sophie Blee-Goldman
>Assignee: Luke Chen
>Priority: Major
>
> h3. Stacktrace
> org.scalatest.exceptions.TestFailedException: Consumed 0 records before 
> timeout instead of the expected 200 records at 
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) 
> at 
> org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) 
> at org.scalatest.Assertions.fail(Assertions.scala:1091) at 
> org.scalatest.Assertions.fail$(Assertions.scala:1087) at 
> org.scalatest.Assertions$.fail(Assertions.scala:1389) at 
> kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:842) at 
> kafka.api.TransactionsBounceTest.testWithGroupMetadata(TransactionsBounceTest.scala:109)
>  
>  
> The logs are pretty much just this on repeat:
> {code:java}
> [2020-07-08 23:41:04,034] ERROR Error when sending message to topic 
> output-topic with key: 9955, value: 9955 with error: 
> (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) 
> org.apache.kafka.common.KafkaException: Failing batch since transaction was 
> aborted at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) 
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at 
> java.lang.Thread.run(Thread.java:748) [2020-07-08 23:41:04,034] ERROR Error 
> when sending message to topic output-topic with key: 9959, value: 9959 with 
> error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) 
> org.apache.kafka.common.KafkaException: Failing batch since transaction was 
> aborted at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) 
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at 
> java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-03-02 Thread GitBox


chia7712 commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r585784412



##
File path: 
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
##
@@ -1813,17 +1813,25 @@ class AuthorizerIntegrationTest extends BaseRequestTest 
{
 producer.beginTransaction()
 producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, 
"1".getBytes)).get
 
+def assertListTransactionResult(
+  expectedTransactionalIds: Set[String]
+): Unit = {
+  val listTransactionsRequest = new ListTransactionsRequest.Builder(new 
ListTransactionsRequestData()).build()
+  val listTransactionsResponse = 
connectAndReceive[ListTransactionsResponse](listTransactionsRequest)
+  assertEquals(Errors.NONE, 
Errors.forCode(listTransactionsResponse.data.errorCode))
+  assertEquals(expectedTransactionalIds, 
listTransactionsResponse.data.transactionStates.asScala.map(_.transactionalId).toSet)
+}
+
 // First verify that we can list the transaction
-val listTransactionsRequest = new ListTransactionsRequest.Builder(new 
ListTransactionsRequestData()).build()
-val authorizedResponse = 
connectAndReceive[ListTransactionsResponse](listTransactionsRequest)
-assertEquals(Errors.NONE, 
Errors.forCode(authorizedResponse.data.errorCode))
-assertEquals(Set(transactionalId), 
authorizedResponse.data.transactionStates.asScala.map(_.transactionalId).toSet)
+assertListTransactionResult(expectedTransactionalIds = 
Set(transactionalId))
 
 // Now revoke authorization and verify that the transaction is no longer 
listable
 removeAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WildcardHost, WRITE, ALLOW)), transactionalIdResource)
-val unauthorizedResponse = 
connectAndReceive[ListTransactionsResponse](listTransactionsRequest)
-assertEquals(Errors.NONE, 
Errors.forCode(unauthorizedResponse.data.errorCode))
-assertEquals(Set(), 
unauthorizedResponse.data.transactionStates.asScala.map(_.transactionalId).toSet)
+assertListTransactionResult(expectedTransactionalIds = Set())
+
+// The minimum permission needed is `Describe`
+addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WildcardHost, WRITE, ALLOW)), transactionalIdResource)

Review comment:
   Should permission be describe rather than write?
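   
   If so, the fix would presumably just swap the operation in the ACL being 
re-added. A sketch against the helpers already used in this test, where 
`DESCRIBE` is `AclOperation.DESCRIBE`:
   
   ```scala
   // Grant Describe, the stated minimum, instead of re-granting Write.
   addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
     WildcardHost, DESCRIBE, ALLOW)), transactionalIdResource)
   ```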





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah merged pull request #10224: MINOR: Disable transactional/idempotent system tests for Raft quorums

2021-03-02 Thread GitBox


mumrah merged pull request #10224:
URL: https://github.com/apache/kafka/pull/10224


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10237: MINOR: fix failing system test delegation_token_test

2021-03-02 Thread GitBox


mumrah commented on a change in pull request #10237:
URL: https://github.com/apache/kafka/pull/10237#discussion_r585784850



##
File path: tests/kafkatest/services/security/security_config.py
##
@@ -374,7 +374,8 @@ def interbroker_sasl_mechanism(self):
 def enabled_sasl_mechanisms(self):
 sasl_mechanisms = []
 if self.is_sasl(self.security_protocol):
-sasl_mechanisms += [self.client_sasl_mechanism]
+# .csv is supported so be sure to account for that possibility
+sasl_mechanisms += self.client_sasl_mechanism.strip().split(',')

Review comment:
   Not sure if we should bother in this PR, but the usages of 
`client_sasl_mechanism` could stand to be cleaned up. In 
`SecurityConfig.__init__` we default it to a simple string, but as you found 
here (and as seen in `SecurityConfig.client_config`) we support it being a 
comma-delimited string.
   
   
https://github.com/apache/kafka/blob/58b3b1b557e9ba19cffde91bd117a89b947f1fc1/tests/kafkatest/services/security/security_config.py#L240-L253
   
   It's probably a lot safer to declare this as a list in the class and not 
worry about having to do the `split(",")` everywhere. Though maybe there's a 
reason why we split it lazily... not sure.
   
   Either way, this change looks good. However, you might consider doing 
something like:
   
   ```python
   sasl_mechanisms += [mechanism.strip() for mechanism in 
self.client_sasl_mechanism.split(',')]
   ```
   
   since `self.client_sasl_mechanism.strip()` won't catch spaces in the middle 
of the string.
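   
   A quick runnable illustration of that edge case (the values are hypothetical):
   
   ```python
   raw = 'GSSAPI, SCRAM-SHA-256'
   print(raw.strip().split(','))                               # ['GSSAPI', ' SCRAM-SHA-256'], stray space survives
   print([mechanism.strip() for mechanism in raw.split(',')])  # ['GSSAPI', 'SCRAM-SHA-256']
   ```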





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #9717: KAFKA-10766: Unit test cases for RocksDBRangeIterator

2021-03-02 Thread GitBox


guozhangwang commented on pull request #9717:
URL: https://github.com/apache/kafka/pull/9717#issuecomment-789095693


   LGTM. Merged to trunk and cherry-picked to 2.8 cc @vvcephei 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #9717: KAFKA-10766: Unit test cases for RocksDBRangeIterator

2021-03-02 Thread GitBox


guozhangwang merged pull request #9717:
URL: https://github.com/apache/kafka/pull/9717


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore

2021-03-02 Thread GitBox


guozhangwang commented on pull request #10052:
URL: https://github.com/apache/kafka/pull/10052#issuecomment-789094844


   Cherry-picked to 2.8 as well cc @vvcephei 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore

2021-03-02 Thread GitBox


guozhangwang commented on pull request #10052:
URL: https://github.com/apache/kafka/pull/10052#issuecomment-789094199


   LGTM. Thanks for the contribution @vamossagar12 !



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore

2021-03-02 Thread GitBox


guozhangwang merged pull request #10052:
URL: https://github.com/apache/kafka/pull/10052


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 opened a new pull request #10248: MINOR: main function of o.a.k.c.p.t.Type does not show all types

2021-03-02 Thread GitBox


chia7712 opened a new pull request #10248:
URL: https://github.com/apache/kafka/pull/10248


   This PR includes the following changes:
   
   1. rename `UNSIGNED_INT32` to `UINT32` (consistent with `UINT16`)
   1. make sure Type.toHtml shows `UINT16`, `UINT32` and `COMPACT_RECORDS`
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #10234: MINOR; Clean up LeaderAndIsrResponse construction in `ReplicaManager#becomeLeaderOrFollower`

2021-03-02 Thread GitBox


chia7712 commented on a change in pull request #10234:
URL: https://github.com/apache/kafka/pull/10234#discussion_r585723099



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
##
@@ -145,24 +145,22 @@ public LeaderAndIsrResponse getErrorResponse(int throttleTimeMs, Throwable e) {
                     .setErrorCode(error.code()));
             }
             responseData.setPartitionErrors(partitions);
-            return new LeaderAndIsrResponse(responseData, version());
-        }
-
-        List<LeaderAndIsrTopicError> topics = new ArrayList<>(data.topicStates().size());
-        Map<String, Uuid> topicIds = topicIds();
-        for (LeaderAndIsrTopicState topicState : data.topicStates()) {
-            LeaderAndIsrTopicError topicError = new LeaderAndIsrTopicError();
-            topicError.setTopicId(topicIds.get(topicState.topicName()));
-            List<LeaderAndIsrPartitionError> partitions = new ArrayList<>(topicState.partitionStates().size());
-            for (LeaderAndIsrPartitionState partition : topicState.partitionStates()) {
-                partitions.add(new LeaderAndIsrPartitionError()
+        } else {
+            Map<String, Uuid> topicIds = topicIds();
+            for (LeaderAndIsrTopicState topicState : data.topicStates()) {
+                List<LeaderAndIsrPartitionError> partitions = new ArrayList<>(
+                    topicState.partitionStates().size());
+                for (LeaderAndIsrPartitionState partition : topicState.partitionStates()) {
+                    partitions.add(new LeaderAndIsrPartitionError()
                         .setPartitionIndex(partition.partitionIndex())
                         .setErrorCode(error.code()));
+                }
+                responseData.topics().add(new LeaderAndIsrTopicError()
+                    .setTopicId(topicIds.get(topicState.topicName()))

Review comment:
   Could it be replaced by `setTopicId(topicState.topicId())`?
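   
   For reference, a sketch of the suggested simplification, reading the id 
straight off the topic state instead of going through the name-based map (the 
trailing `setPartitionErrors` call is assumed from context, since the hunk 
above is truncated):
   
   ```java
   responseData.topics().add(new LeaderAndIsrTopicError()
       .setTopicId(topicState.topicId())   // direct read; no topicIds() lookup needed
       .setPartitionErrors(partitions));
   ```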





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 closed pull request #9128: KAFKA-7540 reduce session timeout to evict dead member in time and so…

2021-03-02 Thread GitBox


chia7712 closed pull request #9128:
URL: https://github.com/apache/kafka/pull/9128


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 closed pull request #10086: MINOR: expose number of forwarding requests to metrics

2021-03-02 Thread GitBox


chia7712 closed pull request #10086:
URL: https://github.com/apache/kafka/pull/10086


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #10240: KAFKA-12381: only return leader not available for internal topic creation

2021-03-02 Thread GitBox


hachikuji commented on pull request #10240:
URL: https://github.com/apache/kafka/pull/10240#issuecomment-789032940


   Ok, I think I see what is going on now. The failing system test is verifying 
what happens when inter-broker communication no longer works. This results in 
different behavior because `AutoTopicCreationManager` relies on the 
`MetadataCache` in order to determine the number of live brokers while the old 
logic checked zk directly. That makes the `INVALID_REPLICATION_FACTOR` more 
dangerous since it is not retriable and the cache may be stale. In particular, 
when inter-broker communication is down, the cache will be empty and the broker 
will end up trying to auto-create all topics.
   
   I can think of a few options to address the problem:
   
   1. Bring back the old logic to check Zookeeper for the live brokers. This 
might be fine for 2.8, but it does not address the problem for KIP-500.
   2. Return a retriable error instead. Really `UNKNOWN_TOPIC_OR_PARTITION` 
would be a better error in this case.
   3. Make `INVALID_REPLICATION_FACTOR` a retriable error. I guess we have to 
understand how clients would handle that, though.
   
   My inclination is probably option 2. The downside is that the user would no 
longer get a clear error when a topic cannot be auto-created. But I feel 
overall it's the safest and most consistent way to handle this case. There 
might be other options though.
   
   It's interesting to note that this relates back to some of the discussion in 
the auto-create PR itself. We had discussed skipping the replication factor 
check on the broker and sending the request to the controller. But either way, 
we have to rely on the metadata cache locally at least to determine whether the 
topic already exists or not, so it might not have really helped.
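   
   To make option 2 concrete, a rough sketch (illustrative names only, not this 
PR's code) of mapping the possibly-stale-cache failure to a retriable error:
   
   ```scala
   import org.apache.kafka.common.protocol.Errors
   
   // If the metadata cache reports fewer live brokers than the requested
   // replication factor, the cache may simply be stale, so answer with a
   // retriable error rather than the terminal INVALID_REPLICATION_FACTOR.
   def creationError(aliveBrokers: Int, replicationFactor: Int): Errors =
     if (aliveBrokers < replicationFactor) Errors.UNKNOWN_TOPIC_OR_PARTITION
     else Errors.NONE
   ```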
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #10215: KAFKA-12375: don't reuse thread.id until a thread has fully shut down

2021-03-02 Thread GitBox


wcarlson5 commented on a change in pull request #10215:
URL: https://github.com/apache/kafka/pull/10215#discussion_r585710809



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -463,9 +464,8 @@ private void replaceStreamThread(final Throwable throwable) 
{
 closeToError();
 }
 final StreamThread deadThread = (StreamThread) Thread.currentThread();
-threads.remove(deadThread);

Review comment:
   Since the name doesn't matter, I wonder why we spent so much effort 
making sure it had the same name?

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1047,9 +1047,15 @@ private int getNumStreamThreads(final boolean 
hasGlobalTopology) {
 if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs - begin)) {
 log.warn("Thread " + streamThread.getName() + 
" did not shutdown in the allotted time");
 timeout = true;
+// Don't remove from threads until shutdown is 
complete. We will trim it from the
+// list once it reaches DEAD, and if for some 
reason it's hanging indefinitely in the
+// shutdown then we should just consider this 
thread.id to be burned
+} else {
+threads.remove(streamThread);

Review comment:
   I think we can leave it for now; if we see problems, this could be a fix. 
We don't run a single-thread soak, so we won't see this issue ourselves, but 
there are many single-thread applications that could start using this, and we 
should see if they have problems.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



