[GitHub] [kafka] ableegoldman commented on pull request #11124: [WIP] KAFKA-12839: use sessionWindow to represent SlidingWindows

2021-07-27 Thread GitBox


ableegoldman commented on pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#issuecomment-887994503


   It seems super awkward and likely to lead to confusion/future mistakes to 
use something called a `"SessionWindow"` in the "SlidingWindowAggregate", 
although yeah, it's pretty much exactly the same otherwise and can be reused.
   
   In other words, I totally agree with your proposal to just rename the 
existing Window implementations to describe the actual interval they represent, 
rather than some specific type of windowed operation that just happens to use 
them at the moment. (In fact I had written that first paragraph before I even 
saw your comment with the renaming proposal, great minds think alike huh 😜)
   
   That said, those names are just super clunky. Imagine trying to code 
something up with them... it just takes too much mental processing. Maybe it's my 
inner physicist, but sometimes mathematical precision just isn't appropriate 
for real-world usage (don't tell any mathematicians I said that!). Unfortunately, 
I'm not crazy about any of the alternatives I can think up; maybe you can come 
up with some better ideas. Here's the best I could come up with:
   
   `TimeWindow` --> `InclusiveExclusiveWindow` 
   `SessionWindow` / `SlidingWindow` --> `InclusiveInclusiveWindow`
   `UnlimitedWindow` --> `InclusiveUnboundedWindow`
   
   To me at least, these feel more natural, i.e. it's clear what they mean without 
having to reference Wikipedia. I mean, most people probably do know what 
open/closed mean, but inclusive/exclusive is more to the point. Also, I think we 
can drop left/right and just imply it by the ordering. Thoughts? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11124: [WIP] KAFKA-12839: use sessionWindow to represent SlidingWindows

2021-07-27 Thread GitBox


showuon commented on pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#issuecomment-887989639


   @mjsax @ableegoldman, I found we already have a `SessionWindow` that represents 
the closed time interval `[start,end]`, so I use it directly for 
`SlidingWindows` aggregation window creation. I think this is what we want, 
right?
   
   I have another thought, which is to rename the time interval related 
windows. Currently, we have 3 types of time interval window:
   `TimeWindow` -> to have `[start,end)` time interval
   `SessionWindow` -> to have `[start,end]` time interval
   `UnlimitedWindow` -> to have `[start, MAX_VALUE)` time interval
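To make the three boundary conventions concrete, here is a minimal standalone sketch. It does not use Kafka's internal `Window` classes: the subclass names follow the renaming proposed in this comment, while the `TimeInterval` base class, the `contains` method, and `IntervalDemo` are hypothetical helpers added only for illustration.

```java
/** Hypothetical base type: a time interval with a start and an end, in milliseconds. */
abstract class TimeInterval {
    final long start;
    final long end;

    TimeInterval(long start, long end) {
        if (end < start) {
            throw new IllegalArgumentException("end must not be before start");
        }
        this.start = start;
        this.end = end;
    }

    /** Whether a record timestamp falls inside this interval. */
    abstract boolean contains(long timestamp);
}

/** [start, end): the semantics described for TimeWindow. */
final class LeftClosedRightOpenWindow extends TimeInterval {
    LeftClosedRightOpenWindow(long start, long end) {
        super(start, end);
    }

    @Override
    boolean contains(long timestamp) {
        return timestamp >= start && timestamp < end;
    }
}

/** [start, end]: the semantics described for SessionWindow, which sliding windows also need. */
final class ClosedTimeWindow extends TimeInterval {
    ClosedTimeWindow(long start, long end) {
        super(start, end);
    }

    @Override
    boolean contains(long timestamp) {
        return timestamp >= start && timestamp <= end;
    }
}

/** [start, Long.MAX_VALUE): the semantics described for UnlimitedWindow. */
final class LeftClosedWindow extends TimeInterval {
    LeftClosedWindow(long start) {
        super(start, Long.MAX_VALUE);
    }

    @Override
    boolean contains(long timestamp) {
        return timestamp >= start && timestamp < end;
    }
}

class IntervalDemo {
    public static void main(String[] args) {
        // A record stamped exactly at the upper bound is where the conventions differ.
        System.out.println(new LeftClosedRightOpenWindow(100, 200).contains(200)); // false
        System.out.println(new ClosedTimeWindow(100, 200).contains(200));          // true
        System.out.println(new LeftClosedWindow(100).contains(200));               // true
    }
}
```

The only observable difference between the first two classes is a record at exactly `end`, which is why reusing the closed-interval class for sliding windows works even though a session-specific name is misleading there.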
   
   I think the name `SessionWindow` is definitely not good here, especially since we 
now want to use it in `SlidingWindows`, although it was previously only used for 
`SessionWindows`. We should name them after their time-interval meaning, not 
after the stream-processing window function that happens to use them, e.g.:
   `TimeWindow` ->  `LeftClosedRightOpenWindow`
   `SessionWindow` -> `ClosedTimeWindow`
   `UnlimitedWindow` -> `LeftClosedWindow`
   ref: the `Classification of intervals` section in 
https://en.wikipedia.org/wiki/Interval_(mathematics)
   
   Because these 3 window types are for internal use only, it is safe to rename 
them. What do you think?






[GitHub] [kafka] showuon commented on pull request #11086: KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable error

2021-07-27 Thread GitBox


showuon commented on pull request #11086:
URL: https://github.com/apache/kafka/pull/11086#issuecomment-887980048


   @dajac , could you help take a look at this PR? Also, do we want to put this 
PR into v3.0? Thank you.






[GitHub] [kafka] jolshan commented on a change in pull request #9769: KAFKA-10774; Support Describe topic using topic IDs

2021-07-27 Thread GitBox


jolshan commented on a change in pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#discussion_r677941027



##
File path: 
core/src/test/scala/integration/kafka/server/MetadataRequestBetweenDifferentIbpTest.scala
##
@@ -0,0 +1,102 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import kafka.api.{ApiVersion, KAFKA_2_8_IV0, KAFKA_3_0_IV1}
+import kafka.network.SocketServer
+import kafka.utils.TestUtils
+import kafka.zk.ZkVersion
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.message.MetadataRequestData
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+import scala.collection.{Map, Seq}
+
+class MetadataRequestBetweenDifferentIbpTest extends BaseRequestTest {
+
+  override def brokerCount: Int = 3
+  override def generateConfigs: Seq[KafkaConfig] = {
+    Seq(
+      createConfig(0, KAFKA_2_8_IV0),
+      createConfig(1, KAFKA_3_0_IV1),

Review comment:
   I think this was discussed briefly before, but is there a reason 
KAFKA_3_0_IV1 was chosen? Should we just use the most recent IBP? (That is, 
not specify it in the properties at all and just pick up the default?)








[GitHub] [kafka] jolshan commented on a change in pull request #9769: KAFKA-10774; Support Describe topic using topic IDs

2021-07-27 Thread GitBox


jolshan commented on a change in pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#discussion_r677933885



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
##
@@ -32,28 +34,87 @@
  */
 @InterfaceStability.Evolving
 public class DescribeTopicsResult {
-    private final Map<String, KafkaFuture<TopicDescription>> futures;
+    private final Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures;
+    private final Map<String, KafkaFuture<TopicDescription>> nameFutures;
 
-    protected DescribeTopicsResult(Map<String, KafkaFuture<TopicDescription>> futures) {
-        this.futures = futures;
+    protected DescribeTopicsResult(Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures, Map<String, KafkaFuture<TopicDescription>> nameFutures) {
+        if (topicIdFutures != null && nameFutures != null)
+            throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be specified.");
+        if (topicIdFutures == null && nameFutures == null)
+            throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be null.");
+        this.topicIdFutures = topicIdFutures;
+        this.nameFutures = nameFutures;
+    }
+
+    static DescribeTopicsResult ofTopicIds(Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures) {
+        return new DescribeTopicsResult(topicIdFutures, null);
+    }
+
+    static DescribeTopicsResult ofTopicNames(Map<String, KafkaFuture<TopicDescription>> nameFutures) {
+        return new DescribeTopicsResult(null, nameFutures);
+    }
+
+    /**
+     * Use when {@link Admin#describeTopics(TopicCollection, DescribeTopicsOptions)} used a TopicIdCollection
+     *
+     * @return a map from topic IDs to futures which can be used to check the status of
+     *         individual topics if the request used topic IDs, otherwise return null.
+     */
+    public Map<Uuid, KafkaFuture<TopicDescription>> topicIdValues() {
+        return topicIdFutures;
+    }
+
+    /**
+     * Use when {@link Admin#describeTopics(TopicCollection, DescribeTopicsOptions)} used a TopicNameCollection
+     *
+     * @return a map from topic names to futures which can be used to check the status of
+     *         individual topics if the request used topic names, otherwise return null.
+     */
+    public Map<String, KafkaFuture<TopicDescription>> topicNameValues() {
+        return nameFutures;
     }
 
     /**
      * Return a map from topic names to futures which can be used to check the status of
      * individual topics.
      */
+    @Deprecated
     public Map<String, KafkaFuture<TopicDescription>> values() {
-        return futures;
+        return nameFutures;
     }
 
     /**
      * Return a future which succeeds only if all the topic descriptions succeed.
      */
+    @Deprecated
     public KafkaFuture<Map<String, TopicDescription>> all() {
-        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
+        return all(nameFutures);
+    }
+
+    /**
+     * Return a future which succeeds only if all the topic descriptions succeed.
+     */
+    public KafkaFuture<Map<String, TopicDescription>> allTopicNames() {
+        return all(nameFutures);
+    }
+
+    /**
+     * Return a future which succeeds only if all the topic descriptions succeed.
+     */
+    public KafkaFuture<Map<Uuid, TopicDescription>> allTopicIds() {
+        return all(topicIdFutures);
+    }
+
+    /**
+     * Return a future which succeeds only if all the topic descriptions succeed.
+     */
+    private static <T> KafkaFuture<Map<T, TopicDescription>> all(Map<T, KafkaFuture<TopicDescription>> futures) {
+

Review comment:
   nit: extra newline here
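The hunk cuts off at the start of the generic `all` helper. As a hedged sketch of what such a helper typically does (not the PR's implementation), the version below uses `java.util.concurrent.CompletableFuture` in place of `KafkaFuture` and `java.util.UUID` in place of Kafka's `Uuid`, so it runs without Kafka on the classpath; `DescribeAllSketch` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

class DescribeAllSketch {
    /**
     * Combines a map of per-key futures into one future of a map.
     * The result fails if any input future fails ("succeeds only if all succeed").
     * Generic in K so one helper can serve both topic-name keys and topic-ID keys.
     */
    static <K, V> CompletableFuture<Map<K, V>> all(Map<K, CompletableFuture<V>> futures) {
        CompletableFuture<Void> allDone =
            CompletableFuture.allOf(futures.values().toArray(new CompletableFuture<?>[0]));
        return allDone.thenApply(ignored -> {
            Map<K, V> results = new HashMap<>();
            // Every input has completed normally at this point, so join() does not block.
            futures.forEach((key, future) -> results.put(key, future.join()));
            return results;
        });
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<Integer>> byName = new HashMap<>();
        byName.put("orders", CompletableFuture.completedFuture(3));
        byName.put("payments", CompletableFuture.completedFuture(6));
        System.out.println(all(byName).join().get("payments")); // 6

        Map<UUID, CompletableFuture<Integer>> byId = new HashMap<>();
        byId.put(UUID.randomUUID(), CompletableFuture.completedFuture(1));
        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("unknown topic ID"));
        byId.put(UUID.randomUUID(), failed);
        System.out.println(all(byId).isCompletedExceptionally()); // true
    }
}
```

Keeping the helper generic in the key type is what lets `allTopicNames()` and `allTopicIds()` in the diff share a single implementation.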








[GitHub] [kafka] jolshan commented on a change in pull request #9769: KAFKA-10774; Support Describe topic using topic IDs

2021-07-27 Thread GitBox


jolshan commented on a change in pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#discussion_r677933885



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
##
@@ -32,28 +34,87 @@
  */
 @InterfaceStability.Evolving
 public class DescribeTopicsResult {
-    private final Map<String, KafkaFuture<TopicDescription>> futures;
+    private final Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures;
+    private final Map<String, KafkaFuture<TopicDescription>> nameFutures;
 
-    protected DescribeTopicsResult(Map<String, KafkaFuture<TopicDescription>> futures) {
-        this.futures = futures;
+    protected DescribeTopicsResult(Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures, Map<String, KafkaFuture<TopicDescription>> nameFutures) {
+        if (topicIdFutures != null && nameFutures != null)
+            throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be specified.");
+        if (topicIdFutures == null && nameFutures == null)
+            throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be null.");
+        this.topicIdFutures = topicIdFutures;
+        this.nameFutures = nameFutures;
+    }
+
+    static DescribeTopicsResult ofTopicIds(Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures) {
+        return new DescribeTopicsResult(topicIdFutures, null);
+    }
+
+    static DescribeTopicsResult ofTopicNames(Map<String, KafkaFuture<TopicDescription>> nameFutures) {
+        return new DescribeTopicsResult(null, nameFutures);
+    }
+
+    /**
+     * Use when {@link Admin#describeTopics(TopicCollection, DescribeTopicsOptions)} used a TopicIdCollection
+     *
+     * @return a map from topic IDs to futures which can be used to check the status of
+     *         individual topics if the request used topic IDs, otherwise return null.
+     */
+    public Map<Uuid, KafkaFuture<TopicDescription>> topicIdValues() {
+        return topicIdFutures;
+    }
+
+    /**
+     * Use when {@link Admin#describeTopics(TopicCollection, DescribeTopicsOptions)} used a TopicNameCollection
+     *
+     * @return a map from topic names to futures which can be used to check the status of
+     *         individual topics if the request used topic names, otherwise return null.
+     */
+    public Map<String, KafkaFuture<TopicDescription>> topicNameValues() {
+        return nameFutures;
     }
 
     /**
      * Return a map from topic names to futures which can be used to check the status of
      * individual topics.
      */
+    @Deprecated
     public Map<String, KafkaFuture<TopicDescription>> values() {
-        return futures;
+        return nameFutures;
     }
 
     /**
      * Return a future which succeeds only if all the topic descriptions succeed.
      */
+    @Deprecated
     public KafkaFuture<Map<String, TopicDescription>> all() {
-        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
+        return all(nameFutures);
+    }
+
+    /**
+     * Return a future which succeeds only if all the topic descriptions succeed.
+     */
+    public KafkaFuture<Map<String, TopicDescription>> allTopicNames() {
+        return all(nameFutures);
+    }
+
+    /**
+     * Return a future which succeeds only if all the topic descriptions succeed.
+     */
+    public KafkaFuture<Map<Uuid, TopicDescription>> allTopicIds() {
+        return all(topicIdFutures);
+    }
+
+    /**
+     * Return a future which succeeds only if all the topic descriptions succeed.
+     */
+    private static <T> KafkaFuture<Map<T, TopicDescription>> all(Map<T, KafkaFuture<TopicDescription>> futures) {
+

Review comment:
   nit: extra space here








[GitHub] [kafka] jolshan commented on a change in pull request #9769: KAFKA-10774; Support Describe topic using topic IDs

2021-07-27 Thread GitBox


jolshan commented on a change in pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#discussion_r677932954



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##
@@ -303,7 +303,33 @@ default DescribeTopicsResult describeTopics(Collection<String> topicNames) {
      * @param options    The options to use when describing the topic.
      * @return The DescribeTopicsResult.
      */
-    DescribeTopicsResult describeTopics(Collection<String> topicNames, DescribeTopicsOptions options);
+    default DescribeTopicsResult describeTopics(Collection<String> topicNames, DescribeTopicsOptions options) {
+        return describeTopics(TopicCollection.ofTopicNames(topicNames), options);
+    }
+
+    /**
+     * This is a convenience method for {@link #describeTopics(TopicCollection, DescribeTopicsOptions)}
+     * with default options. See the overload for more details.
+     *
+     * When using topic IDs, this operation is supported by brokers with version 3.1.0 or higher.
+     *
+     * @param topics The topics to describe.
+     * @return The DescribeTopicsResult.
+     */
+    default DescribeTopicsResult describeTopics(TopicCollection topics) {
+        return describeTopics(topics, new DescribeTopicsOptions());
+    }
+
+    /**
+     * describe a batch of topics.

Review comment:
   Can we adjust this javadoc to be `Describe some topics in the cluster.` 
like the previous API?








[GitHub] [kafka] jolshan commented on a change in pull request #9769: KAFKA-10774; Support Describe topic using topic IDs

2021-07-27 Thread GitBox


jolshan commented on a change in pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#discussion_r677932361



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##
@@ -42,6 +34,14 @@
 import org.apache.kafka.common.quota.ClientQuotaFilter;
 import org.apache.kafka.common.requests.LeaveGroupResponse;
 
+import java.time.Duration;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;

Review comment:
   nit: is there a reason we moved these imports?








[GitHub] [kafka] jolshan commented on pull request #9769: KAFKA-10774; Support Describe topic using topic IDs

2021-07-27 Thread GitBox


jolshan commented on pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#issuecomment-887966371


   @dengziming -- @rajinisivaram can help in about 2 weeks. I will try to help 
review so it will be ready when she gets to it.






[GitHub] [kafka] dengziming commented on pull request #9769: KAFKA-10774; Support Describe topic using topic IDs

2021-07-27 Thread GitBox


dengziming commented on pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#issuecomment-887949936


   @rajinisivaram Are you interested in taking this across the finish line? I think 
this is very close to a first version; we can try to land 
that and then improve it, since this has been in progress for a long time. Thank you!






[GitHub] [kafka] hachikuji merged pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size

2021-07-27 Thread GitBox


hachikuji merged pull request #11098:
URL: https://github.com/apache/kafka/pull/11098


   






[GitHub] [kafka] hachikuji commented on pull request #11116: KAFKA-13114: Revert state and reregister raft listener

2021-07-27 Thread GitBox


hachikuji commented on pull request #11116:
URL: https://github.com/apache/kafka/pull/11116#issuecomment-887935935


   I'm trying to think of some approach for validating this logic. It is 
difficult because it is handling unexpected exceptions. One thought I had is 
implementing a poison message of some kind which could expire after some TTL. 
When the controller sees the poison message, it would check if it is still 
active and raise an exception accordingly. Something like that could be used in 
an integration test, which might be simpler than trying to induce a failure by 
mucking with internal state.
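A minimal sketch of the poison-message idea, under stated assumptions: `PoisonRecordSketch`, `PoisonRecord`, and `apply` are hypothetical names, not Kafka or KRaft APIs, and the current time is passed in explicitly so a test controls it. While the record is within its TTL, applying it throws (exercising the unexpected-exception path); once it has expired, replaying it is a no-op, so the node can recover.

```java
class PoisonRecordSketch {
    /** Hypothetical test-only record that forces a failure until its TTL elapses. */
    static final class PoisonRecord {
        final long createdAtMs;
        final long ttlMs;

        PoisonRecord(long createdAtMs, long ttlMs) {
            this.createdAtMs = createdAtMs;
            this.ttlMs = ttlMs;
        }

        boolean isActive(long nowMs) {
            return nowMs - createdAtMs < ttlMs;
        }
    }

    /**
     * Hypothetical stand-in for the controller's record-handling hook.
     * Returns true when the record is applied normally; throws while a poison record is active.
     */
    static boolean apply(Object record, long nowMs) {
        if (record instanceof PoisonRecord && ((PoisonRecord) record).isActive(nowMs)) {
            throw new IllegalStateException("injected failure from active poison record");
        }
        // Normal records, and poison records that have expired, fall through here.
        return true;
    }

    public static void main(String[] args) {
        PoisonRecord poison = new PoisonRecord(1_000L, 500L);
        try {
            apply(poison, 1_200L);
        } catch (IllegalStateException e) {
            System.out.println("failed as expected: " + e.getMessage());
        }
        System.out.println(apply(poison, 1_600L)); // true: expired, so replay succeeds
    }
}
```

Passing the clock in explicitly keeps the TTL check deterministic, which matters if something like this is used in an integration test rather than by mucking with internal state.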
   
   Another idea is to corrupt the log on one of the nodes, but I'm not sure 
this would hit the right path. In fact, this is probably a gap at the moment. 
If the batch reader fails during iteration, we should probably resign and 
perhaps even fail. I'll file a separate JIRA for this.
   
   In any case, I think we should try to come up with some way to exercise this 
path. Otherwise it's hard to say if it even works (though it looks reasonable 
enough).






[jira] [Updated] (KAFKA-13142) KRaft brokers do not validate dynamic configs before forwarding them to controller

2021-07-27 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-13142:
---
Fix Version/s: 3.0.0

> KRaft brokers do not validate dynamic configs before forwarding them to 
> controller
> --
>
> Key: KAFKA-13142
> URL: https://issues.apache.org/jira/browse/KAFKA-13142
> Project: Kafka
>  Issue Type: Task
>  Components: kraft
>Affects Versions: 3.0.0
>Reporter: Ryan Dielhenn
>Assignee: Ryan Dielhenn
>Priority: Blocker
> Fix For: 3.0.0
>
>
> The KRaft brokers are not currently validating dynamic configs before 
> forwarding them to the controller. To ensure that KRaft clusters are easily 
> upgradable it would be a good idea to validate dynamic configs in the first 
> release of KRaft so that invalid dynamic configs are never stored.
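As a rough illustration of validating before forwarding, the sketch below checks each proposed dynamic config against a table of validators and forwards the update only when every entry passes, so nothing invalid reaches the controller. The config keys, validation rules, and the `forward` callback are hypothetical placeholders, not Kafka's actual dynamic-config definitions or broker code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

class DynamicConfigValidationSketch {
    // Hypothetical dynamic configs and their validation rules.
    private static final Map<String, Predicate<String>> VALIDATORS = new HashMap<>();

    static {
        VALIDATORS.put("example.threads", v -> isLongAtLeast(v, 1));
        VALIDATORS.put("example.retention.ms", v -> isLongAtLeast(v, -1));
    }

    static boolean isLongAtLeast(String value, long min) {
        if (value == null) {
            return false;
        }
        try {
            return Long.parseLong(value.trim()) >= min;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    /** Returns every problem found; an empty list means the update is safe to forward. */
    static List<String> validate(Map<String, String> proposed) {
        List<String> errors = new ArrayList<>();
        for (Map.Entry<String, String> entry : proposed.entrySet()) {
            Predicate<String> rule = VALIDATORS.get(entry.getKey());
            if (rule == null) {
                errors.add("unknown or non-dynamic config: " + entry.getKey());
            } else if (!rule.test(entry.getValue())) {
                errors.add("invalid value for " + entry.getKey() + ": " + entry.getValue());
            }
        }
        return errors;
    }

    /** Forwards only if the whole update is valid; otherwise the caller reports the errors. */
    static boolean forwardIfValid(Map<String, String> proposed, Consumer<Map<String, String>> forward) {
        if (!validate(proposed).isEmpty()) {
            return false;
        }
        forward.accept(proposed);
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> good = new HashMap<>();
        good.put("example.threads", "8");
        System.out.println(forwardIfValid(good, update -> System.out.println("forwarding " + update)));

        Map<String, String> bad = new HashMap<>();
        bad.put("example.threads", "zero");
        System.out.println(validate(bad)); // [invalid value for example.threads: zero]
    }
}
```

Rejecting the whole update when any entry fails, rather than forwarding the valid subset, matches the goal that invalid dynamic configs are never stored.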



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9897) Flaky Test StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores

2021-07-27 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17388378#comment-17388378
 ] 

Matthias J. Sax commented on KAFKA-9897:


Works for me.

> Flaky Test StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores
> -
>
> Key: KAFKA-9897
> URL: https://issues.apache.org/jira/browse/KAFKA-9897
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.6.0
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/22/testReport/junit/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/shouldQuerySpecificActivePartitionStores/]
> {quote}org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get 
> state store source-table because the stream thread is PARTITIONS_ASSIGNED, 
> not RUNNING at 
> org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:85)
>  at 
> org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:61)
>  at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1183) at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores(StoreQueryIntegrationTest.java:178){quote}





[GitHub] [kafka] hachikuji commented on pull request #11131: KAFKA-13137: KRaft Controller Metric MBean names incorrectly quoted

2021-07-27 Thread GitBox


hachikuji commented on pull request #11131:
URL: https://github.com/apache/kafka/pull/11131#issuecomment-887924795


   @rondagostino I've seen this test failing consistently: 
`kafka.controller.ControllerEventManagerTest.testMetricsCleanedOnClose()`. I 
tried it locally and it passes when run by itself. I suspect that now 
that the JMX names have been fixed, there is some kind of registration conflict. 
Perhaps the new tests, or some other test involving the KRaft controller, are 
leaving behind the MBeans. As we noted above, the de-registration logic has not 
been implemented for these metrics.
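The suspected failure mode can be reproduced in isolation with the standard JMX API: if one test registers an MBean and never unregisters it, the next registration under the same `ObjectName` throws `InstanceAlreadyExistsException`. This is a hedged sketch, not Kafka code; `MBeanLeakSketch`, the `GaugeMBean` interface, and the object name are hypothetical, and it uses a private `MBeanServer` from `MBeanServerFactory.newMBeanServer()` rather than the platform server so it cannot interfere with other tests.

```java
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;
import javax.management.StandardMBean;

class MBeanLeakSketch {
    /** Hypothetical metric interface; StandardMBean requires a public interface. */
    public interface GaugeMBean {
        long getValue();
    }

    static final class Gauge implements GaugeMBean {
        @Override
        public long getValue() {
            return 1L;
        }
    }

    static ObjectName objectName(String name) {
        try {
            return new ObjectName(name);
        } catch (JMException e) {
            throw new IllegalArgumentException(e);
        }
    }

    /** Returns false when the name is still held by an MBean that was never unregistered. */
    static boolean tryRegister(MBeanServer server, ObjectName name) {
        try {
            server.registerMBean(new StandardMBean(new Gauge(), GaugeMBean.class), name);
            return true;
        } catch (InstanceAlreadyExistsException e) {
            return false;
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    /** What a close() method should do for every metric it registered. */
    static void unregister(MBeanServer server, ObjectName name) {
        try {
            server.unregisterMBean(name);
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        MBeanServer server = MBeanServerFactory.newMBeanServer();
        ObjectName name = objectName("demo.controller:type=DemoController,name=ExampleGauge");
        System.out.println(tryRegister(server, name)); // true: first "test" registers the metric
        System.out.println(tryRegister(server, name)); // false: it was never cleaned up
        unregister(server, name);
        System.out.println(tryRegister(server, name)); // true: cleanup frees the name
    }
}
```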






[GitHub] [kafka] niket-goel commented on pull request #11135: KAFKA-13143 Remove HandleMetadata from ControllerAPis as metadata is not completely implemented on KRaft controllers

2021-07-27 Thread GitBox


niket-goel commented on pull request #11135:
URL: https://github.com/apache/kafka/pull/11135#issuecomment-887915977


   @jsancio 
   Without the change (as per Jason's description in the JIRA 
https://issues.apache.org/jira/browse/KAFKA-13143), the API would return an 
empty list of topics making the user think that they had no topics.
   
   With the change the describe and list topic APIs timeout on the controller 
endpoint (same as create_topic).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13142) KRaft brokers do not validate dynamic configs before forwarding them to controller

2021-07-27 Thread Ryan Dielhenn (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Dielhenn updated KAFKA-13142:
--
Priority: Blocker  (was: Major)

> KRaft brokers do not validate dynamic configs before forwarding them to 
> controller
> --
>
> Key: KAFKA-13142
> URL: https://issues.apache.org/jira/browse/KAFKA-13142
> Project: Kafka
>  Issue Type: Task
>  Components: kraft
>Affects Versions: 3.0.0
>Reporter: Ryan Dielhenn
>Assignee: Ryan Dielhenn
>Priority: Blocker
>
> The KRaft brokers are not currently validating dynamic configs before 
> forwarding them to the controller. To ensure that KRaft clusters are easily 
> upgradable it would be a good idea to validate dynamic configs in the first 
> release of KRaft so that invalid dynamic configs are never stored.





[jira] [Updated] (KAFKA-13142) KRaft brokers do not validate dynamic configs before forwarding them to controller

2021-07-27 Thread Ryan Dielhenn (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Dielhenn updated KAFKA-13142:
--
Priority: Major  (was: Blocker)

> KRaft brokers do not validate dynamic configs before forwarding them to 
> controller
> --
>
> Key: KAFKA-13142
> URL: https://issues.apache.org/jira/browse/KAFKA-13142
> Project: Kafka
>  Issue Type: Task
>  Components: kraft
>Affects Versions: 3.0.0
>Reporter: Ryan Dielhenn
>Assignee: Ryan Dielhenn
>Priority: Major
>
> The KRaft brokers are not currently validating dynamic configs before 
> forwarding them to the controller. To ensure that KRaft clusters are easily 
> upgradable it would be a good idea to validate dynamic configs in the first 
> release of KRaft so that invalid dynamic configs are never stored.





[jira] [Commented] (KAFKA-10548) Implement deletion logic for LeaderAndIsrRequests

2021-07-27 Thread Lucas Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17388367#comment-17388367
 ] 

Lucas Wang commented on KAFKA-10548:


Hi [~jolshan], thanks for KIP-516 and for keeping track of the tasks. At 
LinkedIn, we are very interested in adopting the improved topic deletion.

Have you started or planned to work on this ticket? If not, do you mind if I take 
it over?

> Implement deletion logic for LeaderAndIsrRequests
> -
>
> Key: KAFKA-10548
> URL: https://issues.apache.org/jira/browse/KAFKA-10548
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Priority: Major
>
> This will allow for specialized deletion logic when receiving 
> LeaderAndIsrRequests.
> It will also create and use the delete.stale.topic.delay.ms configuration option.





[jira] [Updated] (KAFKA-13142) KRaft brokers do not validate dynamic configs before forwarding them to controller

2021-07-27 Thread Ryan Dielhenn (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Dielhenn updated KAFKA-13142:
--
Description: The KRaft brokers are not currently validating dynamic configs 
before forwarding them to the controller. To ensure that KRaft clusters are 
easily upgradable it would be a good idea to validate dynamic configs in the 
first release of KRaft so that invalid dynamic configs are never stored.  (was: 
The KRaft controller is not currently validating dynamic configs. To ensure 
that KRaft clusters are easily upgradable it would be a good idea to validate 
dynamic configs in the first release of KRaft so that invalid dynamic configs 
are never stored.)

> KRaft brokers do not validate dynamic configs before forwarding them to 
> controller
> --
>
> Key: KAFKA-13142
> URL: https://issues.apache.org/jira/browse/KAFKA-13142
> Project: Kafka
>  Issue Type: Task
>  Components: kraft
>Affects Versions: 3.0.0
>Reporter: Ryan Dielhenn
>Assignee: Ryan Dielhenn
>Priority: Blocker
>
> The KRaft brokers are not currently validating dynamic configs before 
> forwarding them to the controller. To ensure that KRaft clusters are easily 
> upgradable it would be a good idea to validate dynamic configs in the first 
> release of KRaft so that invalid dynamic configs are never stored.





[jira] [Updated] (KAFKA-13142) KRaft brokers do not validate dynamic configs before forwarding them to controller

2021-07-27 Thread Ryan Dielhenn (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Dielhenn updated KAFKA-13142:
--
Summary: KRaft brokers do not validate dynamic configs before forwarding 
them to controller  (was: KRaft controller does not validate dynamic configs)

> KRaft brokers do not validate dynamic configs before forwarding them to 
> controller
> --
>
> Key: KAFKA-13142
> URL: https://issues.apache.org/jira/browse/KAFKA-13142
> Project: Kafka
>  Issue Type: Task
>  Components: kraft
>Affects Versions: 3.0.0
>Reporter: Ryan Dielhenn
>Assignee: Ryan Dielhenn
>Priority: Blocker
>
> The KRaft controller is not currently validating dynamic configs. To ensure 
> that KRaft clusters are easily upgradable it would be a good idea to validate 
> dynamic configs in the first release of KRaft so that invalid dynamic configs 
> are never stored.





[GitHub] [kafka] niket-goel commented on a change in pull request #11135: KAFKA-13143 Remove HandleMetadata from ControllerAPis as metadata is not completely implemented on KRaft controllers

2021-07-27 Thread GitBox


niket-goel commented on a change in pull request #11135:
URL: https://github.com/apache/kafka/pull/11135#discussion_r677871792



##
File path: core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
##
@@ -145,6 +145,14 @@ class ControllerApisTest {
 authorizer
   }
 
+  @Test
+  def testHanldleMetadata(): Unit = {
+    val caught = assertThrows(classOf[ApiException], () => createControllerApis(
+      None, new MockController.Builder().build()).

Review comment:
   Was chasing a phantom. This was not an issue. Incorrect implementation 
on my part. 








[GitHub] [kafka] jsancio commented on a change in pull request #11134: KAFKA-12851: Fix Raft partition simulation

2021-07-27 Thread GitBox


jsancio commented on a change in pull request #11134:
URL: https://github.com/apache/kafka/pull/11134#discussion_r677868867



##
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##
@@ -171,6 +171,12 @@ private boolean updateHighWatermark() {
 || (highWatermarkUpdateOffset == 
currentHighWatermarkMetadata.offset &&
 
!highWatermarkUpdateMetadata.metadata.equals(currentHighWatermarkMetadata.metadata)))
 {
 highWatermark = highWatermarkUpdateOpt;
+log.debug(

Review comment:
   Yes. Updated the PR to use trace.








[GitHub] [kafka] guozhangwang commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-27 Thread GitBox


guozhangwang commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r677787551



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -859,27 +855,27 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 ClientMetrics.addVersionMetric(streamsMetrics);
 ClientMetrics.addCommitIdMetric(streamsMetrics);
 ClientMetrics.addApplicationIdMetric(streamsMetrics, 
config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
-ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, 
internalTopologyBuilder.describe().toString());
+ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, 
this.topologyMetadata.topologyDescriptionString());
 ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> 
state);
 ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, 
(metricsConfig, now) -> getNumLiveStreamThreads());
 
 streamsMetadataState = new StreamsMetadataState(
-internalTopologyBuilder,
+this.topologyMetadata,

Review comment:
   nit: seems misaligned.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -16,10 +16,304 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+//  1) synchronize on these methods instead of individual 
InternalTopologyBuilder methods, where applicable
 
 public class TopologyMetadata {
-//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR 
#10683)
+private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+// the "__" (double underscore) string is not allowed for topology names, 
so it's safe to use to indicate
+// that it's not a named topology
+private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
+
+private final StreamsConfig config;
+private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
+
+private ProcessorTopology globalTopology;
+private Map<String, StateStore> globalStateStores = new HashMap<>();
+final Set<String> allInputTopics = new HashSet<>();
+
+public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+this.config = config;
+builders = new TreeMap<>();
+if (builder.hasNamedTopology()) {
+builders.put(builder.topologyName(), builder);
+} else {
+builders.put(UNNAMED_TOPOLOGY, builder);
+}
+}
+
+public TopologyMetadata(final SortedMap<String, InternalTopologyBuilder> builders, final StreamsConfig config) {
+this.config = config;
+this.builders = builders;
+if (builders.isEmpty()) {
+log.debug("Building KafkaStreams app with no empty topology");
+}
+}
+
+public int getNumStreamThreads(final StreamsConfig config) {
+final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+
+// If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
+if (builders.isEmpty()) {
+if (configuredNumStreamThreads != 0) {
+log.info("Overriding number of StreamThreads to zero for empty 
topology");
+}
+return 0;
+}
+
+// If there are named topologies but some are empty, this indicates a 
bug in user code
+if (hasNamedTopologies()) {
+if (hasNoNonGlobalTopology() && !hasGlobalTopology()) {

Review comment:
   I kept getting myself confused by the `NoNonGlobal` haha (it got me the 
first time reviewing this, and then again on the third pass) :P In hindsight, 
maybe we should have introduced the term "local topology" in the first place.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
##
@@ -93,24 +95,25 @@ public void init(final ProcessorCont

[GitHub] [kafka] jolshan commented on a change in pull request #11126: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0

2021-07-27 Thread GitBox


jolshan commented on a change in pull request #11126:
URL: https://github.com/apache/kafka/pull/11126#discussion_r677854946



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1367,12 +1367,25 @@ class ReplicaManager(val config: KafkaConfig,
   val currentLeaderEpoch = partition.getLeaderEpoch
   val requestLeaderEpoch = partitionState.leaderEpoch
   val requestTopicId = topicIdFromRequest(topicPartition.topic)
+  val logTopicId = partition.topicId
 
-  if (!hasConsistentTopicId(requestTopicId, partition.topicId)) {
-stateChangeLogger.error(s"Topic ID in memory: 
${partition.topicId.get} does not" +
+  // When running a ZK controller and upgrading to topic IDs we 
may receive a request with leader epoch
+  // that is equal to the current leader epoch. In this case, we 
want to assign topic ID to the log.
+  def isUpgradingToTopicIdWithExistingLog: Boolean = {
+requestLeaderEpoch == currentLeaderEpoch &&

Review comment:
   Ah yeah. That would work. I felt it was a bit odd too, but didn't think 
of this alternative. This placement is better.








[GitHub] [kafka] hachikuji commented on a change in pull request #11126: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0

2021-07-27 Thread GitBox


hachikuji commented on a change in pull request #11126:
URL: https://github.com/apache/kafka/pull/11126#discussion_r677853531



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1367,12 +1367,25 @@ class ReplicaManager(val config: KafkaConfig,
   val currentLeaderEpoch = partition.getLeaderEpoch
   val requestLeaderEpoch = partitionState.leaderEpoch
   val requestTopicId = topicIdFromRequest(topicPartition.topic)
+  val logTopicId = partition.topicId
 
-  if (!hasConsistentTopicId(requestTopicId, partition.topicId)) {
-stateChangeLogger.error(s"Topic ID in memory: 
${partition.topicId.get} does not" +
+  // When running a ZK controller and upgrading to topic IDs we 
may receive a request with leader epoch
+  // that is equal to the current leader epoch. In this case, we 
want to assign topic ID to the log.
+  def isUpgradingToTopicIdWithExistingLog: Boolean = {
+requestLeaderEpoch == currentLeaderEpoch &&

Review comment:
   Might be just me, but I find it a little awkward to see the epoch check 
nested here given the other checks below. Would it be reasonable instead to 
move this to after the inequality checks?
   ```scala
   else if (requestLeaderEpoch < currentLeaderEpoch) {
   ...
   } else {
     val error = requestTopicId match {
       case Some(topicId) if logTopicId.isEmpty =>
         // The controller may send LeaderAndIsr to update topicId without bumping the epoch
         log.assignTopicId(topicId)
         stateChangeLogger.info(s"Updating topicId for $log to $topicId from LeaderAndIsr request")
         Errors.NONE
       case _ =>
         stateChangeLogger.info(s"Ignoring LeaderAndIsr request from " +
           s"controller $controllerId with correlation id $correlationId " +
           s"epoch $controllerEpoch for partition $topicPartition since its associated " +
           s"leader epoch $requestLeaderEpoch matches the current leader epoch")
         Errors.STALE_CONTROLLER_EPOCH
     }
     responseMap.put(topicPartition, error)
   }
   ```
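The reordered control flow can be modeled as a small decision function. This is a simplified Java sketch, not the Scala code in `ReplicaManager`. `LeaderAndIsrDecision` and its `Outcome` values are hypothetical. The first branch assumes topic IDs count as consistent unless both sides are present and differ.

```java
import java.util.Optional;
import java.util.UUID;

// Hypothetical, simplified model of the suggested control flow for one
// partition in a LeaderAndIsr request. Names are illustrative only.
final class LeaderAndIsrDecision {
    enum Outcome {
        INCONSISTENT_TOPIC_ID, // request and log carry different topic IDs
        APPLY_STATE_CHANGE,    // newer leader epoch: become leader/follower
        IGNORE_STALE_EPOCH,    // older leader epoch
        ASSIGN_TOPIC_ID,       // same epoch, log has no topic ID yet (upgrade)
        IGNORE_SAME_EPOCH      // same epoch, nothing to do
    }

    static Outcome decide(int requestLeaderEpoch, int currentLeaderEpoch,
                          Optional<UUID> requestTopicId, Optional<UUID> logTopicId) {
        boolean inconsistent = requestTopicId.isPresent() && logTopicId.isPresent()
                && !requestTopicId.get().equals(logTopicId.get());
        if (inconsistent) {
            return Outcome.INCONSISTENT_TOPIC_ID;
        } else if (requestLeaderEpoch > currentLeaderEpoch) {
            return Outcome.APPLY_STATE_CHANGE;
        } else if (requestLeaderEpoch < currentLeaderEpoch) {
            return Outcome.IGNORE_STALE_EPOCH;
        } else if (requestTopicId.isPresent() && !logTopicId.isPresent()) {
            // The controller may send LeaderAndIsr to set the topic ID without bumping the epoch.
            return Outcome.ASSIGN_TOPIC_ID;
        } else {
            return Outcome.IGNORE_SAME_EPOCH;
        }
    }
}
```

Keeping the epoch comparison flat, with the same-epoch case last, means the topic ID assignment is only reachable once the newer and older epoch cases have been ruled out.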








[GitHub] [kafka] hachikuji commented on a change in pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size

2021-07-27 Thread GitBox


hachikuji commented on a change in pull request #11098:
URL: https://github.com/apache/kafka/pull/11098#discussion_r677847584



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##
@@ -161,22 +162,26 @@ class TransactionStateManager(brokerId: Int,
 maxBatchSize
   )
 
-  partitionCacheEntry.metadataPerTransactionalId.foreachWhile { 
(transactionalId, txnMetadata) =>
+  var breakIteration = false

Review comment:
   I tried to consolidate this logic into a single loop, which I think is 
what you are suggesting. Let me know if the latest commit is what you had in 
mind.
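One way to respect the max batch size in a single pass is to add expired transactional IDs to the current batch and start a new batch whenever the next record would overflow it. This is a hedged sketch. `ExpirationBatcher`, the per-record overhead, and using the ID's length as its size are assumptions, not how `TransactionStateManager` measures records.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: split expired transactional IDs into batches whose
// estimated size stays within maxBatchSize, using a single loop.
final class ExpirationBatcher {
    static List<List<String>> batch(List<String> expiredIds, int maxBatchSize, int perRecordOverhead) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int currentSize = 0;
        for (String id : expiredIds) {
            // Crude size estimate (assumption): fixed overhead plus the ID length.
            int recordSize = perRecordOverhead + id.length();
            if (!current.isEmpty() && currentSize + recordSize > maxBatchSize) {
                batches.add(current); // close the full batch and start a new one
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(id);
            currentSize += recordSize;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

In this sketch, a single record larger than `maxBatchSize` still ends up alone in its own batch. A real implementation would need to decide how to handle that case.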








[jira] [Commented] (KAFKA-13139) Empty response after requesting to restart a connector without the tasks results in NPE

2021-07-27 Thread Kalpesh Patel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17388332#comment-17388332
 ] 

Kalpesh Patel commented on KAFKA-13139:
---

Thanks a lot [~rhauch] and [~kkonstantine] for taking care of the fix and 
unblocking the release.

> Empty response after requesting to restart a connector without the tasks 
> results in NPE
> ---
>
> Key: KAFKA-13139
> URL: https://issues.apache.org/jira/browse/KAFKA-13139
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.0.0
>Reporter: Konstantine Karantasis
>Assignee: Konstantine Karantasis
>Priority: Blocker
> Fix For: 3.0.0
>
>
> After https://issues.apache.org/jira/browse/KAFKA-4793 a response to restart 
> only the connector (without any tasks) returns OK with an empty body. 
> As system test runs revealed, this causes an NPE in 
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java#L135]
> We should return 204 (NO_CONTENT) instead. 
> This is a regression from previous behavior, therefore the ticket is marked 
> as a blocker candidate for 3.0
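The fix has two sides that can be sketched separately. The server answers 204 when there is no body to send, and the client only parses a body when one is present. `RestResponses` and its methods are hypothetical helpers, not the actual `RestClient` or REST resource code.

```java
import java.util.Optional;

// Hypothetical helpers illustrating the 204 (No Content) fix.
final class RestResponses {
    static final int OK = 200;
    static final int NO_CONTENT = 204;

    // Server side: with nothing to report, answer 204 instead of 200 with an empty body.
    static int statusFor(Optional<String> payload) {
        return payload.isPresent() ? OK : NO_CONTENT;
    }

    // Client side: only hand a body to the JSON parser when there is content,
    // so an empty response never turns into a null that is later dereferenced.
    static Optional<String> bodyToParse(int statusCode, String body) {
        if (statusCode == NO_CONTENT || body == null || body.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(body);
    }
}
```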





[jira] [Commented] (KAFKA-12713) Report "REAL" follower/consumer fetch latency

2021-07-27 Thread Kai Huang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17388324#comment-17388324
 ] 

Kai Huang commented on KAFKA-12713:
---

Twitter would be interested in pursuing this work to debug latency issues. 
[~mingaliu], would you mind if I took over this JIRA ticket and 
[KIP-736|https://cwiki.apache.org/confluence/display/KAFKA/KIP-736%3A+Report+the+true+end+to+end+fetch+latency]?

> Report "REAL" follower/consumer fetch latency
> -
>
> Key: KAFKA-12713
> URL: https://issues.apache.org/jira/browse/KAFKA-12713
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ming Liu
>Priority: Major
>
> The fetch latency is an important metric to monitor for cluster 
> performance. With acks=all, the produce latency is affected primarily by 
> broker fetch latency.
> However, the currently reported fetch latency doesn't reflect the true fetch 
> latency, because a fetch sometimes needs to stay in purgatory and wait for 
> replica.fetch.wait.max.ms when data is not available. This greatly skews the 
> real P50, P99, etc. 
> I'd like to propose a KIP to be able to track the real fetch latency for both 
> broker followers and consumers. 
>  
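One reading of the proposal is to subtract the time a fetch spent parked in purgatory from its total time before recording it. The sketch below uses a hypothetical `FetchLatency` helper and made-up sample numbers. It shows how a few long-poll waits dominate the raw median but disappear from the adjusted one.

```java
import java.util.Arrays;

// Hypothetical sketch: remove purgatory wait time (bounded by
// replica.fetch.wait.max.ms for followers, fetch.max.wait.ms for consumers)
// from the total fetch time, and compare percentiles.
final class FetchLatency {
    static long realLatencyMs(long totalTimeMs, long purgatoryWaitMs) {
        if (purgatoryWaitMs < 0 || purgatoryWaitMs > totalTimeMs) {
            throw new IllegalArgumentException("wait time must be within [0, total]");
        }
        return totalTimeMs - purgatoryWaitMs;
    }

    // Nearest-rank percentile, p in (0, 100].
    static long percentile(long[] samples, double p) {
        if (samples.length == 0) {
            throw new IllegalArgumentException("no samples");
        }
        long[] sorted = samples.clone();
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p / 100.0 * sorted.length);
        return sorted[Math.max(rank, 1) - 1];
    }
}
```

Take totals of `{505, 503, 8, 510}` ms and purgatory waits of `{500, 500, 0, 500}` ms. The nearest-rank P50 of the raw values is 503 ms. The adjusted latencies `{5, 3, 8, 10}` have a P50 of 5 ms.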





[jira] [Assigned] (KAFKA-13141) Leader should not update follower fetch offset if diverging epoch is present

2021-07-27 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-13141:
---

Assignee: Rajini Sivaram  (was: Jason Gustafson)

> Leader should not update follower fetch offset if diverging epoch is present
> 
>
> Key: KAFKA-13141
> URL: https://issues.apache.org/jira/browse/KAFKA-13141
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0, 2.7.1
>Reporter: Jason Gustafson
>Assignee: Rajini Sivaram
>Priority: Blocker
> Fix For: 3.0.0, 2.7.2, 2.8.1
>
>
> In 2.7, we began doing fetcher truncation piggybacked on the Fetch protocol 
> instead of using the old OffsetsForLeaderEpoch API. When truncation is 
> detected, we return a `divergingEpoch` field in the Fetch response, but we do 
> not set an error code. The sender is expected to check if the diverging epoch 
> is present and truncate accordingly.
> All of this works correctly in the fetcher implementation, but the problem is 
> that the logic to update the follower fetch position on the leader does not 
> take into account the diverging epoch present in the response. This means the 
> fetch offsets can be updated incorrectly, which can lead to either log 
> divergence or the loss of committed data.
> For example, we hit the following case with 3 replicas. Leader 1 is elected 
> in epoch 1 with an end offset of 100. The followers are at offset 101.
> Broker 1: (Leader) Epoch 1 from offset 100
> Broker 2: (Follower) Epoch 1 from offset 101
> Broker 3: (Follower) Epoch 1 from offset 101
> Broker 1 receives fetches from 2 and 3 at offset 101. The leader detects the 
> divergence and returns a diverging epoch in the fetch state. Nevertheless, 
> the fetch positions for both followers are updated to 101 and the high 
> watermark is advanced.
> After brokers 2 and 3 had truncated to offset 100, broker 1 experienced a 
> network partition of some kind and was kicked from the ISR. This caused 
> broker 2 to get elected, which resulted in the following state at the start 
> of epoch 2.
> Broker 1: (Follower) Epoch 2 from offset 101
> Broker 2: (Leader) Epoch 2 from offset 100
> Broker 3: (Follower) Epoch 2 from offset 100
> Broker 2 was then able to write a new entry at offset 100, and the old record, 
> which may have been exposed to consumers, was deleted by broker 1.
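The leader-side guard can be sketched as follows. The sketch assumes a simplified model in which the high watermark is the minimum of the leader's end offset and the recorded follower offsets. `FollowerFetchState` is hypothetical. It ignores ISR changes, log start offsets, and the other bookkeeping the real replica state performs.

```java
import java.util.OptionalInt;

// Hypothetical, simplified leader-side record of one follower's fetch offset.
final class FollowerFetchState {
    private long fetchOffset;

    FollowerFetchState(long initialOffset) {
        this.fetchOffset = initialOffset;
    }

    long fetchOffset() {
        return fetchOffset;
    }

    // If the response carries a diverging epoch, the follower must truncate
    // first, so its requested offset is not trusted and is not recorded.
    void onFetchResponse(long requestedOffset, OptionalInt divergingEpoch) {
        if (divergingEpoch.isPresent()) {
            return;
        }
        fetchOffset = requestedOffset;
    }

    // Simplified high watermark: min of leader end offset and follower offsets.
    static long highWatermark(long leaderEndOffset, long... followerOffsets) {
        long hw = leaderEndOffset;
        for (long offset : followerOffsets) {
            hw = Math.min(hw, offset);
        }
        return hw;
    }
}
```

Take hypothetical numbers: a follower's last recorded offset is 95, and it then fetches at 101 with a diverging epoch. Its recorded offset stays at 95, so the high watermark cannot advance on the strength of records the leader does not have.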





[GitHub] [kafka] niket-goel commented on a change in pull request #11135: KAFKA-13143 Remove HandleMetadata from ControllerAPis as metadata is not completely implemented on KRaft controllers

2021-07-27 Thread GitBox


niket-goel commented on a change in pull request #11135:
URL: https://github.com/apache/kafka/pull/11135#discussion_r677821728



##
File path: clients/src/main/resources/common/message/MetadataRequest.json
##
@@ -16,9 +16,9 @@
 {
   "apiKey": 3,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["zkBroker", "broker"],
   "name": "MetadataRequest",
-  "validVersions": "0-11",
+  "validVersions": "0-12",

Review comment:
   Makes sense. Will remove the version bump.








[GitHub] [kafka] hachikuji commented on a change in pull request #11134: KAFKA-12851: Fix Raft partition simulation

2021-07-27 Thread GitBox


hachikuji commented on a change in pull request #11134:
URL: https://github.com/apache/kafka/pull/11134#discussion_r677821521



##
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##
@@ -171,6 +171,12 @@ private boolean updateHighWatermark() {
 || (highWatermarkUpdateOffset == 
currentHighWatermarkMetadata.offset &&
 
!highWatermarkUpdateMetadata.metadata.equals(currentHighWatermarkMetadata.metadata)))
 {
 highWatermark = highWatermarkUpdateOpt;
+log.debug(

Review comment:
   Do you think this level of detail is more suitable for trace? We have 
debug logging for high watermark advances in `KafkaRaftClient`.








[GitHub] [kafka] niket-goel commented on a change in pull request #11135: KAFKA-13143 Remove HandleMetadata from ControllerAPis as metadata is not completely implemented on KRaft controllers

2021-07-27 Thread GitBox


niket-goel commented on a change in pull request #11135:
URL: https://github.com/apache/kafka/pull/11135#discussion_r677820374



##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -151,6 +150,11 @@ class ControllerApis(val requestChannel: RequestChannel,
 handleRaftRequest(request, response => new 
FetchSnapshotResponse(response.asInstanceOf[FetchSnapshotResponseData]))
   }
 
+  // This API is not wired in yet as the controller does not implement the
+  // metadata completely.
+  // Leaving the code in to be completed and wired in the future
+  //
+  // See https://issues.apache.org/jira/browse/KAFKA-13143 for details

Review comment:
   Will remove the code. 








[GitHub] [kafka] hachikuji commented on a change in pull request #11135: KAFKA-13143 Remove HandleMetadata from ControllerAPis as metadata is not completely implemented on KRaft controllers

2021-07-27 Thread GitBox


hachikuji commented on a change in pull request #11135:
URL: https://github.com/apache/kafka/pull/11135#discussion_r677819183



##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -151,6 +150,11 @@ class ControllerApis(val requestChannel: RequestChannel,
 handleRaftRequest(request, response => new 
FetchSnapshotResponse(response.asInstanceOf[FetchSnapshotResponseData]))
   }
 
+  // This API is not wired in yet as the controller does not implement the
+  // metadata completely.
+  // Leaving the code in to be completed and wired in the future
+  //
+  // See https://issues.apache.org/jira/browse/KAFKA-13143 for details

Review comment:
   My preference would be to remove the handler completely. We can always 
bring it back in the future.








[GitHub] [kafka] hachikuji commented on a change in pull request #11135: KAFKA-13143 Remove HandleMetadata from ControllerAPis as metadata is not completely implemented on KRaft controllers

2021-07-27 Thread GitBox


hachikuji commented on a change in pull request #11135:
URL: https://github.com/apache/kafka/pull/11135#discussion_r677818654



##
File path: clients/src/main/resources/common/message/MetadataRequest.json
##
@@ -16,9 +16,9 @@
 {
   "apiKey": 3,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["zkBroker", "broker"],
   "name": "MetadataRequest",
-  "validVersions": "0-11",
+  "validVersions": "0-12",

Review comment:
   We shouldn't need to bump the version here if all we're changing is 
`listeners`.
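The distinction can be illustrated with a toy model. Here `listeners` controls which listener types advertise an API, and `validVersions` describes only the wire format. `ApiListeners` and `ApiSpec` are hypothetical. The real behavior comes from Kafka's message generator and is not reproduced here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical model: dropping a listener changes where an API is served,
// not its schema, so validVersions can stay unchanged.
final class ApiListeners {
    enum Listener { ZK_BROKER, BROKER, CONTROLLER }

    static final class ApiSpec {
        final String name;
        final String validVersions; // wire-format versions, unaffected by listeners
        final Set<Listener> listeners;

        ApiSpec(String name, String validVersions, Set<Listener> listeners) {
            this.name = name;
            this.validVersions = validVersions;
            this.listeners = listeners;
        }
    }

    // Names of the APIs that a given listener type would advertise.
    static List<String> advertised(List<ApiSpec> specs, Listener listener) {
        List<String> names = new ArrayList<>();
        for (ApiSpec spec : specs) {
            if (spec.listeners.contains(listener)) {
                names.add(spec.name);
            }
        }
        return names;
    }
}
```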








[GitHub] [kafka] niket-goel commented on a change in pull request #11135: KAFKA-13143 Remove HandleMetadata from ControllerAPis as metadata is not completely implemented on KRaft controllers

2021-07-27 Thread GitBox


niket-goel commented on a change in pull request #11135:
URL: https://github.com/apache/kafka/pull/11135#discussion_r677815515



##
File path: clients/src/main/resources/common/message/MetadataRequest.json
##
@@ -16,9 +16,9 @@
 {
   "apiKey": 3,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["zkBroker", "broker"],
   "name": "MetadataRequest",
-  "validVersions": "0-11",
+  "validVersions": "0-12",

Review comment:
   I am not sure whether just modifying the valid versions bumps the version 
number (or whether we want to bump the version at all).

##
File path: core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
##
@@ -145,6 +145,14 @@ class ControllerApisTest {
 authorizer
   }
 
+  @Test
+  def testHanldleMetadata(): Unit = {
+val caught = assertThrows(classOf[ApiException], () => 
createControllerApis(
+  None, new MockController.Builder().build()).

Review comment:
   I think I might have found an NPE: createControllerApis accepts an 
Option[Authorizer], but fails with an NPE if None is passed as the 
authorizer. 








[GitHub] [kafka] niket-goel opened a new pull request #11135: KAFKA-13143 Remove HandleMetadata from ControllerAPis as metadata is not completely implemented on KRaft controllers

2021-07-27 Thread GitBox


niket-goel opened a new pull request #11135:
URL: https://github.com/apache/kafka/pull/11135


   This PR is WIP and the test added does not work yet.
   






[jira] [Commented] (KAFKA-8295) Optimize count() using RocksDB merge operator

2021-07-27 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17388304#comment-17388304
 ] 

A. Sophie Blee-Goldman commented on KAFKA-8295:
---

I was just re-reading the wiki page on the Merge Operator, and now I wonder if 
it may not be _as_ helpful as I originally thought, though it can probably still 
offer some improvement. Here's my take; let me know what you think.

Regardless of whether a custom MergeOperator suffers from the same performance 
impact of crossing the JNI, I would bet that use cases such as list-append 
would still be more performant (since reading out an entire list, appending to 
it, and then writing the entire thing back is a lot of I/O). There are also the 
built-in, native MergeOperators that wouldn't need to cross the JNI, such as the 
UInt64AddOperator, as you point out. So there are definitely cases where a 
MergeOperator would still outperform an RMW (read-modify-write) sequence.

The thing I didn't fully appreciate before (but seems kind of obvious now that 
I think of it lol) is that the merge() call doesn't actually return the current 
value, either before or after the merge. So if we need to know this value in 
addition to updating it, we still have to do a get(), and using merge() instead of 
RMW only saves us the cost of `put(full_merged_value) - 
put(single_update_value)`. For constant-size values, like the uint64 
unfortunately, that means there's pretty much no savings at all. So we don't even 
need to worry about whether/how to handle the fact that this is now a 
ValueAndTimestamp instead of a plain Value, ie a Long in the case of count(), 
because I don't think there's likely to be any performance improvement there.

I didn't realize that at the time of filing this ticket, so maybe we should 
look past the current title of this ticket. This still leaves some cases that 
could potentially benefit from even a custom MergeOperator, such as list-append 
or any other where the difference in size between the full_merged_value and the 
single_update_value is very large. So it could be worth doing a POC of 
something like this and benchmarking that for a KIP.  But tbh, having seen how 
messy it is to add new operators to the StateStore interface at the moment, I 
think we should probably try to avoid doing so unless there's good motivation 
and a clear benefit. In this case, while there may be a benefit, I'm not sure 
there's a good motivation to do so since no user has requested this feature 
yet. Of course that could just be because they aren't aware of the possibility, 
so how about this: we update the title of this ticket to describe this possible 
new feature and then see if any users chime in here or vote on the ticket. If 
we gauge real user interest then it makes more sense to put time into doing 
this. WDYT?
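The byte-count argument can be made concrete with a rough model that counts only the bytes written per update. It assumes the current value must still be fetched with get() either way. `MergeSavings` is hypothetical and ignores compaction, caching, and serialization overhead (including the `ValueAndTimestamp` wrapper).

```java
// Hypothetical byte-count model: with read-modify-write every update rewrites
// the full value, while merge() writes only the operand. Because count() still
// needs a get(), the saving per update is put(full value) - put(operand).
final class MergeSavings {
    // Fixed-size counter (an 8-byte uint64): full value and operand are the same size.
    static long counterSavingsPerUpdate() {
        long fullValueBytes = Long.BYTES;
        long operandBytes = Long.BYTES;
        return fullValueBytes - operandBytes;
    }

    // List-append with RMW: the i-th append rewrites a list of i elements.
    static long listAppendBytesRmw(int appends, int elementSize) {
        long total = 0;
        for (int i = 1; i <= appends; i++) {
            total += (long) i * elementSize;
        }
        return total;
    }

    // List-append with merge: each append writes only the new element.
    static long listAppendBytesMerge(int appends, int elementSize) {
        return (long) appends * elementSize;
    }
}
```

Take four appends of 10-byte elements. Read-modify-write writes 10 + 20 + 30 + 40 = 100 bytes, while merge writes 40. For an 8-byte counter, the saving per update is 0.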

> Optimize count() using RocksDB merge operator
> -
>
> Key: KAFKA-8295
> URL: https://issues.apache.org/jira/browse/KAFKA-8295
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sagar Rao
>Priority: Major
>
> In addition to regular put/get/delete RocksDB provides a fourth operation, 
> merge. This essentially provides an optimized read/update/write path in a 
> single operation. One of the built-in (C++) merge operators exposed over the 
> Java API is a counter. We should be able to leverage this for a more 
> efficient implementation of count()
>  
> (Note: Unfortunately it seems unlikely we can use this to optimize general 
> aggregations, even if RocksJava allowed for a custom merge operator, unless 
> we provide a way for the user to specify and connect a C++ implemented 
> aggregator; otherwise we incur too much cost crossing the JNI to see a net 
> performance benefit.)





[jira] [Resolved] (KAFKA-13139) Empty response after requesting to restart a connector without the tasks results in NPE

2021-07-27 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis resolved KAFKA-13139.

Resolution: Fixed

> Empty response after requesting to restart a connector without the tasks 
> results in NPE
> ---
>
> Key: KAFKA-13139
> URL: https://issues.apache.org/jira/browse/KAFKA-13139
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.0.0
>Reporter: Konstantine Karantasis
>Assignee: Konstantine Karantasis
>Priority: Blocker
> Fix For: 3.0.0
>
>
> After https://issues.apache.org/jira/browse/KAFKA-4793 a response to restart 
> only the connector (without any tasks) returns OK with an empty body. 
> As system test runs revealed, this causes an NPE in 
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java#L135]
> We should return 204 (NO_CONTENT) instead. 
> This is a regression from previous behavior, therefore the ticket is marked 
> as a blocker candidate for 3.0





[GitHub] [kafka] kkonstantine merged pull request #11132: KAFKA-13139: Empty response after requesting to restart a connector without the tasks results in NPE

2021-07-27 Thread GitBox


kkonstantine merged pull request #11132:
URL: https://github.com/apache/kafka/pull/11132


   






[GitHub] [kafka] guozhangwang commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-27 Thread GitBox


guozhangwang commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r677786276



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -345,8 +343,17 @@ private SinkNodeFactory(final String name,
 }
 }
 
+public void setTopologyName(final String namedTopology) {

Review comment:
   I see.








[jira] [Assigned] (KAFKA-13143) Disable Metadata endpoint for KRaft controller

2021-07-27 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-13143:
---

Assignee: Niket Goel  (was: Jose Armando Garcia Sancio)

> Disable Metadata endpoint for KRaft controller
> --
>
> Key: KAFKA-13143
> URL: https://issues.apache.org/jira/browse/KAFKA-13143
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Niket Goel
>Priority: Blocker
> Fix For: 3.0.0
>
>
> The controller currently implements Metadata incompletely. Specifically, it 
> does not return the metadata for any topics in the cluster. This may tend to 
> cause confusion to users. For example, if someone used the controller 
> endpoint by mistake in `kafka-topics.sh --list`, then they would see no 
> topics in the cluster, which would be surprising. It would be better for 3.0 
> to disable Metadata on the controller since we currently expect clients to 
> connect through brokers anyway.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13143) Disable Metadata endpoint for KRaft controller

2021-07-27 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13143:

Issue Type: Bug  (was: Improvement)

> Disable Metadata endpoint for KRaft controller
> --
>
> Key: KAFKA-13143
> URL: https://issues.apache.org/jira/browse/KAFKA-13143
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jose Armando Garcia Sancio
>Priority: Blocker
> Fix For: 3.0.0
>
>
> The controller currently implements Metadata incompletely. Specifically, it 
> does not return the metadata for any topics in the cluster. This may 
> confuse users. For example, if someone used the controller 
> endpoint by mistake in `kafka-topics.sh --list`, then they would see no 
> topics in the cluster, which would be surprising. It would be better for 3.0 
> to disable Metadata on the controller since we currently expect clients to 
> connect through brokers anyway.





[GitHub] [kafka] guozhangwang commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-27 Thread GitBox


guozhangwang commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r677785678



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -1065,14 +1086,22 @@ private void buildProcessorNode(final Map> pro
 return Collections.unmodifiableMap(globalStateStores);
 }
 
-public Set<String> allStateStoreName() {
+public Set<String> allStateStoreNames() {
 Objects.requireNonNull(applicationId, "topology has not completed optimization");
 
 final Set<String> allNames = new HashSet<>(stateFactories.keySet());
 allNames.addAll(globalStateStores.keySet());
 return Collections.unmodifiableSet(allNames);
 }
 
+public boolean hasStore(final String name) {
+return stateFactories.containsKey(name) || globalStateStores.containsKey(name);
+}
+
+public boolean hasPersistentStores() {

Review comment:
   I'm not against this idea, just wondering what's the rationale behind it 
:) I'm happy with what you said.
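The two store helpers in the diff above boil down to set logic: `allStateStoreNames()` returns the union of local and global store names as a read-only set, while `hasStore()` answers the same membership question without copying either key set. A minimal standalone sketch, assuming plain `Map<String, String>` fields in place of the real factory and `StateStore` maps (`StoreRegistry` is a hypothetical name, not a Kafka class):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the two store maps in InternalTopologyBuilder.
// Values are plain strings here; only the keys (store names) matter.
final class StoreRegistry {
    final Map<String, String> stateFactories = new HashMap<>();
    final Map<String, String> globalStateStores = new HashMap<>();

    // Union of local and global store names, returned as a read-only view.
    Set<String> allStateStoreNames() {
        final Set<String> allNames = new HashSet<>(stateFactories.keySet());
        allNames.addAll(globalStateStores.keySet());
        return Collections.unmodifiableSet(allNames);
    }

    // Same membership question, answered without copying either key set.
    boolean hasStore(final String name) {
        return stateFactories.containsKey(name) || globalStateStores.containsKey(name);
    }
}
```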








[GitHub] [kafka] guozhangwang commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-27 Thread GitBox


guozhangwang commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r677784899



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -345,8 +343,17 @@ private SinkNodeFactory(final String name,
 }
 }
 
+public void setTopologyName(final String namedTopology) {
+Objects.requireNonNull(namedTopology, "named topology can't be null");
+if (this.namedTopology != null) {
+log.error("Tried to reset the namedTopology to {} but it was already set to {}", namedTopology, this.namedTopology);
+throw new IllegalStateException("NamedTopology has already been set to " + this.namedTopology);
+}
+this.namedTopology = namedTopology;
+}
+
 // public for testing only
-public synchronized final InternalTopologyBuilder setApplicationId(final String applicationId) {

Review comment:
   Ack.
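The `setTopologyName` change above gives the builder a set-once name: `null` is rejected, and a second assignment fails loudly instead of silently renaming the topology. A standalone sketch of that contract, with the logging call omitted and a hypothetical `TopologyNameHolder` class standing in for `InternalTopologyBuilder`:

```java
import java.util.Objects;

// Hypothetical holder mirroring the set-once contract of setTopologyName.
final class TopologyNameHolder {
    private String namedTopology;

    void setTopologyName(final String namedTopology) {
        Objects.requireNonNull(namedTopology, "named topology can't be null");
        if (this.namedTopology != null) {
            // Refuse to overwrite an existing name rather than silently renaming.
            throw new IllegalStateException("NamedTopology has already been set to " + this.namedTopology);
        }
        this.namedTopology = namedTopology;
    }

    boolean hasNamedTopology() {
        return namedTopology != null;
    }

    String namedTopology() {
        return namedTopology;
    }
}
```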








[GitHub] [kafka] guozhangwang commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-27 Thread GitBox


guozhangwang commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r677784769



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -16,10 +16,294 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+//  1) synchronize on these methods instead of individual InternalTopologyBuilder methods, where applicable
 
 public class TopologyMetadata {
-//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR #10683)
+private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+// the '_' character is not allowed for topology names, thus it's safe to use to indicate that it's not a named topology
+private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
+
+private final StreamsConfig config;
+private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
+
+private ProcessorTopology globalTopology;
+private Map<String, StateStore> globalStateStores = new HashMap<>();
+final Set<String> allInputTopics = new HashSet<>();
+
+public TopologyMetadata(final InternalTopologyBuilder builder, final StreamsConfig config) {
+this.config = config;
+builders = new TreeMap<>();
+if (builder.hasNamedTopology()) {
+builders.put(builder.namedTopology(), builder);
+} else {
+builders.put(UNNAMED_TOPOLOGY, builder);
+}
+}
+
+public TopologyMetadata(final SortedMap<String, InternalTopologyBuilder> builders, final StreamsConfig config) {
+this.config = config;
+this.builders = builders;
+if (builders.isEmpty()) {
+log.debug("Building KafkaStreams app with no empty topology");
+}
+}
+
+public int getNumStreamThreads(final StreamsConfig config) {
+final int configuredNumStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+
+// If the application uses named topologies, it's possible to start up with no topologies at all and only add them later
+if (builders.isEmpty()) {
+if (configuredNumStreamThreads != 0) {
+log.info("Overriding number of StreamThreads to zero for empty topology");
+}
+return 0;
+}
+
+// If there are topologies but they are all empty, this indicates a bug in user code
+if (hasNoNonGlobalTopology() && !hasGlobalTopology()) {

Review comment:
   Yeah, what I meant is that if we can ever reach this condition, then it 
seems the extra check is redundant. I will check the current logic again.
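Two ideas in the `TopologyMetadata` constructor and `getNumStreamThreads` in the diff above are worth isolating: builders are keyed by name in a `TreeMap`, with a reserved key for the unnamed case, and an app with no topologies yet runs zero stream threads. A simplified sketch under those assumptions; `TopologyMetadataSketch` and its methods are hypothetical, builders are plain strings, and the empty-topology error path discussed in the comment is left out:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical simplification of TopologyMetadata; strings stand in for builders.
final class TopologyMetadataSketch {
    // Assumed collision-free because '_' is not allowed in user topology names (per the diff).
    static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";

    // Sorted by topology name, so iteration order is predictable.
    private final SortedMap<String, String> builders = new TreeMap<>();

    // A null name models a builder without a named topology.
    void addBuilder(final String topologyName, final String builder) {
        builders.put(topologyName == null ? UNNAMED_TOPOLOGY : topologyName, builder);
    }

    int numStreamThreads(final int configuredNumStreamThreads) {
        // An app using named topologies may start empty and add topologies later,
        // so no threads are started until there is something to run.
        if (builders.isEmpty()) {
            return 0;
        }
        return configuredNumStreamThreads;
    }

    List<String> topologyNames() {
        return new ArrayList<>(builders.keySet());
    }
}
```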








[GitHub] [kafka] kkonstantine commented on pull request #11132: KAFKA-13139: Empty response after requesting to restart a connector without the tasks results in NPE

2021-07-27 Thread GitBox


kkonstantine commented on pull request #11132:
URL: https://github.com/apache/kafka/pull/11132#issuecomment-887820306


   A single unrelated test failed on the second build: 
   `Build / JDK 8 and Scala 2.12 / 
kafka.server.DelegationTokenRequestsTest.testDelegationTokenRequests()`






[GitHub] [kafka] kkonstantine commented on pull request #11132: KAFKA-13139: Empty response after requesting to restart a connector without the tasks results in NPE

2021-07-27 Thread GitBox


kkonstantine commented on pull request #11132:
URL: https://github.com/apache/kafka/pull/11132#issuecomment-887819548


   Thank you both. Merging to trunk and 3.0






[jira] [Created] (KAFKA-13143) Disable Metadata endpoint for KRaft controller

2021-07-27 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13143:
---

 Summary: Disable Metadata endpoint for KRaft controller
 Key: KAFKA-13143
 URL: https://issues.apache.org/jira/browse/KAFKA-13143
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jose Armando Garcia Sancio
 Fix For: 3.0.0


The controller currently implements Metadata incompletely. Specifically, it 
does not return the metadata for any topics in the cluster. This may 
confuse users. For example, if someone used the controller endpoint 
by mistake in `kafka-topics.sh --list`, then they would see no topics in the 
cluster, which would be surprising. It would be better for 3.0 to disable 
Metadata on the controller since we currently expect clients to connect through 
brokers anyway.
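The proposal amounts to removing Metadata from the set of APIs the controller answers, so a misdirected client gets an explicit rejection instead of an empty, misleading topic list. A minimal sketch of that allowlist idea; the `Api` enum, its members, and the `"REJECTED"`/`"OK"` results are illustrative assumptions, not Kafka's actual `ApiKeys` or error codes:

```java
import java.util.EnumSet;
import java.util.Set;

// Illustrative only: a controller-side allowlist of supported request types.
final class ControllerApiGate {
    // Hypothetical subset of request types; not Kafka's ApiKeys enum.
    enum Api { METADATA, CREATE_TOPICS, BROKER_REGISTRATION, BROKER_HEARTBEAT }

    // METADATA is deliberately absent: clients are expected to ask brokers instead.
    private static final Set<Api> CONTROLLER_APIS =
        EnumSet.of(Api.CREATE_TOPICS, Api.BROKER_REGISTRATION, Api.BROKER_HEARTBEAT);

    static String handle(final Api api) {
        if (!CONTROLLER_APIS.contains(api)) {
            // An explicit failure is less confusing than an empty topic list.
            return "REJECTED";
        }
        return "OK";
    }
}
```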





[jira] [Created] (KAFKA-13142) KRaft controller does not validate dynamic configs

2021-07-27 Thread Ryan Dielhenn (Jira)
Ryan Dielhenn created KAFKA-13142:
-

 Summary: KRaft controller does not validate dynamic configs
 Key: KAFKA-13142
 URL: https://issues.apache.org/jira/browse/KAFKA-13142
 Project: Kafka
  Issue Type: Task
  Components: kraft
Affects Versions: 3.0.0
Reporter: Ryan Dielhenn
Assignee: Ryan Dielhenn


The KRaft controller does not currently validate dynamic configs. To ensure 
that KRaft clusters are easily upgradable, it would be a good idea to validate 
dynamic configs in the first release of KRaft so that invalid dynamic configs 
are never stored.
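One way to read the proposal: check every dynamic config update against known names and value rules before it is persisted, so an invalid value is rejected up front instead of being stored. A sketch under that assumption; `DynamicConfigValidator` is hypothetical, and its two rules are deliberately simplified examples (for instance, the real `cleanup.policy` also accepts `compact,delete`):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical validator: an update is stored only if its name is known and its value passes the rule.
final class DynamicConfigValidator {
    private final Map<String, Predicate<String>> rules = new HashMap<>();

    DynamicConfigValidator() {
        // Simplified example rules, not Kafka's full config definitions.
        rules.put("retention.ms", v -> isLong(v) && Long.parseLong(v) >= -1);
        rules.put("cleanup.policy", v -> v.equals("delete") || v.equals("compact"));
    }

    private static boolean isLong(final String value) {
        try {
            Long.parseLong(value);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    // Returns an error message, or empty if the update is valid.
    Optional<String> validate(final String name, final String value) {
        final Predicate<String> rule = rules.get(name);
        if (rule == null) {
            return Optional.of("Unknown config " + name);
        }
        if (!rule.test(value)) {
            return Optional.of("Invalid value " + value + " for " + name);
        }
        return Optional.empty();
    }

    // Applies the update to the store only when validation passes.
    boolean applyIfValid(final Map<String, String> store, final String name, final String value) {
        if (validate(name, value).isPresent()) {
            return false;
        }
        store.put(name, value);
        return true;
    }
}
```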





[jira] [Created] (KAFKA-13141) Leader should not update follower fetch offset if diverging epoch is present

2021-07-27 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13141:
---

 Summary: Leader should not update follower fetch offset if 
diverging epoch is present
 Key: KAFKA-13141
 URL: https://issues.apache.org/jira/browse/KAFKA-13141
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.7.1, 2.8.0
Reporter: Jason Gustafson
Assignee: Jason Gustafson
 Fix For: 3.0.0, 2.7.2, 2.8.1


In 2.7, we began doing fetcher truncation piggybacked on the Fetch protocol 
instead of using the old OffsetsForLeaderEpoch API. When truncation is 
detected, we return a `divergingEpoch` field in the Fetch response, but we do 
not set an error code. The sender is expected to check if the diverging epoch 
is present and truncate accordingly.

All of this works correctly in the fetcher implementation, but the problem is 
that the logic to update the follower fetch position on the leader does not 
take into account the diverging epoch present in the response. This means the 
fetch offsets can be updated incorrectly, which can lead to either log 
divergence or the loss of committed data.

For example, we hit the following case with 3 replicas. Broker 1 is elected leader in 
epoch 1 with an end offset of 100. The followers are at offset 101.

Broker 1: (Leader) Epoch 1 from offset 100
Broker 2: (Follower) Epoch 1 from offset 101
Broker 3: (Follower) Epoch 1 from offset 101

Broker 1 receives fetches from 2 and 3 at offset 101. The leader detects the 
divergence and returns a diverging epoch in the fetch state. Nevertheless, the 
fetch positions for both followers are updated to 101 and the high watermark is 
advanced.

After brokers 2 and 3 had truncated to offset 100, broker 1 experienced a 
network partition of some kind and was kicked from the ISR. This caused broker 
2 to get elected, which resulted in the following state at the start of epoch 2.

Broker 1: (Follower) Epoch 2 from offset 101
Broker 2: (Leader) Epoch 2 from offset 100
Broker 3: (Follower) Epoch 2 from offset 100

Broker 2 was then able to write a new entry at offset 100, and the old record, 
which may have been exposed to consumers, was deleted by broker 1.
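The arithmetic of this failure can be reproduced with a deliberately tiny model of the leader's bookkeeping. Assumptions, none of which come from the Kafka code: the high watermark is the minimum of the leader's log end offset and the recorded follower fetch offsets; both followers start out recorded at offset 100; and the leader has appended one record at offset 100 in epoch 1, so its log end offset is 101 (consistent with broker 1's position at the start of epoch 2). `LeaderModel` and its methods are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;

// Hypothetical, heavily simplified model of leader-side follower tracking.
final class LeaderModel {
    private final long leaderLogEndOffset;
    private final boolean skipUpdateOnDivergence;
    private final Map<Integer, Long> followerFetchOffsets = new HashMap<>();

    LeaderModel(final long leaderLogEndOffset, final long initialFollowerOffset,
                final boolean skipUpdateOnDivergence, final int... followerIds) {
        this.leaderLogEndOffset = leaderLogEndOffset;
        this.skipUpdateOnDivergence = skipUpdateOnDivergence;
        for (final int id : followerIds) {
            followerFetchOffsets.put(id, initialFollowerOffset);
        }
    }

    // divergingEpoch is present when the leader told the follower to truncate.
    void onFetch(final int followerId, final long fetchOffset, final OptionalInt divergingEpoch) {
        if (skipUpdateOnDivergence && divergingEpoch.isPresent()) {
            // The follower must truncate first; this offset proves nothing about replication.
            return;
        }
        followerFetchOffsets.put(followerId, fetchOffset);
    }

    // Simplified rule: minimum of the leader's end offset and all follower fetch offsets.
    long highWatermark() {
        long hw = leaderLogEndOffset;
        for (final long offset : followerFetchOffsets.values()) {
            hw = Math.min(hw, offset);
        }
        return hw;
    }

    // Replays the scenario: brokers 2 and 3 fetch at 101 and divergence is detected.
    static long replay(final boolean skipUpdateOnDivergence) {
        final LeaderModel leader = new LeaderModel(101L, 100L, skipUpdateOnDivergence, 2, 3);
        leader.onFetch(2, 101L, OptionalInt.of(0)); // diverging epoch value is illustrative
        leader.onFetch(3, 101L, OptionalInt.of(0));
        return leader.highWatermark();
    }
}
```

With the update applied unconditionally, the model advances the high watermark to 101, treating the leader's record at offset 100 as committed even though neither follower has replicated it; skipping the update when a diverging epoch is present holds it at 100.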






[jira] [Comment Edited] (KAFKA-8295) Optimize count() using RocksDB merge operator

2021-07-27 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17388235#comment-17388235
 ] 

Sagar Rao edited comment on KAFKA-8295 at 7/27/21, 6:15 PM:


[~ableegoldman], I wanted to know if we can have a KIP for a merge() operator in 
the KV store, irrespective of whether we use the merge operator from RocksDB or not 
(option 1 from above)?

I will still be doing some benchmarks on it.

That would allow users to use the store for counter-specific use cases. WDYT?


was (Author: sagarrao):
[~ableegoldman], I wanted to know if we can have a KIP for a merge() operator in 
the KV store, irrespective of whether we use the merge operator from RocksDB or not 
(option 1 from above)? That would allow users to use the store for counter-specific 
use cases. WDYT?

> Optimize count() using RocksDB merge operator
> -
>
> Key: KAFKA-8295
> URL: https://issues.apache.org/jira/browse/KAFKA-8295
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sagar Rao
>Priority: Major
>
> In addition to regular put/get/delete, RocksDB provides a fourth operation, 
> merge. This essentially provides an optimized read/update/write path in a 
> single operation. One of the built-in (C++) merge operators exposed over the 
> Java API is a counter. We should be able to leverage this for a more 
> efficient implementation of count().
>  
> (Note: Unfortunately, it seems unlikely we can use this to optimize general 
> aggregations, even if RocksJava allowed for a custom merge operator, unless 
> we provide a way for the user to specify and connect a C++-implemented 
> aggregator; otherwise, crossing the JNI boundary costs too much for a net 
> performance benefit.)
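The `merge()` store operation discussed in this thread can be illustrated without RocksDB. This hypothetical `CountingStore` uses `java.util.Map.merge` to apply a delta to a counter in one call. Note that the map still performs an in-place read-modify-write, whereas RocksDB records merge operands and combines them later (on reads or during compaction), which is where the savings for `count()` would come from. The class and method names are assumptions, not a proposed Kafka Streams API:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical key-value store exposing merge() alongside put/get.
final class CountingStore {
    private final Map<String, Long> data = new HashMap<>();

    void put(final String key, final long value) {
        data.put(key, value);
    }

    Long get(final String key) {
        return data.get(key);
    }

    // Adds delta to the stored value, treating a missing key as zero.
    void merge(final String key, final long delta) {
        data.merge(key, delta, Long::sum);
    }
}
```

A `count()`-style usage simply calls `store.merge(key, 1L)` per record instead of a `get` followed by a `put`.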





[jira] [Commented] (KAFKA-8295) Optimize count() using RocksDB merge operator

2021-07-27 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17388235#comment-17388235
 ] 

Sagar Rao commented on KAFKA-8295:
--

[~ableegoldman], I wanted to know if we can have a KIP for a merge() operator in 
the KV store, irrespective of whether we use the merge operator from RocksDB or not 
(option 1 from above)? That would allow users to use the store for counter-specific 
use cases. WDYT?

> Optimize count() using RocksDB merge operator
> -
>
> Key: KAFKA-8295
> URL: https://issues.apache.org/jira/browse/KAFKA-8295
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sagar Rao
>Priority: Major
>
> In addition to regular put/get/delete, RocksDB provides a fourth operation, 
> merge. This essentially provides an optimized read/update/write path in a 
> single operation. One of the built-in (C++) merge operators exposed over the 
> Java API is a counter. We should be able to leverage this for a more 
> efficient implementation of count().
>  
> (Note: Unfortunately, it seems unlikely we can use this to optimize general 
> aggregations, even if RocksJava allowed for a custom merge operator, unless 
> we provide a way for the user to specify and connect a C++-implemented 
> aggregator; otherwise, crossing the JNI boundary costs too much for a net 
> performance benefit.)





[jira] [Assigned] (KAFKA-13095) TransactionsTest is failing in kraft mode

2021-07-27 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13095?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-13095:
---

Assignee: Jason Gustafson  (was: David Arthur)

> TransactionsTest is failing in kraft mode
> -
>
> Key: KAFKA-13095
> URL: https://issues.apache.org/jira/browse/KAFKA-13095
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Colin McCabe
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 3.0.0
>
>






[GitHub] [kafka] guozhangwang commented on a change in pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size

2021-07-27 Thread GitBox


guozhangwang commented on a change in pull request #11098:
URL: https://github.com/apache/kafka/pull/11098#discussion_r677680568



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##
@@ -161,22 +162,26 @@ class TransactionStateManager(brokerId: Int,
 maxBatchSize
   )
 
-  partitionCacheEntry.metadataPerTransactionalId.foreachWhile { (transactionalId, txnMetadata) =>
+  var breakIteration = false

Review comment:
   In that case, I feel it may actually be cleaner to inline 
`collectExpiredTransactionalIds` into the caller so that there is just one 
while loop / flag. We can still distinguish the case where the log is offline, 
and hence we should not proceed, from the case where the batch is full, and we 
should write once and proceed. WDYT?
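A sketch of the single-loop shape suggested above, assuming fixed-size records and using Java's `Optional` in place of Scala's `Option`: an absent batch-size limit stands in for an offline partition (nothing is written), while a full batch is closed and collection continues. `ExpirationBatcher` and its parameters are hypothetical, and the sketch returns the batches instead of writing them:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical single-loop batching of expired transactional ids.
final class ExpirationBatcher {
    // maxBatchBytes is empty when the partition is offline (no log config available).
    static List<List<String>> batchExpiredIds(final List<String> expiredIds,
                                              final int recordBytes,
                                              final Optional<Integer> maxBatchBytes) {
        final List<List<String>> batches = new ArrayList<>();
        if (!maxBatchBytes.isPresent()) {
            // Offline partition: do not proceed at all.
            return batches;
        }
        final int limit = maxBatchBytes.get();
        List<String> current = new ArrayList<>();
        int currentBytes = 0;
        for (final String id : expiredIds) {
            if (!current.isEmpty() && currentBytes + recordBytes > limit) {
                // Batch full: hand it off for writing and keep collecting.
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(id);
            currentBytes += recordBytes;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```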








[GitHub] [kafka] vamossagar12 commented on pull request #11076: KAFKA-12486: Enforce Rebalance when a TaskCorruptedException is throw…

2021-07-27 Thread GitBox


vamossagar12 commented on pull request #11076:
URL: https://github.com/apache/kafka/pull/11076#issuecomment-887710210


   @ableegoldman, sorry to bother you again on this one, but could you please 
review it whenever you get the chance?






[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

2021-07-27 Thread GitBox


vamossagar12 commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-887703246


   hey @guozhangwang / @cadonna, sorry for being nosy here, but did you get a 
chance to look at these numbers? 






[GitHub] [kafka] hachikuji commented on a change in pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size

2021-07-27 Thread GitBox


hachikuji commented on a change in pull request #11098:
URL: https://github.com/apache/kafka/pull/11098#discussion_r677652114



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##
@@ -161,22 +162,26 @@ class TransactionStateManager(brokerId: Int,
 maxBatchSize
   )
 
-  partitionCacheEntry.metadataPerTransactionalId.foreachWhile { (transactionalId, txnMetadata) =>
+  var breakIteration = false

Review comment:
   We've changed the logic to check the log configuration to get the 
batch size. If the partition is offline, then `ReplicaManager.getLogConfig` 
will return `None`. I have a test case which shows this path.








[GitHub] [kafka] guozhangwang commented on a change in pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size

2021-07-27 Thread GitBox


guozhangwang commented on a change in pull request #11098:
URL: https://github.com/apache/kafka/pull/11098#discussion_r677649333



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##
@@ -161,22 +162,26 @@ class TransactionStateManager(brokerId: Int,
 maxBatchSize
   )
 
-  partitionCacheEntry.metadataPerTransactionalId.foreachWhile { (transactionalId, txnMetadata) =>
+  var breakIteration = false

Review comment:
   I think I may be missing something here; could you explain why we'd need 
both the caller and the callee to run this while loop with the break-iteration 
flag? It seems to me that we have multiple reasons to break early: 1) we've 
reached the record limit, and 2) the partition is already offline. We want to 
treat them differently, hence the two while loops, right?
   
   But I cannot see exactly where case 2) may occur.








[GitHub] [kafka] jsancio commented on a change in pull request #11116: KAFKA-13114: Revert state and reregister raft listener

2021-07-27 Thread GitBox


jsancio commented on a change in pull request #11116:
URL: https://github.com/apache/kafka/pull/11116#discussion_r677635360



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -783,6 +798,11 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) {
 });
 } else if (curClaimEpoch != -1) {
 appendControlEvent("handleRenounce[" + curClaimEpoch + "]", () -> {
+if (this != metaLogListener) {

Review comment:
   Good idea. Fixed.
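The `this != metaLogListener` check in the diff above guards against events still queued for a listener that has since been replaced by re-registration. A self-contained sketch of that guard; `ControllerSketch`, `Listener`, and the recorded strings are hypothetical, and the state reversion that the PR title also mentions is not modeled:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical controller that replaces its log listener on re-registration.
final class ControllerSketch {
    final List<String> handled = new ArrayList<>();
    private Listener metaLogListener;

    final class Listener {
        void handleRenounce(final int epoch) {
            // Events delivered to a replaced (stale) listener are ignored.
            if (this != metaLogListener) {
                return;
            }
            handled.add("renounce@" + epoch);
        }
    }

    Listener register() {
        metaLogListener = new Listener();
        return metaLogListener;
    }
}
```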








[GitHub] [kafka] jsancio opened a new pull request #11134: KAFKA-12851: Fix Raft partition simulation

2021-07-27 Thread GitBox


jsancio opened a new pull request #11134:
URL: https://github.com/apache/kafka/pull/11134


   Instead of waiting for a high-watermark of 20 after the partition, the
   test should wait for the high-watermark to reach an offset greater than
   the largest log end offset at the time of the partition. Only that offset
   is guaranteed to be reached as the high-watermark by the new majority.
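The waiting rule described above can be sketched as: record the largest log end offset across the nodes when the partition starts, then step the simulation until the high-watermark is strictly greater than it. `PartitionWait`, the `LongSupplier` for the current high-watermark, and the `Runnable` that advances the simulation by one step are hypothetical stand-ins for the test's own helpers:

```java
import java.util.Collection;
import java.util.Collections;
import java.util.function.LongSupplier;

// Hypothetical helpers for a Raft partition simulation test.
final class PartitionWait {
    // Target is the largest log end offset observed when the partition begins.
    static long largestLogEndOffset(final Collection<Long> logEndOffsetsAtPartition) {
        return Collections.max(logEndOffsetsAtPartition);
    }

    // Steps the simulation until the high-watermark is strictly greater than the target.
    static boolean waitForHighWatermarkAbove(final LongSupplier highWatermark, final Runnable step,
                                             final long target, final int maxSteps) {
        for (int i = 0; i < maxSteps; i++) {
            if (highWatermark.getAsLong() > target) {
                return true;
            }
            step.run();
        }
        return highWatermark.getAsLong() > target;
    }
}
```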
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] jsancio commented on a change in pull request #11133: KAFKA-13140: KRaft brokers do not expose kafka.controller metrics

2021-07-27 Thread GitBox


jsancio commented on a change in pull request #11133:
URL: https://github.com/apache/kafka/pull/11133#discussion_r677612215



##
File path: core/src/main/scala/kafka/server/KafkaRaftServer.scala
##
@@ -100,6 +101,9 @@ class KafkaRaftServer(
   controllerQuorumVotersFuture
 ))
   } else {
+// we need to register the various kafka.controller metrics
+// for backwards compatibility with the ZooKeeper-based case
+new QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry())

Review comment:
   This works because the constructor of `QuorumControllerMetrics` 
registers those metrics against the Yammer registry. How about having a static 
method in `QuorumControllerMetrics` that does this and doesn't use gauges, since 
they are not needed?








[jira] [Assigned] (KAFKA-12793) Client-side Circuit Breaker for Partition Write Errors

2021-07-27 Thread KahnCheny (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

KahnCheny reassigned KAFKA-12793:
-

Assignee: KahnCheny

> Client-side Circuit Breaker for Partition Write Errors
> --
>
> Key: KAFKA-12793
> URL: https://issues.apache.org/jira/browse/KAFKA-12793
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Reporter: KahnCheny
>Assignee: KahnCheny
>Priority: Major
> Attachments: KAFKA-12793.patch
>
>
> When Kafka is used to build data pipelines in mission-critical business 
> scenarios, availability and throughput are the most important operational 
> goals, and they need to be maintained in the presence of transient or 
> permanent local failures. One typical situation that requires Ops 
> intervention is disk failure: some partitions have long write latency caused 
> by extremely high disk utilization. Since all partitions share the same 
> buffer under the current producer thread model, the buffer fills up quickly 
> and eventually the healthy partitions are impacted as well. The cluster-level 
> success rate and timeout ratio degrade until the local infrastructure 
> issue is resolved.
> One way to mitigate this issue is to add a client-side mechanism to 
> short-circuit problematic partitions during transient failures. A similar 
> approach is used in other distributed systems and RPC frameworks.
> We propose to add a configuration-driven circuit-breaking mechanism that 
> allows the Kafka client to ‘mute’ partitions when certain conditions are met. 
> The mechanism adds callbacks to the Sender class workflow that allow 
> filtering partitions based on a given policy.
> The client can choose an implementation that fits a particular failure 
> scenario through client-side custom implementations of Partitioner and 
> ProducerInterceptor:
> * Customize the implementation of ProducerInterceptor to choose the 
> strategy for muting partitions.
> * Customize the implementation of Partitioner to choose the strategy for 
> filtering partitions.
> Muting partitions has an impact when the topic contains keyed messages, as 
> messages will be written to more than one partition during the recovery 
> period. We believe this can be an explicit trade-off the application makes 
> between availability and message ordering.
> KIP-693: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-693%3A+Client-side+Circuit+Breaker+for+Partition+Write+Errors|https://cwiki.apache.org/confluence/display/KAFKA/KIP-693%3A+Client-side+Circuit+Breaker+for+Partition+Write+Errors]
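To make the muting policy concrete, here is a minimal per-partition circuit breaker: after a number of consecutive write errors a partition is muted for a fixed period, and partition selection skips muted partitions (falling back to all partitions if every one is muted). This is an illustrative sketch only; the threshold, mute duration, class name, and fallback rule are assumptions rather than the KIP-693 design, and a real integration would go through the custom `ProducerInterceptor` and `Partitioner` hooks described above:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative per-partition circuit breaker (not the KIP-693 implementation).
// Time is passed in explicitly so the behavior is deterministic.
final class PartitionCircuitBreaker {
    private final int failureThreshold;
    private final long muteMillis;
    private final Map<Integer, Integer> consecutiveFailures = new HashMap<>();
    private final Map<Integer, Long> mutedUntilMs = new HashMap<>();

    PartitionCircuitBreaker(final int failureThreshold, final long muteMillis) {
        this.failureThreshold = failureThreshold;
        this.muteMillis = muteMillis;
    }

    // A successful write resets the partition's failure streak.
    void onSuccess(final int partition) {
        consecutiveFailures.remove(partition);
    }

    // Enough consecutive failures open the breaker for muteMillis.
    void onFailure(final int partition, final long nowMs) {
        final int failures = consecutiveFailures.merge(partition, 1, Integer::sum);
        if (failures >= failureThreshold) {
            mutedUntilMs.put(partition, nowMs + muteMillis);
            consecutiveFailures.remove(partition);
        }
    }

    boolean isMuted(final int partition, final long nowMs) {
        final Long until = mutedUntilMs.get(partition);
        return until != null && nowMs < until;
    }

    // Candidates for a partitioner; if every partition is muted, none is excluded.
    List<Integer> availablePartitions(final int numPartitions, final long nowMs) {
        final List<Integer> available = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (!isMuted(p, nowMs)) {
                available.add(p);
            }
        }
        if (available.isEmpty()) {
            for (int p = 0; p < numPartitions; p++) {
                available.add(p);
            }
        }
        return available;
    }
}
```

With keyed messages, skipping a muted partition changes where a key lands, which is the ordering trade-off the description mentions.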





[GitHub] [kafka] jlprat commented on pull request #10784: KAFKA-12862: Update Scala fmt library and apply fixes

2021-07-27 Thread GitBox


jlprat commented on pull request #10784:
URL: https://github.com/apache/kafka/pull/10784#issuecomment-887638986


   Something went wrong during the build; can someone re-trigger the build in 
Jenkins?
   
   Failure was:
   > [2021-07-27T14:23:53.802Z] FAILURE: Build failed with an exception.
   [2021-07-27T14:23:53.802Z] 
   [2021-07-27T14:23:53.802Z] * What went wrong:
   [2021-07-27T14:23:53.802Z] Execution failed for task ':storage:unitTest'.
   [2021-07-27T14:23:53.802Z] > Process 'Gradle Test Executor 68' finished with non-zero exit value 1
   [2021-07-27T14:23:53.802Z]   This problem might be caused by incorrect test process configuration.
   [2021-07-27T14:23:53.802Z]   Please refer to the test execution section in the User Manual at https://docs.gradle.org/7.1.1/userguide/java_testing.html#sec:test_execution






[GitHub] [kafka] kirktrue edited a comment on pull request #10980: KAFKA-12989: MockClient should respect the request matcher passed to prepareUnsupportedVersionResponse

2021-07-27 Thread GitBox


kirktrue edited a comment on pull request #10980:
URL: https://github.com/apache/kafka/pull/10980#issuecomment-887635572


   > Why is `NetworkClient#send` throwing an exception? It shouldn't be doing 
that, right? Can you explain more about the problem that this PR fixes?
   
   Per [the original issue](https://issues.apache.org/jira/browse/KAFKA-12989) 
the `MockClient` used for testing allows for fault injection via the 
`RequestMatcher`. If the test sets up the condition where the request _doesn't_ 
match some condition, the `MockClient.send` method is supposed to throw an 
`IllegalStateException`.
   
   That change seemed straightforward, except that it then caused problems in 
`KafkaAdminClient`. Because `KafkaAdminClient` doesn't expect any errors there, 
this exception causes the thread in `KafkaAdminClient.sendEligibleCalls` that is 
servicing requests to die, hence my addition of the `try`/`catch` wrapper.
   
   That said, I'm not 100% confident that this change is the right way to 
handle things. Please advise.
   
   Thanks!
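The `try`/`catch` idea can be shown with a minimal send loop: when sending one pending call throws, only that call is failed and the loop keeps servicing the rest, so the thread survives. Whether catching at this level is the right fix is the open question in the comment; `CallSender`, its `Client` interface, and the string-based calls are hypothetical simplifications, not the `KafkaAdminClient` internals:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical send loop that isolates failures to individual calls.
final class CallSender {
    interface Client {
        void send(String request);
    }

    // Returns the calls whose send() threw; all other calls were sent.
    static List<String> sendAll(final Client client, final List<String> pendingCalls) {
        final List<String> failed = new ArrayList<>();
        for (final String call : pendingCalls) {
            try {
                client.send(call);
            } catch (RuntimeException e) {
                // Fail this call only; letting the exception escape would end the servicing thread.
                failed.add(call);
            }
        }
        return failed;
    }
}
```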






[GitHub] [kafka] kirktrue commented on pull request #10980: KAFKA-12989: MockClient should respect the request matcher passed to prepareUnsupportedVersionResponse

2021-07-27 Thread GitBox


kirktrue commented on pull request #10980:
URL: https://github.com/apache/kafka/pull/10980#issuecomment-887635572


   > Why is `NetworkClient#send` throwing an exception? It shouldn't be doing 
that, right? Can you explain more about the problem that this PR fixes?
   
   Per [the original issue](https://issues.apache.org/jira/browse/KAFKA-12989), 
the `MockClient` used for testing allows for fault injection via the 
`RequestMatcher`. If the test sets up a `RequestMatcher` that the request 
_doesn't_ match, the `MockClient.send` method throws an 
`IllegalStateException`. Because `KafkaAdminClient` isn't expecting any errors, 
this exception causes the thread in `KafkaAdminClient` that is servicing 
requests to die, hence the `try`/`catch` wrapper.
   
   That said, I'm not 100% confident that this change is the right way to 
handle things.






[GitHub] [kafka] kirktrue commented on pull request #10951: KAFKA-12841: NPE from the provided metadata in client callback in case of ApiException

2021-07-27 Thread GitBox


kirktrue commented on pull request #10951:
URL: https://github.com/apache/kafka/pull/10951#issuecomment-887627439


   Thanks @cmccabe for looking at this!
   
   > If I understand correctly, the PR makes up a fake partition so that 
`TopicPartition` can be non-null in the callback. I don't think that this is 
the right thing to do.
   
   Yeah, I'm not crazy about it either. There was some "prior art" in the 
codebase where the same thing was done (`ProducerInterceptors.onSendError`) and 
the existence of `RecordMetadata.UNKNOWN_PARTITION` suggested that it might be 
permissible for these kinds of cases.
   
   > Why not change the JavaDoc for the callback to indicate that 
`TopicPartition` can be null if the method fails before it gets assigned?
   
   My interpretation was that this was akin to changing the interface of the 
`Callback` API and could cause `NullPointerException`s in users' code.
   
   I'm happy to make the change to the JavaDoc, as suggested.
   
   Thanks!
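To illustrate the trade-off, here is a sketch of defensive callback code that copes with both options: a documented `null` partition, and a placeholder partition in the spirit of `RecordMetadata.UNKNOWN_PARTITION`. The types below are hypothetical stand-ins so the example runs on its own; Kafka's real `Callback.onCompletion` receives a `RecordMetadata` and an `Exception`.

```java
// Hypothetical stand-in types; Kafka's real Callback.onCompletion receives
// (RecordMetadata metadata, Exception exception).
public class CallbackNullSketch {

    /** Sentinel in the spirit of RecordMetadata.UNKNOWN_PARTITION. */
    public static final int UNKNOWN_PARTITION = -1;

    /** Minimal stand-in for a topic/partition pair. */
    public static final class TopicPartitionStub {
        public final String topic;
        public final int partition;

        public TopicPartitionStub(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    /**
     * Defensive callback logic: handles both the "documented null" option
     * (tp == null) and the "placeholder partition" option (UNKNOWN_PARTITION).
     */
    public static String describe(TopicPartitionStub tp, Exception exception) {
        boolean unassigned = tp == null || tp.partition == UNKNOWN_PARTITION;
        if (exception != null) {
            return unassigned
                ? "failed before partition assignment: " + exception.getMessage()
                : "failed on " + tp.topic + "-" + tp.partition + ": " + exception.getMessage();
        }
        if (unassigned) {
            // Not expected on success, but avoids an NPE if it happens.
            return "succeeded, partition unknown";
        }
        return "sent to " + tp.topic + "-" + tp.partition;
    }

    public static void main(String[] args) {
        System.out.println(describe(null, new RuntimeException("serialization failed")));
        System.out.println(describe(new TopicPartitionStub("orders", 3), null));
    }
}
```

With the JavaDoc approach, only the `tp == null` branch is needed; the sentinel check matters only if a placeholder partition can reach user code.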
   






[jira] [Assigned] (KAFKA-10310) Kafka Raft Snapshot

2021-07-27 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio reassigned KAFKA-10310:
--

Assignee: Jose Armando Garcia Sancio  (was: loboxu)

> Kafka Raft Snapshot
> ---
>
> Key: KAFKA-10310
> URL: https://issues.apache.org/jira/browse/KAFKA-10310
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> Tracking issue for [KIP-630: Kafka Raft 
> Snapshot|https://cwiki.apache.org/confluence/display/KAFKA/KIP-630%3A+Kafka+Raft+Snapshot]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] douglasawh commented on pull request #8656: KAFKA-9981; dedicated mm2 cluster lose the update operation.

2021-07-27 Thread GitBox


douglasawh commented on pull request #8656:
URL: https://github.com/apache/kafka/pull/8656#issuecomment-887607101


   Any thoughts on when this might get merged? I see that checks have failed, 
so I'm guessing the fix is not ideal, but I haven't looked at the code in depth.






[GitHub] [kafka] rondagostino opened a new pull request #11133: KAFKA-13140: KRaft brokers do not expose kafka.controller metrics

2021-07-27 Thread GitBox


rondagostino opened a new pull request #11133:
URL: https://github.com/apache/kafka/pull/11133


   Several controller metrics are exposed on every broker in a ZooKeeper-based 
(i.e. non-KRaft) cluster regardless of whether the broker is the active 
controller or not, but these metrics are not exposed on KRaft nodes that have 
process.roles=broker (i.e. KRaft nodes that do not implement the controller 
role). For backwards compatibility, KRaft nodes that are just brokers should 
expose these metrics with values all equal to 0: just like ZooKeeper-based 
brokers do when they are not the active controller.  This patch adds these 
metrics and an associated test case.
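A JDK-only sketch of the idea: register each of the five controller metrics (`ActiveControllerCount`, `GlobalTopicCount`, `GlobalPartitionCount`, `OfflinePartitionsCount`, `PreferredReplicaImbalanceCount`) under `kafka.controller:type=KafkaController` with a constant value of 0. This is only an illustration under stated assumptions: Kafka registers these gauges through its own metrics library rather than hand-written MBeans, and the `ValueGauge`/`ZeroGauge` names are hypothetical. The `Value` attribute name follows from the `getValue()` getter under standard MBean naming rules.

```java
import java.lang.management.ManagementFactory;
import java.util.List;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.StandardMBean;

// JDK-only illustration; Kafka itself registers these through its metrics
// library. ValueGauge and ZeroGauge are hypothetical names.
public class ZeroControllerMetricsSketch {

    /** Management interface: getValue() becomes a read-only "Value" attribute. */
    public interface ValueGauge {
        long getValue();
    }

    /** Gauge that always reports 0, as a broker-only node would. */
    public static final class ZeroGauge implements ValueGauge {
        @Override
        public long getValue() {
            return 0L;
        }
    }

    public static final List<String> CONTROLLER_METRICS = List.of(
        "ActiveControllerCount",
        "GlobalTopicCount",
        "GlobalPartitionCount",
        "OfflinePartitionsCount",
        "PreferredReplicaImbalanceCount");

    static ObjectName objectName(String metric) throws MalformedObjectNameException {
        return new ObjectName("kafka.controller:type=KafkaController,name=" + metric);
    }

    /** Registers every controller metric with a constant value of 0. */
    public static void registerZeroGauges(MBeanServer server) {
        for (String metric : CONTROLLER_METRICS) {
            try {
                server.registerMBean(new StandardMBean(new ZeroGauge(), ValueGauge.class), objectName(metric));
            } catch (JMException e) {
                throw new IllegalStateException("Could not register " + metric, e);
            }
        }
    }

    /** Reads the "Value" attribute of one controller metric. */
    public static long readValue(MBeanServer server, String metric) {
        try {
            return (Long) server.getAttribute(objectName(metric), "Value");
        } catch (JMException e) {
            throw new IllegalStateException("Could not read " + metric, e);
        }
    }

    public static void main(String[] args) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        registerZeroGauges(server);
        for (String metric : CONTROLLER_METRICS) {
            System.out.println(metric + " = " + readValue(server, metric));
        }
    }
}
```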
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Created] (KAFKA-13140) KRaft brokers do not expose kafka.controller metrics, breaking backwards compatibility

2021-07-27 Thread Ron Dagostino (Jira)
Ron Dagostino created KAFKA-13140:
-

 Summary: KRaft brokers do not expose kafka.controller metrics, 
breaking backwards compatibility
 Key: KAFKA-13140
 URL: https://issues.apache.org/jira/browse/KAFKA-13140
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Affects Versions: 2.8.0, 3.0.0
Reporter: Ron Dagostino
Assignee: Ron Dagostino
 Fix For: 3.1.0


The following controller metrics are exposed on every broker in a 
ZooKeeper-based (i.e. non-KRaft) cluster regardless of whether the broker is 
the active controller or not, but these metrics are not exposed on KRaft nodes 
that have process.roles=broker (i.e. KRaft nodes that do not implement the 
controller role).  For backwards compatibility, KRaft nodes that are just 
brokers should expose these metrics with values all equal to 0: just like 
ZooKeeper-based brokers do when they are not the active controller.

kafka.controller:type=KafkaController,name=ActiveControllerCount
kafka.controller:type=KafkaController,name=GlobalTopicCount
kafka.controller:type=KafkaController,name=GlobalPartitionCount
kafka.controller:type=KafkaController,name=OfflinePartitionsCount
kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount







[GitHub] [kafka] rhauch commented on a change in pull request #11132: KAFKA-13139: Empty response after requesting to restart a connector without the tasks results in NPE

2021-07-27 Thread GitBox


rhauch commented on a change in pull request #11132:
URL: https://github.com/apache/kafka/pull/11132#discussion_r677530176



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
##
@@ -269,7 +269,7 @@ public Response restartConnector(final @PathParam("connector") String connector,
 FutureCallback cb = new FutureCallback<>();
 herder.restartConnector(connector, cb);
 completeOrForwardRequest(cb, forwardingPath, "POST", headers, null, forward);
-return Response.ok().build();
+return Response.noContent().build();

Review comment:
   I've corrected the KIP and sent an email describing this minor 
correction to the vote thread for the KIP.
   
   I've also added [a comment on 
KAFKA-13139](https://issues.apache.org/jira/browse/KAFKA-13139?focusedCommentId=17388116&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17388116)
 that describes the root cause and the KIP correction.
   
   Thanks, @kkonstantine and @kpatelatwork.
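A sketch of the resulting status-code rule, assuming the `includeTasks` and `onlyFailed` query parameters from KIP-745: the legacy connector-only restart keeps answering `204 NO CONTENT`, while the KIP-745 variants answer `202 ACCEPTED`. `RestartStatusSketch` is a hypothetical helper, not code from `ConnectorsResource`.

```java
// Hypothetical helper illustrating the status-code rule; not ConnectorsResource code.
public class RestartStatusSketch {

    public static final int NO_CONTENT = 204;
    public static final int ACCEPTED = 202;

    /**
     * Status for a connector restart request, given the KIP-745 flags.
     * Connector-only restarts keep the pre-KIP-745 "204 NO CONTENT" answer;
     * any request using the new flags gets "202 ACCEPTED".
     */
    public static int restartStatus(boolean includeTasks, boolean onlyFailed) {
        if (!includeTasks && !onlyFailed) {
            return NO_CONTENT; // legacy behavior: no response body
        }
        return ACCEPTED; // new KIP-745 behavior
    }

    public static void main(String[] args) {
        System.out.println(restartStatus(false, false)); // 204
        System.out.println(restartStatus(true, false));  // 202
    }
}
```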








[jira] [Commented] (KAFKA-13139) Empty response after requesting to restart a connector without the tasks results in NPE

2021-07-27 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17388116#comment-17388116
 ] 

Randall Hauch commented on KAFKA-13139:
---

Just to clarify what appears to have happened.

As [~kpatelatwork] [mentions in a comment on the 
PR|https://github.com/apache/kafka/pull/11132], the behavior of the Connect 
restart API in AK 2.8 and earlier was always to return "204 NO CONTENT", not 
"200 OK" as mentioned in 
[KIP-745|https://cwiki.apache.org/confluence/display/KAFKA/KIP-745%3A+Connect+API+to+restart+connector+and+tasks].
 Although the code used `Response.ok().build()`, the `RestClient` always 
processed the absence of a response body as `204 NO CONTENT`. So, to maintain 
the actual AK 2.x behavior in this branch of the code, we should instead return 
`204 NO CONTENT`.

I've corrected the KIP to reflect this older actual behavior of returning "204 
NO CONTENT". It was a minor but necessary correction.

Note that we have *not* changed the KIP or the behavior of returning "202 
ACCEPTED" when `includeTasks=true` and/or `onlyFailed=true`. These cases 
correspond to the new behavior added in KIP-745.

> Empty response after requesting to restart a connector without the tasks 
> results in NPE
> ---
>
> Key: KAFKA-13139
> URL: https://issues.apache.org/jira/browse/KAFKA-13139
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.0.0
>Reporter: Konstantine Karantasis
>Assignee: Konstantine Karantasis
>Priority: Blocker
> Fix For: 3.0.0
>
>
> After https://issues.apache.org/jira/browse/KAFKA-4793 a response to restart 
> only the connector (without any tasks) returns OK with an empty body. 
> As system test runs revealed, this causes an NPE in 
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java#L135]
> We should return 204 (NO_CONTENT) instead. 
> This is a regression from previous behavior, therefore the ticket is marked 
> as a blocker candidate for 3.0
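The ticket only states that the empty body triggers an NPE in `RestClient`. The sketch below assumes the failure comes from handing a missing body to a deserializer, and shows a client-side guard that treats `204` or an empty body as "no result". `EmptyBodySketch.parseResponse` is hypothetical and does not reflect `RestClient`'s actual code.

```java
import java.util.Optional;
import java.util.function.Function;

// Hypothetical client-side guard; not the actual RestClient implementation.
public class EmptyBodySketch {

    /**
     * Parses a response body only when one is expected and present.
     * A 204 status, or a null/empty body, yields Optional.empty() instead of
     * passing null into the parser.
     */
    public static <T> Optional<T> parseResponse(int status, String body, Function<String, T> parser) {
        if (status == 204 || body == null || body.isEmpty()) {
            return Optional.empty();
        }
        return Optional.ofNullable(parser.apply(body));
    }

    public static void main(String[] args) {
        System.out.println(parseResponse(204, null, Integer::parseInt)); // Optional.empty
        System.out.println(parseResponse(200, "42", Integer::parseInt)); // Optional[42]
    }
}
```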





[GitHub] [kafka] jlprat commented on pull request #10784: KAFKA-12862: Update Scala fmt library and apply fixes

2021-07-27 Thread GitBox


jlprat commented on pull request #10784:
URL: https://github.com/apache/kafka/pull/10784#issuecomment-887529961


   As new code has been added, I need to re-run scalafmt on it. Pushing 
those changes in a second.






[GitHub] [kafka] rondagostino commented on pull request #11131: KAFKA-13137: KRaft Controller Metric MBean names incorrectly quoted

2021-07-27 Thread GitBox


rondagostino commented on pull request #11131:
URL: https://github.com/apache/kafka/pull/11131#issuecomment-887525250


   Thanks for the reviews @hachikuji and @showuon. The test has been revamped 
so that the method is `private static void assertExpectedMetrics(Set 
expectedMetricNames, String expectedType)` as suggested, and all of the 
assertion messages now make sense if there is a failure.
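Since KAFKA-13137 is about MBean names being incorrectly quoted, here is a sketch of an `assertExpectedMetrics`-style check built on `javax.management.ObjectName`. It takes metric names as strings, which is an assumption: the element type of the `Set` in the quoted signature did not survive the mail archive. `getKeyProperty` returns a quoted value together with its quotes, so a name like `type="KafkaController"` fails the comparison.

```java
import java.util.Set;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical test helper in the spirit of assertExpectedMetrics; not the PR's code.
public class MetricNameCheckSketch {

    /** Fails if any name's "type" key is not exactly expectedType (quotes count). */
    public static void assertExpectedMetrics(Set<String> expectedMetricNames, String expectedType) {
        for (String raw : expectedMetricNames) {
            ObjectName name;
            try {
                name = new ObjectName(raw);
            } catch (MalformedObjectNameException e) {
                throw new AssertionError("Malformed metric name: " + raw, e);
            }
            // A quoted value such as "KafkaController" is returned with its quotes.
            String type = name.getKeyProperty("type");
            if (!expectedType.equals(type)) {
                throw new AssertionError("Expected type=" + expectedType + " in " + raw
                    + " but found type=" + type);
            }
        }
    }

    public static void main(String[] args) {
        assertExpectedMetrics(
            Set.of("kafka.controller:type=KafkaController,name=ActiveControllerCount"),
            "KafkaController");
        System.out.println("unquoted name passes");
    }
}
```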






[GitHub] [kafka] jlprat edited a comment on pull request #10784: KAFKA-12862: Update Scala fmt library and apply fixes

2021-07-27 Thread GitBox


jlprat edited a comment on pull request #10784:
URL: https://github.com/apache/kafka/pull/10784#issuecomment-887524211


   Thanks for the review @vvcephei !
   Actually, in this Gradle config the Spotless task executes scalafmt, which 
means running `./gradlew spotlessScalaCheck` will analyze all Scala files 
and complain about violations in the code. Then running 
`./gradlew :spotlessScalaApply` will fix any formatting discrepancies.
   Alternatively, one can configure IntelliJ or VS Code to run scalafmt, and it 
will pick up the existing configuration in the kafka repo.






[GitHub] [kafka] jlprat commented on pull request #10784: KAFKA-12862: Update Scala fmt library and apply fixes

2021-07-27 Thread GitBox


jlprat commented on pull request #10784:
URL: https://github.com/apache/kafka/pull/10784#issuecomment-887524211


   Thanks for the review @vvcephei !
   Actually, in this Gradle config the Spotless task executes scalafmt, which 
means running `./gradlew spotlessScalaCheck` will analyze all Scala files 
and complain about violations in the code. Then running 
`./gradlew :spotlessScalaApply` will fix any formatting discrepancies.






[GitHub] [kafka] vvcephei commented on pull request #10784: KAFKA-12862: Update Scala fmt library and apply fixes

2021-07-27 Thread GitBox


vvcephei commented on pull request #10784:
URL: https://github.com/apache/kafka/pull/10784#issuecomment-887513532


   It looks like there's a new conflict. Hopefully, we can merge soon after you 
fix the conflict this time.
   
   By the way, can you let me know the command you used to apply the format? 
I've been accustomed to using Spotless Scala in this repo; I didn't know about 
Scala fmt until just now.






[jira] [Assigned] (KAFKA-10310) Kafka Raft Snapshot

2021-07-27 Thread loboxu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loboxu reassigned KAFKA-10310:
--

Assignee: loboxu  (was: Jose Armando Garcia Sancio)

> Kafka Raft Snapshot
> ---
>
> Key: KAFKA-10310
> URL: https://issues.apache.org/jira/browse/KAFKA-10310
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Jose Armando Garcia Sancio
>Assignee: loboxu
>Priority: Major
>
> Tracking issue for [KIP-630: Kafka Raft 
> Snapshot|https://cwiki.apache.org/confluence/display/KAFKA/KIP-630%3A+Kafka+Raft+Snapshot]





[GitHub] [kafka] patrickstuedi commented on a change in pull request #11120: Add support for infinite endpoints for range queries

2021-07-27 Thread GitBox


patrickstuedi commented on a change in pull request #11120:
URL: https://github.com/apache/kafka/pull/11120#discussion_r677296530



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
##
@@ -63,6 +65,9 @@
 final KeyValue next = super.makeNext();
 if (next == null) {
 return allDone();
+} else if (rawLastKey == null) {
+return next;

Review comment:
   Added a comment








[GitHub] [kafka] patrickstuedi commented on a change in pull request #11120: Add support for infinite endpoints for range queries

2021-07-27 Thread GitBox


patrickstuedi commented on a change in pull request #11120:
URL: https://github.com/apache/kafka/pull/11120#discussion_r677292872



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
##
@@ -44,17 +44,19 @@
 this.forward = forward;
 this.toInclusive = toInclusive;
 if (forward) {
-iter.seek(from.get());
-rawLastKey = to.get();
-if (rawLastKey == null) {
-throw new NullPointerException("RocksDBRangeIterator: RawLastKey is null for key " + to);
+if (from == null) {
+iter.seekToFirst();
+} else {
+iter.seek(from.get());
 }
+rawLastKey = to == null ? null : to.get();

Review comment:
   Thanks for the comment @showuon , fixed!
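A simplified sketch of the constructor change, assuming a `TreeMap` with `String` keys in place of RocksDB's byte-ordered iterator: a null `from` positions at the first key (the `seekToFirst()` analogue), and a null `to` leaves `rawLastKey` null, meaning there is no upper bound. `ForwardRangeSketch` and `forwardRange` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Simplified stand-in for a forward RocksDB range scan; String keys replace raw bytes.
public class ForwardRangeSketch {

    /** Keys from "from" up to "to" (inclusive or not); null endpoints are unbounded. */
    public static List<String> forwardRange(NavigableMap<String, String> store,
                                            String from, String to, boolean toInclusive) {
        // seekToFirst() analogue when from == null, otherwise seek(from).
        NavigableMap<String, String> view = from == null ? store : store.tailMap(from, true);
        // A null rawLastKey means the range has no upper bound.
        String rawLastKey = to;
        List<String> keys = new ArrayList<>();
        for (String key : view.keySet()) {
            if (rawLastKey != null) {
                int cmp = key.compareTo(rawLastKey);
                if (cmp > 0 || (cmp == 0 && !toInclusive)) {
                    break; // past the upper bound
                }
            }
            keys.add(key);
        }
        return keys;
    }

    public static void main(String[] args) {
        NavigableMap<String, String> store = new TreeMap<>();
        for (String k : List.of("a", "b", "c", "d")) {
            store.put(k, k.toUpperCase());
        }
        System.out.println(forwardRange(store, null, "c", false)); // [a, b]
        System.out.println(forwardRange(store, "b", null, true));  // [b, c, d]
    }
}
```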








[GitHub] [kafka] patrickstuedi commented on a change in pull request #11120: Add support for infinite endpoints for range queries

2021-07-27 Thread GitBox


patrickstuedi commented on a change in pull request #11120:
URL: https://github.com/apache/kafka/pull/11120#discussion_r677285658



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
##
@@ -63,17 +65,20 @@
 final KeyValue next = super.makeNext();
 if (next == null) {
 return allDone();
+} else if (rawLastKey == null) {
+return next;
+
 } else {
 if (forward) {
-if (comparator.compare(next.key.get(), rawLastKey) < 0) {
+if (rawLastKey != null && comparator.compare(next.key.get(), rawLastKey) < 0) {

Review comment:
   Good catch. Yes, this check can be eliminated now that there is a common 
null check at the beginning.
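A sketch of the `makeNext()` shape described here, with `String` keys instead of `Bytes` and `null` standing in for `allDone()`: once the shared `rawLastKey == null` check returns early, the comparisons that follow no longer need their own null checks. `BoundCheckSketch` is hypothetical, and applying `toInclusive` in the reverse branch is an assumption, since the quoted diff shows only the forward comparison.

```java
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

// Illustrative only: mirrors the shape of the makeNext() change in the diff.
public class BoundCheckSketch {

    /** Returns the next in-range key, or null to signal allDone(). */
    public static String makeNext(Iterator<String> iter, String rawLastKey,
                                  boolean forward, boolean toInclusive,
                                  Comparator<String> comparator) {
        if (!iter.hasNext()) {
            return null; // allDone()
        }
        String next = iter.next();
        if (rawLastKey == null) {
            // Unbounded endpoint: the one shared null check, so the
            // comparisons below never see a null rawLastKey.
            return next;
        }
        int cmp = comparator.compare(next, rawLastKey);
        if (forward ? cmp < 0 : cmp > 0) {
            return next; // strictly inside the range
        }
        if (cmp == 0) {
            return toInclusive ? next : null; // exactly on the endpoint
        }
        return null; // past the endpoint: allDone()
    }

    public static void main(String[] args) {
        Iterator<String> keys = List.of("a", "b", "c").iterator();
        String key;
        // Forward scan with an exclusive endpoint of "c": prints a, then b.
        while ((key = makeNext(keys, "c", true, false, Comparator.naturalOrder())) != null) {
            System.out.println(key);
        }
    }
}
```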








[jira] [Assigned] (KAFKA-13120) Flesh out `streams_static_membership_test` to be more robust

2021-07-27 Thread KahnCheny (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

KahnCheny reassigned KAFKA-13120:
-

Assignee: Reggie Hsu  (was: KahnCheny)

> Flesh out `streams_static_membership_test` to be more robust
> 
>
> Key: KAFKA-13120
> URL: https://issues.apache.org/jira/browse/KAFKA-13120
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Reporter: Leah Thomas
>Assignee: Reggie Hsu
>Priority: Minor
>  Labels: newbie++
>
> When fixing `streams_static_membership_test.py`, we noticed that the test 
> is pretty bare bones: it creates a streams application but doesn't do much 
> with it, e.g. it has no stateful processing. We should flesh this out a bit 
> to be more realistic and potentially consider testing with EOS as well. The 
> full Java test is in `StaticMembershipTestClient`.





[jira] [Assigned] (KAFKA-13120) Flesh out `streams_static_membership_test` to be more robust

2021-07-27 Thread KahnCheny (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

KahnCheny reassigned KAFKA-13120:
-

Assignee: KahnCheny  (was: Reggie Hsu)

> Flesh out `streams_static_membership_test` to be more robust
> 
>
> Key: KAFKA-13120
> URL: https://issues.apache.org/jira/browse/KAFKA-13120
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Reporter: Leah Thomas
>Assignee: KahnCheny
>Priority: Minor
>  Labels: newbie++
>
> When fixing `streams_static_membership_test.py`, we noticed that the test 
> is pretty bare bones: it creates a streams application but doesn't do much 
> with it, e.g. it has no stateful processing. We should flesh this out a bit 
> to be more realistic and potentially consider testing with EOS as well. The 
> full Java test is in `StaticMembershipTestClient`.





[GitHub] [kafka] showuon commented on a change in pull request #11120: Add support for infinite endpoints for range queries

2021-07-27 Thread GitBox


showuon commented on a change in pull request #11120:
URL: https://github.com/apache/kafka/pull/11120#discussion_r677209624



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
##
@@ -63,6 +65,9 @@
 final KeyValue next = super.makeNext();
 if (next == null) {
 return allDone();
+} else if (rawLastKey == null) {
+return next;

Review comment:
   We changed the meaning of `rawLastKey`, so we need to add a comment for 
the `rawLastKey == null` case.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
##
@@ -44,17 +44,19 @@
 this.forward = forward;
 this.toInclusive = toInclusive;
 if (forward) {
-iter.seek(from.get());
-rawLastKey = to.get();
-if (rawLastKey == null) {
-throw new NullPointerException("RocksDBRangeIterator: RawLastKey is null for key " + to);
+if (from == null) {
+iter.seekToFirst();
+} else {
+iter.seek(from.get());
 }
+rawLastKey = to == null ? null : to.get();

Review comment:
   Do we still need a null check on `rawLastKey` for the `to.get()` case? 
   
   Same comments to the other similar places.
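For "the other similar places", the reverse branch would need the symmetric change. The quoted diff only shows the forward branch, so this is an assumed sketch: a null `to` starts from the last key (the `seekToLast()` analogue), otherwise the scan starts at the last key not greater than `to` (as `seekForPrev` does), and a null `from` leaves the lower bound open. Both endpoints are treated as inclusive for simplicity, and `ReverseRangeSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Assumed reverse-direction counterpart; String keys replace raw bytes.
public class ReverseRangeSketch {

    /** Keys from "to" down to "from", both inclusive; null endpoints are unbounded. */
    public static List<String> reverseRange(NavigableMap<String, String> store, String from, String to) {
        // seekToLast() analogue when to == null, otherwise start at the last key <= to.
        NavigableMap<String, String> view =
            to == null ? store.descendingMap() : store.headMap(to, true).descendingMap();
        // In reverse, "from" is the endpoint being walked toward; null means unbounded.
        String rawLastKey = from;
        List<String> keys = new ArrayList<>();
        for (String key : view.keySet()) {
            if (rawLastKey != null && key.compareTo(rawLastKey) < 0) {
                break; // walked past the lower bound
            }
            keys.add(key);
        }
        return keys;
    }

    public static void main(String[] args) {
        NavigableMap<String, String> store = new TreeMap<>();
        for (String k : List.of("a", "b", "c", "d")) {
            store.put(k, k.toUpperCase());
        }
        System.out.println(reverseRange(store, "b", null)); // [d, c, b]
        System.out.println(reverseRange(store, null, "c")); // [c, b, a]
    }
}
```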



