[GitHub] [kafka] showuon commented on a change in pull request #10811: KAFKA-12598: remove zookeeper support on configCommand except security config

2021-06-29 Thread GitBox


showuon commented on a change in pull request #10811:
URL: https://github.com/apache/kafka/pull/10811#discussion_r661098484



##
File path: core/src/main/scala/kafka/admin/ConfigCommand.scala
##
@@ -68,7 +68,7 @@ object ConfigCommand extends Config {
   val BrokerDefaultEntityName = ""
   val BrokerLoggerConfigType = "broker-loggers"
   val BrokerSupportedConfigTypes = ConfigType.all :+ BrokerLoggerConfigType
-  val ZkSupportedConfigTypes = ConfigType.all
+  val ZkSupportedConfigTypes = Seq(ConfigType.User, ConfigType.Broker)

Review comment:
   If you're saying this comment:
   > // Dynamic broker configs can only be updated using the new AdminClient once brokers have started
   > // so that configs may be fully validated. Prior to starting brokers, updates may be performed using
   > // ZooKeeper for bootstrapping. This allows all password configs to be stored encrypted in ZK,
   > // avoiding clear passwords in server.properties. For consistency with older versions, quota-related
   > // broker configs can still be updated using ZooKeeper at any time.
   
   I checked and confirmed that it refers to these three configs of the "broker" type (not the "quota" type):
   ```
   val BrokerConfigsUpdatableUsingZooKeeperWhileBrokerRunning = Set(
     DynamicConfig.Broker.LeaderReplicationThrottledRateProp,
     DynamicConfig.Broker.FollowerReplicationThrottledRateProp,
     DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp)
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10811: KAFKA-12598: remove zookeeper support on configCommand except security config

2021-06-29 Thread GitBox


showuon commented on pull request #10811:
URL: https://github.com/apache/kafka/pull/10811#issuecomment-871132339


   @ijuma, I've updated the PR. Please take a look again. Thank you.
   
   Failed tests are unrelated.
   ```
   Build / JDK 16 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
   Build / JDK 16 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10811: KAFKA-12598: remove zookeeper support on configCommand except security config

2021-06-29 Thread GitBox


showuon commented on a change in pull request #10811:
URL: https://github.com/apache/kafka/pull/10811#discussion_r661161241



##
File path: core/src/main/scala/kafka/admin/ConfigCommand.scala
##
@@ -68,14 +68,13 @@ object ConfigCommand extends Config {
   val BrokerDefaultEntityName = ""
   val BrokerLoggerConfigType = "broker-loggers"
   val BrokerSupportedConfigTypes = ConfigType.all :+ BrokerLoggerConfigType
-  val ZkSupportedConfigTypes = ConfigType.all
+  val ZkSupportedConfigTypes = Seq(ConfigType.User, ConfigType.Broker)
   val DefaultScramIterations = 4096
   // Dynamic broker configs can only be updated using the new AdminClient once brokers have started
   // so that configs may be fully validated. Prior to starting brokers, updates may be performed using
   // ZooKeeper for bootstrapping. This allows all password configs to be stored encrypted in ZK,
   // avoiding clear passwords in server.properties. For consistency with older versions, quota-related
-  // broker configs can still be updated using ZooKeeper at any time. ConfigCommand will be migrated
-  // to the new AdminClient later for these configs (KIP-248).

Review comment:
   I've checked and confirmed that KIP-248 has been rejected and won't be implemented, so I removed the reference to it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

2021-06-29 Thread GitBox


mjsax commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r661136400



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##
@@ -92,32 +89,62 @@ private JoinWindows(final long beforeMs,
 if (beforeMs + afterMs < 0) {
 throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
 }
+
+if (graceMs < 0) {
+throw new IllegalArgumentException("Grace period must not be negative.");
+}
+
 this.afterMs = afterMs;
 this.beforeMs = beforeMs;
 this.graceMs = graceMs;
 this.enableSpuriousResultFix = enableSpuriousResultFix;
 }
 
+/**
+ * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
+ * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
+ * the timestamp of the record from the primary stream. Using the method explicitly sets the grace period to
+ * the duration specified by {@code afterWindowEnd} which means that out of order records arriving

Review comment:
   `out-of-order` (with `-`)

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
##
@@ -81,18 +90,63 @@ private TimeWindows(final long sizeMs, final long advanceMs, final long graceMs)
  * 
  * This provides the semantics of tumbling windows, which are fixed-sized, gap-less, non-overlapping windows.
  * Tumbling windows are a special case of hopping windows with {@code advance == size}.
+ * Using the method implicitly sets the grace period to zero which means
+ * that out of order records arriving after the window end will be dropped
  *
  * @param size The size of the window
- * @return a new window definition with default maintain duration of 1 day
+ * @return a new window definition with default no grace period. Note that this means out of order records arriving after the window end will be dropped
+ * @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds}
+ */
+public static TimeWindows ofSizeWithNoGrace(final Duration size) throws IllegalArgumentException {
+return ofSizeAndGrace(size, ofMillis(NO_GRACE_PERIOD));
+}
+
+/**
+ * Return a window definition with the given window size, and with the advance interval being equal to the window
+ * size.
+ * The time interval represented by the N-th window is: {@code [N * size, N * size + size)}.
+ * 
+ * This provides the semantics of tumbling windows, which are fixed-sized, gap-less, non-overlapping windows.
+ * Tumbling windows are a special case of hopping windows with {@code advance == size}.
+ * Using the method explicitly sets the grace period to the duration specified by {@code afterWindowEnd} which means
+ * that out of order records arriving after the window end will be dropped.
+ *
+ * 
+ * Delay is defined as (stream_time - record_timestamp).
+ *
+ * @param size The size of the window. Must be larger than zero
+ * @param afterWindowEnd The grace period to admit out-of-order events to a window. Must be non-negative.
+ * @return a TimeWindows object with the specified size and the specified grace period
+ * @throws IllegalArgumentException if {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
+ */
+public static TimeWindows ofSizeAndGrace(final Duration size, final Duration afterWindowEnd)
+throws IllegalArgumentException {

Review comment:
   nit: move the previous line?
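   For context, a quick usage sketch of the two factory methods quoted above (API exactly as in this diff; the durations are arbitrary):
   ```java
   import java.time.Duration;
   import org.apache.kafka.streams.kstream.TimeWindows;
   
   public class GraceApiExample {
       public static void main(String[] args) {
           // Tumbling 5-minute windows, no grace: out-of-order records arriving
           // after the window end are dropped.
           final TimeWindows strict = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5));
   
           // Same window size, but records up to 30 seconds late are still admitted.
           final TimeWindows lenient = TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofSeconds(30));
   
           System.out.println(strict.gracePeriodMs() + " " + lenient.gracePeriodMs());
       }
   }
   ```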

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##
@@ -92,32 +89,62 @@ private JoinWindows(final long beforeMs,
 if (beforeMs + afterMs < 0) {
 throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
 }
+
+if (graceMs < 0) {
+throw new IllegalArgumentException("Grace period must not be negative.");
+}
+
 this.afterMs = afterMs;
 this.beforeMs = beforeMs;
 this.graceMs = graceMs;
 this.enableSpuriousResultFix = enableSpuriousResultFix;
 }
 
+/**
+ * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
+ * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
+ * the timestamp of the record from the primary stream. Using the method explicitly sets the grace period to
+ * the duration specified by {@code afterWindowEnd} which means that out of order records arriving
+ * after the window end will be dropped
[GitHub] [kafka] mjsax commented on a change in pull request #10917: KAFKA-10847: improve throughput of stream-stream join with spurious left/outer join fix

2021-06-29 Thread GitBox


mjsax commented on a change in pull request #10917:
URL: https://github.com/apache/kafka/pull/10917#discussion_r661100306



##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##
@@ -938,6 +938,9 @@
 // Private API used to disable the fix on left/outer joins (https://issues.apache.org/jira/browse/KAFKA-10847)
 public static final String ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX = "__enable.kstreams.outer.join.spurious.results.fix__";
 
+// Private API used to control the emit latency for left/outer join results (https://issues.apache.org/jira/browse/KAFKA-10847)
+public static final String EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX = "emit.interval.ms.kstreams.outer.join.spurious.results.fix__";

Review comment:
   Good catch!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10892: KAFKA-13011: Update deleteTopics Admin API

2021-06-29 Thread GitBox


jolshan commented on a change in pull request #10892:
URL: https://github.com/apache/kafka/pull/10892#discussion_r661073496



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -1688,15 +1691,27 @@ void handleFailure(Throwable throwable) {
 }
 
 @Override
-public DeleteTopicsResult deleteTopics(final Collection<String> topicNames,
+public DeleteTopicsResult deleteTopics(final TopicCollection topics,
 final DeleteTopicsOptions options) {
+DeleteTopicsResult result;
+if (topics instanceof TopicIdCollection)
+result = DeleteTopicsResult.ofTopicIds(new HashMap<>(handleDeleteTopicsUsingIds(((TopicIdCollection) topics).topicIds(), options)));

Review comment:
   I can move this cast to the `handle...` call and return a KafkaFuture if 
it makes things cleaner.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12660) Do not update offset commit sensor after append failure

2021-06-29 Thread dengziming (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-12660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17371766#comment-17371766 ]

dengziming commented on KAFKA-12660:


[~kirktrue] Thank you for your attention, but I have already opened a PR for this issue: https://github.com/apache/kafka/pull/10560. Are you interested in reviewing it?

> Do not update offset commit sensor after append failure
> ---
>
> Key: KAFKA-12660
> URL: https://issues.apache.org/jira/browse/KAFKA-12660
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: dengziming
>Priority: Major
>
> In the append callback after writing an offset to the log in 
> `GroupMetadataManager`, It seems wrong to update the offset commit sensor 
> prior to checking for errors: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L394.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jolshan commented on a change in pull request #10892: KAFKA-13011: Update deleteTopics Admin API

2021-06-29 Thread GitBox


jolshan commented on a change in pull request #10892:
URL: https://github.com/apache/kafka/pull/10892#discussion_r661068251



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -1688,15 +1691,27 @@ void handleFailure(Throwable throwable) {
 }
 
 @Override
-public DeleteTopicsResult deleteTopics(final Collection<String> topicNames,
+public DeleteTopicsResult deleteTopics(final TopicCollection topics,
 final DeleteTopicsOptions options) {
+DeleteTopicsResult result;
+if (topics instanceof TopicIdCollection)
+result = DeleteTopicsResult.ofTopicIds(new HashMap<>(handleDeleteTopicsUsingIds(((TopicIdCollection) topics).topicIds(), options)));

Review comment:
   Many other admin client calls also follow this pattern with using 
KafkaFutureImpl and copying the result to get KafkaFutures




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





[GitHub] [kafka] jolshan commented on a change in pull request #10892: KAFKA-13011: Update deleteTopics Admin API

2021-06-29 Thread GitBox


jolshan commented on a change in pull request #10892:
URL: https://github.com/apache/kafka/pull/10892#discussion_r661067320



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -1688,15 +1691,27 @@ void handleFailure(Throwable throwable) {
 }
 
 @Override
-public DeleteTopicsResult deleteTopics(final Collection<String> topicNames,
+public DeleteTopicsResult deleteTopics(final TopicCollection topics,
 final DeleteTopicsOptions options) {
+DeleteTopicsResult result;
+if (topics instanceof TopicIdCollection)
+result = DeleteTopicsResult.ofTopicIds(new HashMap<>(handleDeleteTopicsUsingIds(((TopicIdCollection) topics).topicIds(), options)));

Review comment:
   Hmmm. When I try to remove it, I get
   
   `'ofTopicIds(java.util.Map<org.apache.kafka.common.Uuid, org.apache.kafka.common.KafkaFuture<java.lang.Void>>)' in 'org.apache.kafka.clients.admin.DeleteTopicsResult' cannot be applied to '(java.util.Map<org.apache.kafka.common.Uuid, org.apache.kafka.common.internals.KafkaFutureImpl<java.lang.Void>>)'`
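   For context, a minimal standalone illustration (my sketch, not PR code) of why the `new HashMap<>(...)` copy is needed: Java generics are invariant, so a `Map<Uuid, KafkaFutureImpl<Void>>` is not assignable to `Map<Uuid, KafkaFuture<Void>>`, but the `HashMap` copy constructor accepts `Map<? extends K, ? extends V>`:
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import org.apache.kafka.common.KafkaFuture;
   import org.apache.kafka.common.Uuid;
   import org.apache.kafka.common.internals.KafkaFutureImpl;
   
   public class FutureMapCopy {
       public static void main(String[] args) {
           final Map<Uuid, KafkaFutureImpl<Void>> impls = new HashMap<>();
           impls.put(Uuid.randomUuid(), new KafkaFutureImpl<>());
   
           // Map<Uuid, KafkaFuture<Void>> direct = impls;               // does not compile
           final Map<Uuid, KafkaFuture<Void>> copied = new HashMap<>(impls);  // compiles
           System.out.println(copied.size());
       }
   }
   ```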




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10892: KAFKA-13011: Update deleteTopics Admin API

2021-06-29 Thread GitBox


jolshan commented on a change in pull request #10892:
URL: https://github.com/apache/kafka/pull/10892#discussion_r661066009



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
##
@@ -30,24 +31,56 @@
  */
 @InterfaceStability.Evolving
 public class DeleteTopicsResult {
-final Map<String, KafkaFuture<Void>> futures;
+private final Map<Uuid, KafkaFuture<Void>> topicIdFutures;
+private final Map<String, KafkaFuture<Void>> nameFutures;
 
-protected DeleteTopicsResult(Map<String, KafkaFuture<Void>> futures) {
-this.futures = futures;
+protected DeleteTopicsResult(Map<Uuid, KafkaFuture<Void>> topicIdFutures, Map<String, KafkaFuture<Void>> nameFutures) {
+if (topicIdFutures != null && nameFutures != null)
+throw new IllegalArgumentException("topicIdFutures and nameFutures can not both be specified.");
+this.topicIdFutures = topicIdFutures;
+this.nameFutures = nameFutures;
 }
 
+protected static DeleteTopicsResult ofTopicIds(Map<Uuid, KafkaFuture<Void>> topicIdFutures) {

Review comment:
   Forgive my ignorance, but is that not the same thing? Is package-private just that we can't use it in subclasses outside the package? If that's the case, then sure.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10838: MINOR: small fix and clean up for DynamicConfigManager

2021-06-29 Thread GitBox


showuon commented on pull request #10838:
URL: https://github.com/apache/kafka/pull/10838#issuecomment-871028414


   Failed tests are unrelated. Thank you.
   ```
   Build / JDK 16 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
   Build / JDK 11 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
   Build / JDK 15 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
   Build / JDK 15 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10917: KAFKA-10847: improve throughput of stream-stream join with spurious left/outer join fix

2021-06-29 Thread GitBox


showuon commented on a change in pull request #10917:
URL: https://github.com/apache/kafka/pull/10917#discussion_r661064592



##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##
@@ -938,6 +938,9 @@
 // Private API used to disable the fix on left/outer joins (https://issues.apache.org/jira/browse/KAFKA-10847)
 public static final String ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX = "__enable.kstreams.outer.join.spurious.results.fix__";
 
+// Private API used to control the emit latency for left/outer join results (https://issues.apache.org/jira/browse/KAFKA-10847)
+public static final String EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX = "emit.interval.ms.kstreams.outer.join.spurious.results.fix__";

Review comment:
   Should we add underscores in front of the config name like the ones above? i.e. `__emit.interval.ms.kstreams.outer.join.spurious.results.fix__`
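   If adopted, the constant would presumably become (a sketch mirroring the `__enable...__` constant above):
   ```java
   // Private API used to control the emit latency for left/outer join results (KAFKA-10847).
   // Leading and trailing double underscores mark the config as internal, matching
   // ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX.
   public static final String EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX =
       "__emit.interval.ms.kstreams.outer.join.spurious.results.fix__";
   ```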




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10892: KAFKA-13011: Update deleteTopics Admin API

2021-06-29 Thread GitBox


jolshan commented on a change in pull request #10892:
URL: https://github.com/apache/kafka/pull/10892#discussion_r661064384



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -1688,15 +1691,27 @@ void handleFailure(Throwable throwable) {
 }
 
 @Override
-public DeleteTopicsResult deleteTopics(final Collection<String> topicNames,
+public DeleteTopicsResult deleteTopics(final TopicCollection topics,
 final DeleteTopicsOptions options) {
+DeleteTopicsResult result;
+if (topics instanceof TopicIdCollection)
+result = DeleteTopicsResult.ofTopicIds(new HashMap<>(handleDeleteTopicsUsingIds(((TopicIdCollection) topics).topicIds(), options)));

Review comment:
   This was an artifact of the old version of the code. We would create a copy of the map:
   ```
   return new DeleteTopicsResult(new HashMap<>(topicFutures));
   ```
   I can remove it, though.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10892: KAFKA-13011: Update deleteTopics Admin API

2021-06-29 Thread GitBox


jolshan commented on a change in pull request #10892:
URL: https://github.com/apache/kafka/pull/10892#discussion_r661062996



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
##
@@ -30,24 +31,56 @@
  */
 @InterfaceStability.Evolving
 public class DeleteTopicsResult {
-final Map<String, KafkaFuture<Void>> futures;
+private final Map<Uuid, KafkaFuture<Void>> topicIdFutures;
+private final Map<String, KafkaFuture<Void>> nameFutures;
 
-protected DeleteTopicsResult(Map<String, KafkaFuture<Void>> futures) {
-this.futures = futures;
+protected DeleteTopicsResult(Map<Uuid, KafkaFuture<Void>> topicIdFutures, Map<String, KafkaFuture<Void>> nameFutures) {
+if (topicIdFutures != null && nameFutures != null)
+throw new IllegalArgumentException("topicIdFutures and nameFutures can not both be specified.");
+this.topicIdFutures = topicIdFutures;
+this.nameFutures = nameFutures;
 }
 
+protected static DeleteTopicsResult ofTopicIds(Map<Uuid, KafkaFuture<Void>> topicIdFutures) {
+DeleteTopicsResult result = new DeleteTopicsResult(topicIdFutures, null);
+return result;
+}
+
+protected static DeleteTopicsResult ofTopicNames(Map<String, KafkaFuture<Void>> nameFutures) {
+DeleteTopicsResult result = new DeleteTopicsResult(null, nameFutures);

Review comment:
   Yes. This was left over from before and I didn't clean up enough.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10892: KAFKA-13011: Update deleteTopics Admin API

2021-06-29 Thread GitBox


jolshan commented on a change in pull request #10892:
URL: https://github.com/apache/kafka/pull/10892#discussion_r661062826



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
##
@@ -30,24 +31,56 @@
  */
 @InterfaceStability.Evolving
 public class DeleteTopicsResult {
-final Map<String, KafkaFuture<Void>> futures;
+private final Map<Uuid, KafkaFuture<Void>> topicIdFutures;
+private final Map<String, KafkaFuture<Void>> nameFutures;
 
-protected DeleteTopicsResult(Map<String, KafkaFuture<Void>> futures) {
-this.futures = futures;
+protected DeleteTopicsResult(Map<Uuid, KafkaFuture<Void>> topicIdFutures, Map<String, KafkaFuture<Void>> nameFutures) {
+if (topicIdFutures != null && nameFutures != null)
Review comment:
   good point




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10892: KAFKA-13011: Update deleteTopics Admin API

2021-06-29 Thread GitBox


hachikuji commented on a change in pull request #10892:
URL: https://github.com/apache/kafka/pull/10892#discussion_r661058030



##
File path: clients/src/main/java/org/apache/kafka/common/TopicCollection.java
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A class used to represent a collection of topics. This collection may define topics by topic name
+ * or topic ID. Subclassing this class beyond the classes provided here is not supported.
+ */
+public abstract class TopicCollection {
+
+private TopicCollection() {}
+
+/**
+ * @return a collection of topics defined by topic ID
+ */
+public static TopicIdCollection ofTopicIds(Collection<Uuid> topics) {
+return new TopicIdCollection(topics);

Review comment:
   Since the collection here comes from the user, maybe we should make a copy. Otherwise, the application could mutate it while we have a reference.
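   A minimal sketch of the defensive copy being suggested (assuming a `java.util.ArrayList` import; not the PR's final code):
   ```java
   /**
    * @return a collection of topics defined by topic ID
    */
   public static TopicIdCollection ofTopicIds(Collection<Uuid> topics) {
       // Copy the user-supplied collection so later mutation by the caller
       // cannot change what this TopicIdCollection refers to.
       return new TopicIdCollection(new ArrayList<>(topics));
   }
   ```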

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
##
@@ -30,24 +31,56 @@
  */
 @InterfaceStability.Evolving
 public class DeleteTopicsResult {
-final Map<String, KafkaFuture<Void>> futures;
+private final Map<Uuid, KafkaFuture<Void>> topicIdFutures;
+private final Map<String, KafkaFuture<Void>> nameFutures;
 
-protected DeleteTopicsResult(Map<String, KafkaFuture<Void>> futures) {
-this.futures = futures;
+protected DeleteTopicsResult(Map<Uuid, KafkaFuture<Void>> topicIdFutures, Map<String, KafkaFuture<Void>> nameFutures) {
+if (topicIdFutures != null && nameFutures != null)

Review comment:
   This allows both of them to be null?
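   If both-null should also be rejected, a hedged sketch of the tighter precondition (my assumption, not necessarily the PR's final check):
   ```java
   // Require exactly one of the two maps: reject both-null as well as both-non-null.
   if ((topicIdFutures == null) == (nameFutures == null))
       throw new IllegalArgumentException("Exactly one of topicIdFutures and nameFutures must be specified.");
   ```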

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -1688,15 +1691,27 @@ void handleFailure(Throwable throwable) {
 }
 
 @Override
-public DeleteTopicsResult deleteTopics(final Collection<String> topicNames,
+public DeleteTopicsResult deleteTopics(final TopicCollection topics,
 final DeleteTopicsOptions options) {
+DeleteTopicsResult result;
+if (topics instanceof TopicIdCollection)
+result = DeleteTopicsResult.ofTopicIds(new HashMap<>(handleDeleteTopicsUsingIds(((TopicIdCollection) topics).topicIds(), options)));

Review comment:
   Why do we copy the result of `handleDeleteTopicsUsingIds`? It seems like that method is already returning a fresh map.

##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/DeleteTopicsResultTest.java
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class DeleteTopicsResultTest {
+
+@Test
+public void testDeleteTopicsResult() {

Review comment:
   nit: would it make sense to turn this into two separate tests? One for ids and one for names?
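   A sketch of that split (the test names and the `topicIdValues()` / `topicNameValues()` accessors are my assumptions about the new API's shape):
   ```java
   @Test
   public void testDeleteTopicsResultOfTopicIds() {
       final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
       final Map<Uuid, KafkaFuture<Void>> idFutures = Collections.singletonMap(Uuid.randomUuid(), future);
       final DeleteTopicsResult result = DeleteTopicsResult.ofTopicIds(idFutures);
       assertEquals(idFutures, result.topicIdValues());
       assertNull(result.topicNameValues());
   }
   
   @Test
   public void testDeleteTopicsResultOfTopicNames() {
       final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
       final Map<String, KafkaFuture<Void>> nameFutures = Collections.singletonMap("foo", future);
       final DeleteTopicsResult result = DeleteTopicsResult.ofTopicNames(nameFutures);
       assertEquals(nameFutures, result.topicNameValues());
       assertNull(result.topicIdValues());
   }
   ```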

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
##
@@ -30,24 +31,56 @@
  */
 @InterfaceStability.Evolving
 public class DeleteTopicsResult {

[GitHub] [kafka] guozhangwang commented on a change in pull request #10917: KAFKA-10847: improve throughput of stream-stream join with spurious left/outer join fix

2021-06-29 Thread GitBox


guozhangwang commented on a change in pull request #10917:
URL: https://github.com/apache/kafka/pull/10917#discussion_r661055137



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -183,15 +195,27 @@ public void process(final K key, final V1 value) {
 
 @SuppressWarnings("unchecked")
 private void emitNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> store) {
+if (sharedTimeTracker.minTime >= sharedTimeTracker.streamTime - joinAfterMs - joinGraceMs) {

Review comment:
   nit: add some explanations on these two conditions?
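   For what it's worth, my reading of the visible condition, phrased as the kind of inline comment being requested (an interpretation, not the PR's final wording):
   ```java
   // If even the oldest tracked record (minTime) is still within the join window
   // plus the grace period relative to the current stream time, nothing can have
   // expired yet, so skip the expensive store range scan entirely.
   if (sharedTimeTracker.minTime >= sharedTimeTracker.streamTime - joinAfterMs - joinGraceMs) {
       return;
   }
   ```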




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

2021-06-29 Thread GitBox


mjsax commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r661045444



##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##
@@ -1426,6 +1425,9 @@ private void verifyMaxInFlightRequestPerConnection(final Object maxInFlightRequests)
 public Serde defaultKeySerde() {
 final Object keySerdeConfigSetting = get(DEFAULT_KEY_SERDE_CLASS_CONFIG);
 try {
+if (keySerdeConfigSetting == null) {
+return null;

Review comment:
   It's been a while, but I am still wondering if we should throw a `ConfigException` directly instead of returning `null`? This would then be the only place in the code where we throw a `ConfigException` for this case. Below, there is some repetitive code that calls `defaultKeySerde` (or `defaultValueSerde`) and throws `ConfigException` if `null` is returned.
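   A sketch of the alternative being raised here, i.e. failing fast inside `defaultKeySerde()` instead of returning `null` (the exception message and the surrounding instantiation code are my assumptions):
   ```java
   public Serde defaultKeySerde() {
       final Object keySerdeConfigSetting = get(DEFAULT_KEY_SERDE_CLASS_CONFIG);
       if (keySerdeConfigSetting == null) {
           // Throw once at the source, so callers below no longer need their own null checks.
           throw new ConfigException("Please specify a key serde or set one through "
               + "StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG");
       }
       final Serde<?> serde = getConfiguredInstance(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serde.class);
       serde.configure(originals(), true);
       return serde;
   }
   ```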

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
##
@@ -69,6 +70,9 @@
 Optional.empty()
 );
 } catch (final Exception deserializationException) {
+if (deserializationException instanceof ConfigException) {

Review comment:
   Where does the `ConfigException` come from?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java
##
@@ -27,20 +31,37 @@
 public class WrappingNullableUtils {
 
 @SuppressWarnings("unchecked")
-private static <T> Deserializer<T> prepareDeserializer(final Deserializer<T> specificDeserializer, final Deserializer<?> contextKeyDeserializer, final Deserializer<?> contextValueDeserializer, final boolean isKey) {
-Deserializer<T> deserializerToUse = specificDeserializer;
-if (deserializerToUse == null) {
+private static <T> Deserializer<T> prepareDeserializer(final Deserializer<T> specificDeserializer, final ProcessorContext context, final boolean isKey, final String name) {
+final Deserializer<?> contextKeyDeserializer = context.keySerde() == null ? null : context.keySerde().deserializer();
+final Deserializer<?> contextValueDeserializer = context.valueSerde() == null ? null : context.valueSerde().deserializer();
+final Deserializer<T> deserializerToUse;
+
+if (specificDeserializer == null) {
 deserializerToUse = (Deserializer<T>) (isKey ? contextKeyDeserializer : contextValueDeserializer);
+} else {
+deserializerToUse = specificDeserializer;
+}
+if (deserializerToUse == null) {
+final String serde = isKey ? "key" : "value";
+throw new ConfigException("Failed to create deserializers. Please specify a " + serde + " serde through produced or materialized, or set one through StreamsConfig#DEFAULT_" + serde.toUpperCase(Locale.ROOT) + "_SERDE_CLASS_CONFIG for node " + name);

Review comment:
   Following the comments from above: if we throw in `context.keySerde()`, we can avoid this redundant code. Of course, we should only call `context.keySerde()` if `specificDeserializer == null` for this case.
   
   Similar below.

##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##
@@ -1445,6 +1447,9 @@ public Serde defaultKeySerde() {
 public Serde defaultValueSerde() {
 final Object valueSerdeConfigSetting = get(DEFAULT_VALUE_SERDE_CLASS_CONFIG);
 try {
+if (valueSerdeConfigSetting == null) {
+return null;

Review comment:
   As above

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
##
@@ -39,7 +39,10 @@ public ChangedDeserializer(final Deserializer<T> inner) {
 @Override
 public void setIfUnset(final Deserializer<?> defaultKeyDeserializer, final Deserializer<?> defaultValueDeserializer) {

Review comment:
   Could we pass the context instead of both deserializers, and simplify to (relying on the context to throw if necessary):
   ```
   if (inner == null) {
 inner = context.valueSerde();
   }
   ```

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java
##
@@ -54,10 +55,14 @@ public CombinedKeySchema(final Supplier<String> foreignKeySerdeTopicSupplier,
 public void init(final ProcessorContext context) {
 primaryKeySerdeTopic = undecoratedPrimaryKeySerdeTopicSupplier.get();
 foreignKeySerdeTopic = undecoratedForeignKeySerdeTopicSupplier.get();
-primaryKeySerializer = primaryKeySerializer == null ? (Serializer<K>) context.keySerde().serializer() : primaryKeySerializer;
-primaryKeyDeserializer = primaryKeyDeserializer == null ? (Deserializer<K>) context.keySerde().deserializer() : primaryKeyDeserializer;
-foreignKeySerializer = foreignKeySerializer == null ? (Serializer<KO>) context.keySerde().serializer()

[GitHub] [kafka] zhaohaidao commented on a change in pull request #10932: KAFKA-12958: add an invariant that notified leaders are never asked to load snapshot

2021-06-29 Thread GitBox


zhaohaidao commented on a change in pull request #10932:
URL: https://github.com/apache/kafka/pull/10932#discussion_r661038035



##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
##
@@ -157,13 +160,18 @@ public synchronized void handleSnapshot(SnapshotReader<Integer> reader) {
 public synchronized void handleLeaderChange(LeaderAndEpoch newLeader) {
 if (newLeader.isLeader(nodeId)) {
 log.debug("Counter uncommitted value initialized to {} after claiming leadership in epoch {}",
-committed, newLeader);
+committed, newLeader);
 uncommitted = committed;
 claimedEpoch = OptionalInt.of(newLeader.epoch());
 } else {
 log.debug("Counter uncommitted value reset after resigning leadership");
 uncommitted = -1;
 claimedEpoch = OptionalInt.empty();
 }
+handleSnapshotCalls = 0;
+}
+
+public int getHandleSnapshotCalls() {

Review comment:
   done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] zhaohaidao commented on a change in pull request #10932: KAFKA-12958: add an invariant that notified leaders are never asked to load snapshot

2021-06-29 Thread GitBox


zhaohaidao commented on a change in pull request #10932:
URL: https://github.com/apache/kafka/pull/10932#discussion_r661037975



##
File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
##
@@ -1014,6 +1015,26 @@ public void verify() {
 }
 }
 
+private static class LeaderNeverLoadSnapshot implements Invariant {
+final Cluster cluster;
+
+private LeaderNeverLoadSnapshot(Cluster cluster) {
+this.cluster = cluster;
+}
+
+@Override
+public void verify() {
+for (RaftNode raftNode : cluster.running()) {
+if (raftNode.counter.isWritable()) {
+assertTrue(raftNode.counter.getHandleSnapshotCalls() == 0);

Review comment:
   done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-29 Thread GitBox


kpatelatwork commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r661028655



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##
@@ -210,6 +211,15 @@ public void onDeletion(ConnectorTaskId id) {
 statusBackingStore.put(new TaskStatus(id, TaskStatus.State.DESTROYED, workerId, generation()));
 }
 
+public void onRestart(String connector) {

Review comment:
   @kkonstantine I moved it to the Listener interface in https://github.com/apache/kafka/pull/10822/commits/7f8a588aaba9f1f80889e0429b2b9f495b1c666b. Could you please check to see if it looks good?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10917: KAFKA-10847: improve throughput of stream-stream join with spurious left/outer join fix

2021-06-29 Thread GitBox


mjsax commented on a change in pull request #10917:
URL: https://github.com/apache/kafka/pull/10917#discussion_r661027703



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -186,12 +190,17 @@ public void process(final K key, final V1 value) {
 
 @SuppressWarnings("unchecked")
 private void emitNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> store) {
+if (minTime.minTime >= maxObservedStreamTime.get() - joinAfterMs - joinBeforeMs - joinGraceMs) {

Review comment:
   That's a good point, and I was concerned about it, too. Using `beforeMs` might increase the latency too much, especially for large windows.
   
   It might actually be best to just do processing-time periodic emits, to avoid calling `store.all()` too often.
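   A minimal sketch of such a processing-time throttle (the field and parameter names here are my assumptions, not the PR's code):
   ```java
   private long lastEmitWallClockMs = 0L;
   
   private void maybeEmitNonJoinedOuterRecords(final long nowWallClockMs) {
       // Throttle on wall-clock time so the store.all() scan runs at most once
       // per emit interval, regardless of input record rate or window size.
       if (nowWallClockMs - lastEmitWallClockMs < emitIntervalMs) {
           return;
       }
       lastEmitWallClockMs = nowWallClockMs;
       emitNonJoinedOuterRecords(outerJoinStore);  // store field name assumed
   }
   ```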




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-29 Thread GitBox


kkonstantine commented on pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#issuecomment-870979782


   fyi, a test failure seems relevant: 
   `testCORSEnabled – org.apache.kafka.connect.runtime.rest.RestServerTest`
   I don't remember this test being flaky and it failed in both builders. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #10942: MINOR: Move ZkMetadataCache into its own file.

2021-06-29 Thread GitBox


cmccabe merged pull request #10942:
URL: https://github.com/apache/kafka/pull/10942


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe opened a new pull request #10942: MINOR: Move ZkMetadataCache into its own file.

2021-06-29 Thread GitBox


cmccabe opened a new pull request #10942:
URL: https://github.com/apache/kafka/pull/10942


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #10887: MINOR: Refactor the MetadataCache interface

2021-06-29 Thread GitBox


cmccabe merged pull request #10887:
URL: https://github.com/apache/kafka/pull/10887


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ryannedolan commented on a change in pull request #10652: KAFKA-9726 IdentityReplicationPolicy

2021-06-29 Thread GitBox


ryannedolan commented on a change in pull request #10652:
URL: https://github.com/apache/kafka/pull/10652#discussion_r661007855



##
File path: docs/upgrade.html
##
@@ -80,7 +80,13 @@ Notable changes in 3
 understood by brokers of version 2.5 or higher, so you must upgrade your kafka cluster to get the stronger semantics. Otherwise, you can just pass
 in new ConsumerGroupMetadata(consumerGroupId) to work with older brokers. See <a href="https://cwiki.apache.org/confluence/x/zJONCg">KIP-732</a> for more details.
 
-
 The Connect-based MirrorMaker (MM2) includes changes to support IdentityReplicationPolicy, enabling replication without renaming topics.

Review comment:
   @mimaison @mdedetrich wdyt?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12660) Do not update offset commit sensor after append failure

2021-06-29 Thread Kirk True (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-12660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17371669#comment-17371669 ]

Kirk True commented on KAFKA-12660:
---

[~dengziming] - can I take a look at this? I have some spare cycles to 
investigate and hopefully fix.

> Do not update offset commit sensor after append failure
> ---
>
> Key: KAFKA-12660
> URL: https://issues.apache.org/jira/browse/KAFKA-12660
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: dengziming
>Priority: Major
>
> In the append callback after writing an offset to the log in 
> `GroupMetadataManager`, It seems wrong to update the offset commit sensor 
> prior to checking for errors: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L394.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on pull request #10941: KAFKA-10847: Remove internal config for enabling the fix

2021-06-29 Thread GitBox


guozhangwang commented on pull request #10941:
URL: https://github.com/apache/kafka/pull/10941#issuecomment-870917133


   ping @mjsax @spena .


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang opened a new pull request #10941: KAFKA-10847: Remove internal config for enabling the fix

2021-06-29 Thread GitBox


guozhangwang opened a new pull request #10941:
URL: https://github.com/apache/kafka/pull/10941


   Also update the upgrade guide to mention the grace period KIP and its implication for the fix's throughput impact.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] izzyacademy commented on pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

2021-06-29 Thread GitBox


izzyacademy commented on pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#issuecomment-870911243


   @showuon @ableegoldman @mjsax @cadonna 
   
   I have updated the PR with the requested changes. Please take a look when 
you have a moment. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #10743: KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time

2021-06-29 Thread GitBox


mimaison commented on pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#issuecomment-870910353


   @dajac Thanks, I opened JIRAs for all follow-up work items we identified.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #10743: KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time

2021-06-29 Thread GitBox


mimaison commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r660956483



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
##
@@ -250,6 +252,13 @@ public void onFailure(
 .filter(future.lookupKeys()::contains)
 .collect(Collectors.toSet());
 retryLookup(keysToUnmap);
+
+} else if (t instanceof NoBatchedFindCoordinatorsException) {

Review comment:
   I opened https://issues.apache.org/jira/browse/KAFKA-13013




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13013) Avoid specific exception handling in AdminApiDriver

2021-06-29 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-13013:
--

 Summary: Avoid specific exception handling in AdminApiDriver
 Key: KAFKA-13013
 URL: https://issues.apache.org/jira/browse/KAFKA-13013
 Project: Kafka
  Issue Type: Improvement
  Components: admin
Reporter: Mickael Maison


To enable downconverting requests when brokers don't support FindCoordinator 
v4, we added some logic in AdminApiDriver to handle 
NoBatchedFindCoordinatorsException. 

Ideally AdminApiDriver should not be aware of specific exceptions like this and 
instead delegate the handling to the corresponding strategy. 

Another option is to not rely on an Exception to identify the request version 
to use and use ApiVersions when creating the initial request. A good solution 
would work nicely with all types of clients (consumer, producer, admin).

More details in:
- https://github.com/apache/kafka/pull/10743#discussion_r659602262
- https://github.com/apache/kafka/pull/10743#discussion_r649872433



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] izzyacademy commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

2021-06-29 Thread GitBox


izzyacademy commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r660953683



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##
@@ -83,6 +83,14 @@ protected JoinWindows(final JoinWindows joinWindows) {
 afterMs = joinWindows.afterMs;
 graceMs = joinWindows.graceMs;
 enableSpuriousResultFix = joinWindows.enableSpuriousResultFix;
+
+if (beforeMs + afterMs < 0) {
+throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");

Review comment:
   > Could we make this Constructor to call another overloaded constructor, to avoid duplicated codes? i.e.
   > 
   > ```java
   > protected JoinWindows(final JoinWindows joinWindows) {
   > this(joinWindows.beforeMs, joinWindows.afterMs, joinWindows.graceMs, joinWindows.enableSpuriousResultFix);
   > }
   > ```
   
   This is done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] izzyacademy commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

2021-06-29 Thread GitBox


izzyacademy commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r660953456



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##
@@ -92,32 +100,61 @@ private JoinWindows(final long beforeMs,
 if (beforeMs + afterMs < 0) {
 throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
 }
+
+if (graceMs < 0) {
+throw new IllegalArgumentException("Grace period must not be negative.");
+}
+
 this.afterMs = afterMs;
 this.beforeMs = beforeMs;
 this.graceMs = graceMs;
 this.enableSpuriousResultFix = enableSpuriousResultFix;
 }
 
+/**
+ * Reject out-of-order events that are delayed more than {@code afterWindowEnd}
+ * after the end of its window.
+ * 
+ * Delay is defined as (stream_time - record_timestamp).
+ *
+ * @param timeDifference join window interval
+ * @param afterWindowEnd The grace period to admit out-of-order events to a window.
+ * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
+ * @return A new JoinWindows object with the specified window definition and grace period
+ */
 public static JoinWindows ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd) {
 return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
 }
 
+/**
+ * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
+ * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
+ * the timestamp of the record from the primary stream. Using the method implicitly sets the grace period to zero
+ * which means that out of order records arriving after the window end will be dropped.

Review comment:
   Upon review, I have updated the javadocs, but I don't think they can be identical in wording.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-29 Thread GitBox


kpatelatwork commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r660953271



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##
@@ -210,6 +211,15 @@ public void onDeletion(ConnectorTaskId id) {
 statusBackingStore.put(new TaskStatus(id, TaskStatus.State.DESTROYED, 
workerId, generation()));
 }
 
+public void onRestart(String connector) {

Review comment:
   @kkonstantine you are right. Do you mind if we do this work in a follow-up PR? The reason I ask is that if we add it to the listener, it becomes part of the interface, and that would require me to retrofit the listener event into the old restartTask and restartConnector APIs for backward-compatibility reasons; that could be a big change to this already big PR.








[GitHub] [kafka] mimaison commented on a change in pull request #10743: KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time

2021-06-29 Thread GitBox


mimaison commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r660952842



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
##
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import 
org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition;
+import 
org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic;
+import 
org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
+import 
org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.OffsetCommitResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+public class AlterConsumerGroupOffsetsHandler implements AdminApiHandler<CoordinatorKey, Map<TopicPartition, Errors>> {
+
+    private final CoordinatorKey groupId;
+    private final Map<TopicPartition, OffsetAndMetadata> offsets;
+    private final Logger log;
+    private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
+
+    public AlterConsumerGroupOffsetsHandler(
+        String groupId,
+        Map<TopicPartition, OffsetAndMetadata> offsets,
+        LogContext logContext
+    ) {
+        this.groupId = CoordinatorKey.byGroupId(groupId);
+        this.offsets = offsets;
+        this.log = logContext.logger(AlterConsumerGroupOffsetsHandler.class);
+        this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext);
+    }
+
+    @Override
+    public String apiName() {
+        return "offsetCommit";
+    }
+
+    @Override
+    public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
+        return lookupStrategy;
+    }
+
+    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Errors>> newFuture(
+        String groupId
+    ) {
+        return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
+    }
+
+    @Override
+    public OffsetCommitRequest.Builder buildRequest(int brokerId, Set<CoordinatorKey> keys) {
+        List<OffsetCommitRequestTopic> topics = new ArrayList<>();
+        Map<String, List<OffsetCommitRequestPartition>> offsetData = new HashMap<>();
+        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
+            String topic = entry.getKey().topic();
+            OffsetAndMetadata oam = entry.getValue();
+            offsetData.compute(topic, (key, value) -> {
+                if (value == null) {
+                    value = new ArrayList<>();
+                }
+                OffsetCommitRequestPartition partition = new OffsetCommitRequestPartition()
+                    .setCommittedOffset(oam.offset())
+                    .setCommittedLeaderEpoch(oam.leaderEpoch().orElse(-1))
+                    .setCommittedMetadata(oam.metadata())
+                    .setPartitionIndex(entry.getKey().partition());
+                value.add(partition);
+                return value;
+            });
+        }
+        for (Map.Entry<String, List<OffsetCommitRequestPartition>> entry : offsetData.entrySet()) {
+            OffsetCommitRequestTopic topic = new OffsetCommitRequestTopic()
+                .setName(entry.getKey())
+                .setPartitions(entry.getValue());
+            topics.add(topic);
+        }
+        OffsetCommitRequestData data = new OffsetCommitRequestData()
+            .setGroupId(groupId.idValue)
+            .setTopics(topics);
+        return new OffsetCommitRequest.Builder(data);
+    }
+
+    @Override
+    public ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleResponse(Node broker, Set<CoordinatorKey> groupIds,
+        AbstractResponse abstractResponse) {
+
+        final OffsetCommitResponse response = (Offset

[jira] [Created] (KAFKA-13012) Rationalize error handling of AdminClient calls

2021-06-29 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-13012:
--

 Summary: Rationalize error handling of AdminClient calls
 Key: KAFKA-13012
 URL: https://issues.apache.org/jira/browse/KAFKA-13012
 Project: Kafka
  Issue Type: Improvement
  Components: admin
Reporter: Mickael Maison


While working on KIP-699, we noticed many AdminClient calls perform slightly 
different error handling. 

For example, all fulfilment calls following a coordinator lookup should handle 
coordinator/group errors the same way. Currently each Handler has its own 
handleError() method with subtle differences.

More details in https://github.com/apache/kafka/pull/10743#discussion_r653151839
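
A hypothetical sketch (Scala; not the actual AdminClient internals) of the kind of consolidation this asks for: every coordinator-based fulfilment handler delegates error classification to one shared function instead of its own handleError():
{code:scala}
import org.apache.kafka.common.protocol.Errors

// Illustrative decision type: what a handler should do with a coordinator/group error
sealed trait ErrorAction
case object Retry extends ErrorAction         // transient: retry against the same coordinator
case object RemapAndRetry extends ErrorAction // coordinator moved: redo the FindCoordinator lookup
case object Fail extends ErrorAction          // fatal for this group

def coordinatorErrorAction(error: Errors): ErrorAction = error match {
  case Errors.COORDINATOR_LOAD_IN_PROGRESS => Retry
  case Errors.COORDINATOR_NOT_AVAILABLE | Errors.NOT_COORDINATOR => RemapAndRetry
  case Errors.GROUP_AUTHORIZATION_FAILED => Fail
  case _ => Fail
}
{code}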





[GitHub] [kafka] izzyacademy commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

2021-06-29 Thread GitBox


izzyacademy commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r660952604



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
##
@@ -108,13 +147,12 @@ public static SessionWindows with(final Duration 
inactivityGap) {
  * @param afterWindowEnd The grace period to admit out-of-order events to 
a window.
  * @return this updated builder
 * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
+ * @deprecated since 3.0 Use {@link #ofInactivityGapAndGrace(Duration, 
Duration)} instead
  */
+@Deprecated
 public SessionWindows grace(final Duration afterWindowEnd) throws 
IllegalArgumentException {
 final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
 final long afterWindowEndMs = 
validateMillisecondDuration(afterWindowEnd, msgPrefix);
-if (afterWindowEndMs < 0) {
-throw new IllegalArgumentException("Grace period must not be 
negative.");
-}

Review comment:
   Thanks for this feedback, @showuon. I have updated the code to share/reuse constructors. I think @ableegoldman will like that as well.








[GitHub] [kafka] izzyacademy commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

2021-06-29 Thread GitBox


izzyacademy commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r660948497



##
File path: streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
##
@@ -40,7 +40,11 @@
 
 // By default grace period is 24 hours for all windows,
 // in other words we allow out-of-order data for up to a day
-protected static final long DEFAULT_GRACE_PERIOD_MS = 24 * 60 * 60 * 1000L;
+// This behavior is now deprecated
+protected static final long DEPRECATED_OLD_24_HR_GRACE_PERIOD = 24 * 60 * 
60 * 1000L;

Review comment:
   This is not a publicly available constant; it is only used internally by the implementers, so I think it is OK. The reasoning for deprecating the constant is explained thoroughly in the KIP, so I think they will understand. I can add more details to the Java comments to clarify. But thanks, @showuon, for bringing it up in the review.
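
   For readers following along, a short sketch of the behavior change (assuming the KIP-633 factory names land as proposed; shown here in Scala):
   
   ```scala
   import java.time.Duration
   import org.apache.kafka.streams.kstream.TimeWindows
   
   // Old API: silently applies the 24-hour default grace period (now deprecated)
   val legacy = TimeWindows.of(Duration.ofMinutes(5))
   
   // KIP-633: the grace period is always explicit at the call site
   val graced  = TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1))
   val noGrace = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))
   ```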








[GitHub] [kafka] mimaison merged pull request #10221: KAFKA-12379: Allow configuring the location of the offset-syncs topic…

2021-06-29 Thread GitBox


mimaison merged pull request #10221:
URL: https://github.com/apache/kafka/pull/10221


   






[GitHub] [kafka] cmccabe commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface

2021-06-29 Thread GitBox


cmccabe commented on a change in pull request #10887:
URL: https://github.com/apache/kafka/pull/10887#discussion_r660935556



##
File path: core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
##
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+import java.util.Collections
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
+import kafka.admin.BrokerMetadata
+
+import scala.collection.{Seq, Set, mutable}
+import scala.jdk.CollectionConverters._
+import kafka.cluster.{Broker, EndPoint}
+import kafka.api._
+import kafka.controller.StateChangeLogger
+import kafka.utils.CoreUtils._
+import kafka.utils.Logging
+import kafka.utils.Implicits._
+import org.apache.kafka.common.internals.Topic
+import 
org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
+import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, 
Uuid}
+import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
+import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{MetadataResponse, 
UpdateMetadataRequest}
+import org.apache.kafka.common.security.auth.SecurityProtocol
+
+/**
+ *  A cache for the state (e.g., current leader) of each partition. This cache 
is updated through
+ *  UpdateMetadataRequest from the controller. Every broker maintains the same 
cache, asynchronously.
+ */
+class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging {

Review comment:
   OK








[GitHub] [kafka] cmccabe commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface

2021-06-29 Thread GitBox


cmccabe commented on a change in pull request #10887:
URL: https://github.com/apache/kafka/pull/10887#discussion_r660928423



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1721,8 +1720,10 @@ class ReplicaManager(val config: KafkaConfig,
   } else {
 // we do not need to check if the leader exists again since this has 
been done at the beginning of this process
 val partitionsToMakeFollowerWithLeaderAndOffset = 
partitionsToMakeFollower.map { partition =>
-  val leader = metadataCache.getAliveBrokers.find(_.id == 
partition.leaderReplicaIdOpt.get).get
-.brokerEndPoint(config.interBrokerListenerName)
+  val leaderNode = metadataCache.getAliveBrokerNode(
+partition.leaderReplicaIdOpt.getOrElse(-1), 
config.interBrokerListenerName.value()).

Review comment:
   Hmm... `getAliveBrokerNode` returns `None` or `Some`, never `noNode`. 
Anyway, I don't feel strongly about this, so I just used the version you 
suggested. I suppose it's one less place where we use -1 as a sentinel value 
(although honestly, there are enough of these places that -1 will never be a 
valid node value in Kafka, I think.)
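
   To make the shape concrete, a toy Scala sketch of the caller-side fallback (the cache stand-in here is not the real `MetadataCache`):
   
   ```scala
   import org.apache.kafka.common.Node
   
   // Toy stand-in for the cache lookup: Option encodes "broker not alive"
   def getAliveBrokerNode(brokerId: Int, listenerName: String): Option[Node] =
     if (brokerId == 0) Some(new Node(0, "localhost", 9092)) else None
   
   // The caller (e.g. ReplicaManager) decides what absence means, here Node.noNode()
   val leaderNode: Node = getAliveBrokerNode(1, "PLAINTEXT").getOrElse(Node.noNode())
   ```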








[GitHub] [kafka] cmccabe commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface

2021-06-29 Thread GitBox


cmccabe commented on a change in pull request #10887:
URL: https://github.com/apache/kafka/pull/10887#discussion_r660928423



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1721,8 +1720,10 @@ class ReplicaManager(val config: KafkaConfig,
   } else {
 // we do not need to check if the leader exists again since this has 
been done at the beginning of this process
 val partitionsToMakeFollowerWithLeaderAndOffset = 
partitionsToMakeFollower.map { partition =>
-  val leader = metadataCache.getAliveBrokers.find(_.id == 
partition.leaderReplicaIdOpt.get).get
-.brokerEndPoint(config.interBrokerListenerName)
+  val leaderNode = metadataCache.getAliveBrokerNode(
+partition.leaderReplicaIdOpt.getOrElse(-1), 
config.interBrokerListenerName.value()).

Review comment:
   `getAliveBrokerNode` returns `None` when a broker ID that is not alive 
is passed. The conversion to `NoNode` happens in `ReplicaManager`, not in the 
metadata cache. I guess I don't feel strongly about it, though, if you want a 
different version of this...?








[GitHub] [kafka] cmccabe commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface

2021-06-29 Thread GitBox


cmccabe commented on a change in pull request #10887:
URL: https://github.com/apache/kafka/pull/10887#discussion_r660925631



##
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##
@@ -62,17 +46,22 @@ trait MetadataCache {
 
   def getAllTopics(): collection.Set[String]
 
-  def getAllPartitions(): collection.Set[TopicPartition]
+  def getTopicPartitions(topicName: String): collection.Set[TopicPartition]
 
-  def getNonExistingTopics(topics: collection.Set[String]): 
collection.Set[String]
+  def hasAliveBroker(brokerId: Int): Boolean
 
-  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+  def getAliveBrokers(): Iterable[BrokerMetadata]
 
-  def getAliveBrokers: collection.Seq[MetadataBroker]
+  def getAliveBrokerNode(brokerId: Int, listenerName: String): Option[Node]
+
+  def getAliveBrokerNodes(listenerName: String): Iterable[Node]

Review comment:
   Yes, let's just use `ListenerName`.








[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-29 Thread GitBox


kpatelatwork commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r660925249



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -1063,6 +1076,132 @@ public int generation() {
 return generation;
 }
 
+    @Override
+    public void restartConnectorAndTasks(
+            RestartRequest request,
+            Callback<ConnectorStateInfo> callback
+    ) {
+        final String connectorName = request.connectorName();
+        addRequest(
+            () -> {
+                if (checkRebalanceNeeded(callback)) {
+                    return null;
+                }
+                if (!configState.connectors().contains(request.connectorName())) {
+                    callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null);
+                    return null;
+                }
+                if (isLeader()) {
+                    // Write a restart request to the config backing store, to be executed asynchronously in tick()
+                    configBackingStore.putRestartRequest(request);
+                    // Compute and send the response that this was accepted
+                    Optional<RestartPlan> maybePlan = buildRestartPlan(request);

Review comment:
   @kkonstantine after your latest explanation it makes sense to rename it 
to keep it simple. Thanks for guiding me. I have renamed it. 








[GitHub] [kafka] cmccabe commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface

2021-06-29 Thread GitBox


cmccabe commented on a change in pull request #10887:
URL: https://github.com/apache/kafka/pull/10887#discussion_r660911200



##
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##
@@ -62,17 +46,22 @@ trait MetadataCache {
 
   def getAllTopics(): collection.Set[String]
 
-  def getAllPartitions(): collection.Set[TopicPartition]
+  def getTopicPartitions(topicName: String): collection.Set[TopicPartition]
 
-  def getNonExistingTopics(topics: collection.Set[String]): 
collection.Set[String]
+  def hasAliveBroker(brokerId: Int): Boolean
 
-  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+  def getAliveBrokers(): Iterable[BrokerMetadata]
 
-  def getAliveBrokers: collection.Seq[MetadataBroker]
+  def getAliveBrokerNode(brokerId: Int, listenerName: String): Option[Node]
+
+  def getAliveBrokerNodes(listenerName: String): Iterable[Node]
 
   def getPartitionInfo(topic: String, partitionId: Int): 
Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
 
-  def numPartitions(topic: String): Option[Int]
+  /**
+   * Return the number of partitions in the given topic, or 0 if the given 
topic does not exist.
+   */
+  def numPartitions(topic: String): Int

Review comment:
   OK, I will just convert it back to an `Option[Int]`, then.
   
   Edit: that is more consistent with some of the other functions that return 
`Option` as well.








[jira] [Updated] (KAFKA-12996) OffsetOutOfRange not handled correctly for diverging epochs when fetch offset less than leader start offset

2021-06-29 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-12996:
---
Fix Version/s: 2.7.2
Affects Version/s: 2.7.1

> OffsetOutOfRange not handled correctly for diverging epochs when fetch offset 
> less than leader start offset
> ---
>
> Key: KAFKA-12996
> URL: https://issues.apache.org/jira/browse/KAFKA-12996
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0, 2.7.1
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 3.0.0, 2.7.2, 2.8.1
>
>
> {color:#24292e}If fetchOffset < startOffset, we currently throw 
> OffsetOutOfRangeException when attempting to read from the log in the regular 
> case. But for diverging epochs, we return Errors.NONE with the new leader 
> start offset, hwm etc.. ReplicaFetcherThread throws OffsetOutOfRangeException 
> when processing responses with Errors.NONE if the leader's offsets in the 
> response are out of range and this moves the partition to failed state. We 
> should add a check for this case when processing fetch requests and ensure 
> OffsetOutOfRangeException is thrown regardless of epochs.{color}





[jira] [Resolved] (KAFKA-12996) OffsetOutOfRange not handled correctly for diverging epochs when fetch offset less than leader start offset

2021-06-29 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-12996.

  Reviewer: Guozhang Wang
Resolution: Fixed

> OffsetOutOfRange not handled correctly for diverging epochs when fetch offset 
> less than leader start offset
> ---
>
> Key: KAFKA-12996
> URL: https://issues.apache.org/jira/browse/KAFKA-12996
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0, 2.7.1
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 3.0.0, 2.7.2, 2.8.1
>
>
> {color:#24292e}If fetchOffset < startOffset, we currently throw 
> OffsetOutOfRangeException when attempting to read from the log in the regular 
> case. But for diverging epochs, we return Errors.NONE with the new leader 
> start offset, hwm etc.. ReplicaFetcherThread throws OffsetOutOfRangeException 
> when processing responses with Errors.NONE if the leader's offsets in the 
> response are out of range and this moves the partition to failed state. We 
> should add a check for this case when processing fetch requests and ensure 
> OffsetOutOfRangeException is thrown regardless of epochs.{color}









[GitHub] [kafka] mumrah commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface

2021-06-29 Thread GitBox


mumrah commented on a change in pull request #10887:
URL: https://github.com/apache/kafka/pull/10887#discussion_r660902180



##
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##
@@ -62,17 +46,22 @@ trait MetadataCache {
 
   def getAllTopics(): collection.Set[String]
 
-  def getAllPartitions(): collection.Set[TopicPartition]
+  def getTopicPartitions(topicName: String): collection.Set[TopicPartition]
 
-  def getNonExistingTopics(topics: collection.Set[String]): 
collection.Set[String]
+  def hasAliveBroker(brokerId: Int): Boolean
 
-  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+  def getAliveBrokers(): Iterable[BrokerMetadata]
 
-  def getAliveBrokers: collection.Seq[MetadataBroker]
+  def getAliveBrokerNode(brokerId: Int, listenerName: String): Option[Node]
+
+  def getAliveBrokerNodes(listenerName: String): Iterable[Node]
 
   def getPartitionInfo(topic: String, partitionId: Int): 
Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
 
-  def numPartitions(topic: String): Option[Int]
+  /**
+   * Return the number of partitions in the given topic, or 0 if the given 
topic does not exist.
+   */
+  def numPartitions(topic: String): Int

Review comment:
   I agree that an Option rather than a special value is better here. It's
not a big deal either way for the current code since there aren't many usages, 
but in the future a new caller will have to know to check for `0`. Seems like 
it could be easy to introduce bugs.
   
   A longer term solution (beyond the scope of this PR) could be to have 
individual container objects for the topic-level metadata. This way we could 
easily encapsulate the "does this topic even exist" with an Option on the 
metadata cache. E.g., `metadataCache.topic("foo").map(_.numPartitions)`
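
   A toy Scala sketch of the two shapes under discussion plus the container idea (`TopicMeta` here is hypothetical, not a real Kafka type):
   
   ```scala
   // Toy cache: topic name -> partition count
   val partitionCounts = Map("foo" -> 3)
   
   // Sentinel style: every caller must remember that 0 means "topic does not exist"
   def numPartitionsSentinel(topic: String): Int = partitionCounts.getOrElse(topic, 0)
   
   // Option style: non-existence is explicit in the type, so it cannot be ignored
   def numPartitionsOpt(topic: String): Option[Int] = partitionCounts.get(topic)
   
   // The longer-term idea: an Option over a topic-level container object
   case class TopicMeta(numPartitions: Int)
   def topic(name: String): Option[TopicMeta] = partitionCounts.get(name).map(TopicMeta(_))
   val n: Option[Int] = topic("foo").map(_.numPartitions) // None if "foo" doesn't exist
   ```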








[jira] [Assigned] (KAFKA-13011) Update deleteTopics Admin API

2021-06-29 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan reassigned KAFKA-13011:
--

Assignee: Justine Olshan

> Update deleteTopics Admin API
> -
>
> Key: KAFKA-13011
> URL: https://issues.apache.org/jira/browse/KAFKA-13011
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> Implement the new deleteTopics apis as described in the KIP.





[jira] [Resolved] (KAFKA-10764) Add support for returning topic IDs on create, supplying topic IDs for delete

2021-06-29 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan resolved KAFKA-10764.

Resolution: Done

Opened a new ticket for the change: 
https://issues.apache.org/jira/browse/KAFKA-13011

> Add support for returning topic IDs on create, supplying topic IDs for delete
> -
>
> Key: KAFKA-10764
> URL: https://issues.apache.org/jira/browse/KAFKA-10764
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.0.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> Update Admin Client methods and update protocols to support topic IDs
> UPDATED: we just need to figure out the right API and re-implement 
> AdminClient deleteTopics.





[jira] [Created] (KAFKA-13011) Update deleteTopics Admin API

2021-06-29 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-13011:
--

 Summary: Update deleteTopics Admin API
 Key: KAFKA-13011
 URL: https://issues.apache.org/jira/browse/KAFKA-13011
 Project: Kafka
  Issue Type: Sub-task
Reporter: Justine Olshan


Implement the new deleteTopics apis as described in the KIP.
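
For reference, a hedged sketch (Scala) of the call shape the KIP proposes, assuming the TopicCollection-based overloads land as described:
{code:scala}
import java.util.Collections
import org.apache.kafka.clients.admin.Admin
import org.apache.kafka.common.{TopicCollection, Uuid}

// Deleting by topic ID rather than by name; blocks until the deletion completes or fails
def deleteById(admin: Admin, topicId: Uuid): Unit =
  admin.deleteTopics(TopicCollection.ofTopicIds(Collections.singleton(topicId)))
    .all()
    .get()
{code}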





[jira] [Commented] (KAFKA-10764) Add support for returning topic IDs on create, supplying topic IDs for delete

2021-06-29 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17371584#comment-17371584
 ] 

Justine Olshan commented on KAFKA-10764:


closed as I'm moving the admin client changes to this ticket: 
https://issues.apache.org/jira/browse/KAFKA-13011

> Add support for returning topic IDs on create, supplying topic IDs for delete
> -
>
> Key: KAFKA-10764
> URL: https://issues.apache.org/jira/browse/KAFKA-10764
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.0.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> Update Admin Client methods and update protocols to support topic IDs
> UPDATED: we just need to figure out the right API and re-implement 
> AdminClient deleteTopics.





[GitHub] [kafka] hachikuji commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface

2021-06-29 Thread GitBox


hachikuji commented on a change in pull request #10887:
URL: https://github.com/apache/kafka/pull/10887#discussion_r660883703



##
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##
@@ -62,17 +46,22 @@ trait MetadataCache {
 
   def getAllTopics(): collection.Set[String]
 
-  def getAllPartitions(): collection.Set[TopicPartition]
+  def getTopicPartitions(topicName: String): collection.Set[TopicPartition]
 
-  def getNonExistingTopics(topics: collection.Set[String]): 
collection.Set[String]
+  def hasAliveBroker(brokerId: Int): Boolean
 
-  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+  def getAliveBrokers(): Iterable[BrokerMetadata]
 
-  def getAliveBrokers: collection.Seq[MetadataBroker]
+  def getAliveBrokerNode(brokerId: Int, listenerName: String): Option[Node]
+
+  def getAliveBrokerNodes(listenerName: String): Iterable[Node]

Review comment:
   Would it make sense to use `ListenerName` in these APIs? As far as I can 
tell, all of the usages are just using `ListenerName.value`. Also, in 
`ZkMetadataCache`, we end up constructing a new `ListenerName` from the string.

##
File path: core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
##
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+import java.util.Collections
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
+import kafka.admin.BrokerMetadata
+
+import scala.collection.{Seq, Set, mutable}
+import scala.jdk.CollectionConverters._
+import kafka.cluster.{Broker, EndPoint}
+import kafka.api._
+import kafka.controller.StateChangeLogger
+import kafka.utils.CoreUtils._
+import kafka.utils.Logging
+import kafka.utils.Implicits._
+import org.apache.kafka.common.internals.Topic
+import 
org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
+import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, 
Uuid}
+import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
+import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{MetadataResponse, 
UpdateMetadataRequest}
+import org.apache.kafka.common.security.auth.SecurityProtocol
+
+/**
+ *  A cache for the state (e.g., current leader) of each partition. This cache 
is updated through
+ *  UpdateMetadataRequest from the controller. Every broker maintains the same 
cache, asynchronously.
+ */
+class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging {

Review comment:
   I'm wondering if we could leave this class in its current place just for 
this PR. It is a little hard to see the diffs. In a trivial follow-up, we can 
factor it out.

##
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##
@@ -62,17 +46,22 @@ trait MetadataCache {
 
   def getAllTopics(): collection.Set[String]
 
-  def getAllPartitions(): collection.Set[TopicPartition]
+  def getTopicPartitions(topicName: String): collection.Set[TopicPartition]
 
-  def getNonExistingTopics(topics: collection.Set[String]): 
collection.Set[String]
+  def hasAliveBroker(brokerId: Int): Boolean
 
-  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+  def getAliveBrokers(): Iterable[BrokerMetadata]
 
-  def getAliveBrokers: collection.Seq[MetadataBroker]
+  def getAliveBrokerNode(brokerId: Int, listenerName: String): Option[Node]
+
+  def getAliveBrokerNodes(listenerName: String): Iterable[Node]
 
   def getPartitionInfo(topic: String, partitionId: Int): 
Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
 
-  def numPartitions(topic: String): Option[Int]
+  /**
+   * Return the number of partitions in the given topic, or 0 if the given 
topic does not exist.
+   */
+  def numPartitions(topic: String): Int

Review comment:
   An alternative might be to raise an exception if the topic does not 
exist? I agree with Jose that we are likely to have bugs resulting from the 
caller forgetting to handle the case of 0.

##
File path: core/src/ma

[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

2021-06-29 Thread GitBox


mumrah commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r660880726



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -312,26 +315,142 @@ final class KafkaMetadataLog private (
 }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): 
Boolean = {
-val (deleted, forgottenSnapshots) = snapshots synchronized {
-  latestSnapshotId().asScala match {
-case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-  startOffset < logStartSnapshotId.offset &&
-  logStartSnapshotId.offset <= snapshotId.offset &&
-  log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, 
SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log 
segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * This is not the latest snapshot (i.e., another snapshot follows this one)
+   * The offset of the next snapshot is greater than the log start 
offset
+   * The log can be advanced to the offset of the next snapshot
+   *
+   * This method is not thread safe and assumes a lock on the snapshots 
collection is held
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotId: 
OffsetAndEpoch): Boolean = {
+if (snapshots.contains(snapshotId) &&
+snapshots.contains(nextSnapshotId) &&
+startOffset < nextSnapshotId.offset &&
+snapshotId.offset < nextSnapshotId.offset &&
+log.maybeIncrementLogStartOffset(nextSnapshotId.offset, 
SnapshotGenerated)) {
+  log.deleteOldSegments()
+  val forgotten = mutable.TreeMap.empty[OffsetAndEpoch, 
Option[FileRawSnapshotReader]]
+  snapshots.remove(snapshotId) match {
+case Some(removedSnapshot) => forgotten.put(snapshotId, 
removedSnapshot)
+case None => throw new IllegalStateException(s"Could not remove 
snapshot $snapshotId from our cache.")
+  }
+  removeSnapshots(forgotten)
+  true
+} else {
+  false
+}
+  }
 
-  // Delete all segments that have a "last offset" less than the log 
start offset
-  log.deleteOldSegments()
+  /**
+   * Force all known snapshots to have an open reader so we can know their 
sizes. This method is not thread-safe
+   */
+  private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {
+snapshots.keys.toSeq.flatMap {
+  snapshotId => readSnapshot(snapshotId).asScala.map { reader => 
(snapshotId, reader.sizeInBytes())}
+}
+  }
 
-  // Forget snapshots less than the log start offset
-  (true, forgetSnapshotsBefore(logStartSnapshotId))
-case _ =>
-  (false, mutable.TreeMap.empty[OffsetAndEpoch, 
Option[FileRawSnapshotReader]])
+  /**
+   * Return the max timestamp of the first batch in a snapshot, if the 
snapshot exists and has records
+   */
+  private def firstBatchMaxTimestamp(snapshotId: OffsetAndEpoch): Option[Long] 
= {
+readSnapshot(snapshotId).asScala.flatMap { reader =>
+  val it = reader.records().batchIterator()
+  if (it.hasNext) {
+Some(it.next.maxTimestamp())
+  } else {
+None
   }
 }
+  }
 
-removeSnapshots(forgottenSnapshots)
-deleted
+  /**
+   * Perform cleaning of old snapshots and log segments based on size.
+   *
+   * If our configured retention size has been violated, we perform cleaning 
as follows:
+   *
+   * Find oldest snapshot and delete it
+   * Advance log start offset to end of next oldest snapshot
+   * Delete log segments which wholly precede the new log start offset
+   *
+   * This process is repeated until the retention size is no longer violated, 
or until only
+   * a single snapshot remains.
+   */
+  override def maybeClean(): Boolean = {
+snapshots synchronized {
+  cleanSnapshotsRetentionSize() || cleanSnapshotsRetentionMs()
+}
+  }
+
+  /**
+   * Iterate through the snapshots and test the given predicate to see if we should attempt to delete each one. Since
+   * we have some additional invariants regarding snapshots and log segments, we cannot simply delete a snapshot in
+   * all cases.
+   *
+   * For the given predicate, we are testing if the snapshot identified by the 
first argument should be deleted.
+   */
+  private def cleanSnapshots(predicate: (OffsetAndEpoch, OffsetAndEpoch) => 
Boolean): Boolean = {
+if (snapshots.size < 2)
+  return false;
+
+var didClean = false
+snapshots.keys.toSeq.sliding(2).toSeq.takeWhile {
+  case Seq(snapshot: OffsetAndEpoch, nextSnapshot: OffsetAndEpoch) =>
+if (predicate(snapshot, nextSnapshot) && deleteSnapshot(snapshot, 
nextSnapshot)) {
+  didClean = true
+  true
+} else {
+  false
+}
+  case _ => false // Shouldn't get here with sliding(2)
+}
+didClean
+  }

Review comment:
   @jsancio how does this look? The `sliding(2)` will c
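
   For reference, a standalone Scala illustration of the `sliding(2)` plus `takeWhile` idiom from the diff above (here `250L` stands in for "predicate passes and deletion succeeds"):
   
   ```scala
   val snapshotOffsets = Seq(100L, 200L, 300L, 400L)
   val cleanedPairs = snapshotOffsets.sliding(2).toSeq.takeWhile {
     case Seq(snapshot, _) => snapshot < 250L // pairs are (snapshot, nextSnapshot)
     case _ => false // single-element window: fewer than 2 snapshots remain
   }
   // cleanedPairs == Seq(Seq(100, 200), Seq(200, 300)); the (300, 400) pair stops the
   // scan, and the latest snapshot (400) is never the first element of any pair,
   // so it can never be deleted.
   ```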

[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

2021-06-29 Thread GitBox


mumrah commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r660879928



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -358,6 +485,7 @@ final class KafkaMetadataLog private (
 expiredSnapshots: mutable.TreeMap[OffsetAndEpoch, 
Option[FileRawSnapshotReader]]
   ): Unit = {
 expiredSnapshots.foreach { case (snapshotId, _) =>
+  info(s"Marking snapshot $snapshotId for deletion")

Review comment:
   I went ahead and added a logIdent to this class








[GitHub] [kafka] kkonstantine commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-29 Thread GitBox


kkonstantine commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r660870565



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -1063,6 +1076,132 @@ public int generation() {
 return generation;
 }
 
+    @Override
+    public void restartConnectorAndTasks(
+            RestartRequest request,
+            Callback<ConnectorStateInfo> callback
+    ) {
+        final String connectorName = request.connectorName();
+        addRequest(
+            () -> {
+                if (checkRebalanceNeeded(callback)) {
+                    return null;
+                }
+                if (!configState.connectors().contains(request.connectorName())) {
+                    callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null);
+                    return null;
+                }
+                if (isLeader()) {
+                    // Write a restart request to the config backing store, to be executed asynchronously in tick()
+                    configBackingStore.putRestartRequest(request);
+                    // Compute and send the response that this was accepted
+                    Optional<RestartPlan> maybePlan = buildRestartPlan(request);

Review comment:
   We might be too precise here. As we discussed elsewhere, baking the 
meaning of the type in every variable name might be too verbose. 
   It's a plan, and the fact that is optional means that there might be one or 
it might not. That's how I'd read it. 
   The other use of `maybePlan` below is harder to avoid. Feel free to keep it 
consistent here with what you have below. 
   Again, my point is not to bake type meaning in the variable names. I feel 
this keeps things simpler. 








[jira] [Updated] (KAFKA-12992) Make kraft configuration properties public

2021-06-29 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-12992:
---
Labels: kip-500  (was: )

> Make kraft configuration properties public
> --
>
> Key: KAFKA-12992
> URL: https://issues.apache.org/jira/browse/KAFKA-12992
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Jose Armando Garcia Sancio
>Assignee: HaiyuanZhao
>Priority: Blocker
>  Labels: kip-500
> Fix For: 3.0.0
>
>
> All of the Kraft configurations should be made public:
> {code:java}
> /*
>  * KRaft mode configs. Note that these configs are defined as 
> internal. We will make them public in the 3.0.0 release.
>  */
> .defineInternal(ProcessRolesProp, LIST, Collections.emptyList(), 
> ValidList.in("broker", "controller"), HIGH, ProcessRolesDoc)
> .defineInternal(NodeIdProp, INT, Defaults.EmptyNodeId, null, HIGH, 
> NodeIdDoc)
> .defineInternal(InitialBrokerRegistrationTimeoutMsProp, INT, 
> Defaults.InitialBrokerRegistrationTimeoutMs, null, MEDIUM, 
> InitialBrokerRegistrationTimeoutMsDoc)
> .defineInternal(BrokerHeartbeatIntervalMsProp, INT, 
> Defaults.BrokerHeartbeatIntervalMs, null, MEDIUM, 
> BrokerHeartbeatIntervalMsDoc)
> .defineInternal(BrokerSessionTimeoutMsProp, INT, 
> Defaults.BrokerSessionTimeoutMs, null, MEDIUM, BrokerSessionTimeoutMsDoc)
> .defineInternal(MetadataLogDirProp, STRING, null, null, HIGH, 
> MetadataLogDirDoc)
> .defineInternal(ControllerListenerNamesProp, STRING, null, null, 
> HIGH, ControllerListenerNamesDoc)
> .defineInternal(SaslMechanismControllerProtocolProp, STRING, 
> SaslConfigs.DEFAULT_SASL_MECHANISM, null, HIGH, 
> SaslMechanismControllerProtocolDoc)
>  {code}
>  
> https://github.com/apache/kafka/blob/2beaf9a720330615bc5474ec079f8b4b105eff91/core/src/main/scala/kafka/server/KafkaConfig.scala#L1043-L1053





[GitHub] [kafka] kkonstantine commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-29 Thread GitBox


kkonstantine commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r660868474



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
##
@@ -761,6 +803,44 @@ public void onCompletion(Throwable error, 
ConsumerRecord record)
 
 }
 
+    @SuppressWarnings("unchecked")
+    RestartRequest recordToRestartRequest(ConsumerRecord<String, byte[]> record, SchemaAndValue value) {
+        String connectorName = record.key().substring(RESTART_PREFIX.length());
+        if (value.value() == null) {
+            log.error("Ignoring restart request because it is unexpectedly null");
+            return null;
+        }
+        if (!(value.value() instanceof Map)) {
+            log.error("Ignoring restart request because the value is not a Map but is {}", value.value().getClass());
+            return null;
+        }
+
+        Map<String, Object> valueAsMap = (Map<String, Object>) value.value();
+
+        Object failed = valueAsMap.get(ONLY_FAILED_FIELD_NAME);
+        if (failed == null) {
+            log.warn("Invalid data for restart request '{}' field was missing, defaulting to {}", ONLY_FAILED_FIELD_NAME, ONLY_FAILED_DEFAULT);

Review comment:
   Didn't notice. Thanks. Makes sense then








[GitHub] [kafka] kkonstantine commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-29 Thread GitBox


kkonstantine commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r660867874



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##
@@ -210,6 +211,15 @@ public void onDeletion(ConnectorTaskId id) {
 statusBackingStore.put(new TaskStatus(id, TaskStatus.State.DESTROYED, 
workerId, generation()));
 }
 
+public void onRestart(String connector) {

Review comment:
   Did you compare it with other methods? My understanding is that `onDelete` is the same in that respect and exists on the `Listener` as well. But I might have skimmed too quickly through the code.








[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-29 Thread GitBox


kpatelatwork commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r660866556



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
##
@@ -208,6 +209,21 @@ public static String COMMIT_TASKS_KEY(String 
connectorName) {
 .field("creation-timestamp", Schema.INT64_SCHEMA)
 .build();
 
+public static final String RESTART_PREFIX = "restart-connector-";
+
+public static String RESTART_KEY(String connectorName) {
+return RESTART_PREFIX + connectorName;
+}
+
+public static final Boolean ONLY_FAILED_DEFAULT = Boolean.FALSE;
+public static final Boolean INCLUDE_TASKS_DEFAULT = Boolean.FALSE;

Review comment:
   fixed as per your suggestions.








[GitHub] [kafka] kpatelatwork commented on pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-29 Thread GitBox


kpatelatwork commented on pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#issuecomment-870820269


   @kkonstantine Thanks a lot for taking the time and posting the detailed 
review comments. I was able to resolve all of them except for a few where I 
left some additional comments.   
   
   Please let me know how you want to proceed with them.






[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-29 Thread GitBox


kpatelatwork commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r660864346



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##
@@ -210,6 +211,15 @@ public void onDeletion(ConnectorTaskId id) {
 statusBackingStore.put(new TaskStatus(id, TaskStatus.State.DESTROYED, 
workerId, generation()));
 }
 
+public void onRestart(String connector) {

Review comment:
   I added the missing documentation.
   
   I didn't add the onRestart method to Listener because I didn't feel it should be part of the task lifecycle controlled by the worker. Earlier the method was called recordRestart, but based on a review comment it was renamed to onRestart. Do you think we should rename it back to recordRestart? The original intent of this method was just to record the state change.








[GitHub] [kafka] guozhangwang commented on a change in pull request #10917: KAFKA-10847: improve throughput of stream-stream join with spurious left/outer join fix

2021-06-29 Thread GitBox


guozhangwang commented on a change in pull request #10917:
URL: https://github.com/apache/kafka/pull/10917#discussion_r660844998



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -186,12 +190,17 @@ public void process(final K key, final V1 value) {
 
 @SuppressWarnings("unchecked")
 private void emitNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> store) {
+    if (minTime.minTime >= maxObservedStreamTime.get() - joinAfterMs - joinBeforeMs - joinGraceMs) {

Review comment:
   If we want to treat it as a performance/emit-latency trade-off, then maybe it's better not to piggy-back on `beforeMs` but to use some value that's independent of the window size?
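
   A condensed Scala sketch of the gate in question (names shadow the diff; the open question is whether the subtracted interval should be tied to the window size at all):
   
   ```scala
   // Skip scanning the outer-join store until stream time has advanced past the
   // join window plus grace; emission happens only when this returns true.
   def shouldEmitNonJoined(minTime: Long, observedStreamTime: Long,
                           joinBeforeMs: Long, joinAfterMs: Long, joinGraceMs: Long): Boolean =
     minTime < observedStreamTime - joinAfterMs - joinBeforeMs - joinGraceMs
   ```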








[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

2021-06-29 Thread GitBox


mumrah commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r660839303



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -16,32 +16,37 @@
  */
 package kafka.raft
 
-import java.io.File
-import java.nio.file.{Files, NoSuchFileException, Path}
-import java.util.{Optional, Properties}
 import kafka.api.ApiVersion
 import kafka.log.{AppendOrigin, Log, LogConfig, LogOffsetSnapshot, 
SnapshotGenerated}
-import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, 
LogDirFailureChannel, RequestLocal}
+import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, 
KafkaConfig, LogDirFailureChannel, RequestLocal}
 import kafka.utils.{CoreUtils, Logging, Scheduler}
+import org.apache.kafka.common.config.AbstractConfig
 import org.apache.kafka.common.record.{MemoryRecords, Records}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.raft.{Isolation, LogAppendInfo, LogFetchInfo, 
LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, 
ValidOffsetAndEpoch}
 import org.apache.kafka.snapshot.{FileRawSnapshotReader, 
FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, 
Snapshots}
 
+import java.io.File
+import java.nio.file.{Files, NoSuchFileException, Path}
+import java.util.{Optional, Properties}
 import scala.annotation.nowarn
 import scala.collection.mutable
 import scala.compat.java8.OptionConverters._
 
 final class KafkaMetadataLog private (
-  log: Log,
+  val log: Log,
+  time: Time,
   scheduler: Scheduler,
   // Access to this object needs to be synchronized because it is used by the 
snapshotting thread to notify the
   // polling thread when snapshots are created. This object is also used to 
store any opened snapshot reader.
-  snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]],
+  val snapshots: mutable.TreeMap[OffsetAndEpoch, 
Option[FileRawSnapshotReader]],

Review comment:
   Fair enough. I need a way to get the number of snapshots from the test code. Maybe just a package-private accessor like:
   
   ```scala
   private[raft] def snapshotCount(): Int = { ... } 
   ```








[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

2021-06-29 Thread GitBox


mumrah commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r660837401



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -358,6 +485,7 @@ final class KafkaMetadataLog private (
 expiredSnapshots: mutable.TreeMap[OffsetAndEpoch, 
Option[FileRawSnapshotReader]]
   ): Unit = {
 expiredSnapshots.foreach { case (snapshotId, _) =>
+  info(s"Marking snapshot $snapshotId for deletion")

Review comment:
   I assumed the class included some common log info like that, but looks 
like it doesn't. I'll add it here and open a separate PR to add some basic 
fields to the logIdent.








[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

2021-06-29 Thread GitBox


mumrah commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r660836532



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -312,26 +313,152 @@ final class KafkaMetadataLog private (
 }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): 
Boolean = {
-val (deleted, forgottenSnapshots) = snapshots synchronized {
-  latestSnapshotId().asScala match {
-case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-  startOffset < logStartSnapshotId.offset &&
-  logStartSnapshotId.offset <= snapshotId.offset &&
-  log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, 
SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log 
segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * This is not the latest snapshot (i.e., another snapshot follows this one)
+   * The offset of the next snapshot is greater than the log start 
offset
+   * The log can be advanced to the offset of the next snapshot
+   *
+   * This method is not thread safe and assumes a lock on the snapshots 
collection is held
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, 
nextSnapshotIdOpt: Option[OffsetAndEpoch]): Boolean = {
+nextSnapshotIdOpt.exists { nextSnapshotId =>
+  if (snapshots.contains(snapshotId) &&
+  snapshots.contains(nextSnapshotId) &&
+  startOffset < nextSnapshotId.offset &&
+  snapshotId.offset < nextSnapshotId.offset &&
+  log.maybeIncrementLogStartOffset(nextSnapshotId.offset, 
SnapshotGenerated)) {
+log.deleteOldSegments()
+val forgotten = mutable.TreeMap.empty[OffsetAndEpoch, 
Option[FileRawSnapshotReader]]
+snapshots.remove(snapshotId) match {
+  case Some(removedSnapshot) => forgotten.put(snapshotId, 
removedSnapshot)
+  case None => throw new IllegalStateException(s"Could not remove 
snapshot $snapshotId from our cache.")
+}
+removeSnapshots(forgotten)
+true
+  } else {
+false
+  }
+}
+  }
 
-  // Delete all segments that have a "last offset" less than the log 
start offset
-  log.deleteOldSegments()
+  /**
+   * Force all known snapshots to have an open reader so we can know their 
sizes. This method is not thread-safe
+   */
+  private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {
+snapshots.keys.toSeq.flatMap {
+  snapshotId => readSnapshot(snapshotId).asScala.map { reader => 
(snapshotId, reader.sizeInBytes())}
+}
+  }
 
-  // Forget snapshots less than the log start offset
-  (true, forgetSnapshotsBefore(logStartSnapshotId))
-case _ =>
-  (false, mutable.TreeMap.empty[OffsetAndEpoch, 
Option[FileRawSnapshotReader]])
+  /**
+   * Return the max timestamp of the first batch in a snapshot, if the 
snapshot exists and has records
+   */
+  private def firstBatchMaxTimestamp(snapshotId: OffsetAndEpoch): Option[Long] 
= {
+readSnapshot(snapshotId).asScala.flatMap { reader =>
+  val it = reader.records().batchIterator()
+  if (it.hasNext) {
+Some(it.next.maxTimestamp())
+  } else {
+None
   }
 }
+  }
 
-removeSnapshots(forgottenSnapshots)
-deleted
+  /**
+   * Perform cleaning of old snapshots and log segments based on size.
+   *
+   * If our configured retention size has been violated, we perform cleaning 
as follows:
+   *
+   * Find oldest snapshot and delete it
+   * Advance log start offset to end of next oldest snapshot
+   * Delete log segments which wholly precede the new log start offset
+   *
+   * This process is repeated until the retention size is no longer violated, 
or until only
+   * a single snapshot remains.
+   */
+  override def maybeClean(): Boolean = {
+snapshots synchronized {
+  cleanSnapshotsRetentionSize() || cleanSnapshotsRetentionMs()
+}
+  }
+
+  /**
+   * Iterate through the snapshots and test the given predicate to see if we should attempt to delete each one.
+   * Since we have some additional invariants regarding snapshots and log segments, we cannot simply delete a
+   * snapshot in all cases.
+   *
+   * For the given predicate, we are testing if the snapshot identified by the 
first argument should be deleted.
+   */
+  private def cleanSnapshots(predicate: (OffsetAndEpoch, 
Option[OffsetAndEpoch]) => Boolean): Boolean = {
+val snapshotIterator = snapshots.keys.iterator
+var snapshotOpt = Log.nextOption(snapshotIterator)
+var didClean = false
+while (snapshotOpt.isDefined) {
+  val snapshot = snapshotOpt.get
+  val nextOpt = Log.nextOption(snapshotIterator)
+  if (predicate(snapshot, nextOpt)) {
+if (deleteSnapshot(snapshot, nextOpt)) {
+  didClean = true
+  snapshotOpt = nextOpt
+} else {
+  

[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-29 Thread GitBox


kpatelatwork commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r660836387



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
##
@@ -761,6 +803,44 @@ public void onCompletion(Throwable error, 
ConsumerRecord record)
 
 }
 
+@SuppressWarnings("unchecked")
+RestartRequest recordToRestartRequest(ConsumerRecord<String, byte[]> record, SchemaAndValue value) {
+String connectorName = record.key().substring(RESTART_PREFIX.length());
+if (value.value() == null) {
+log.error("Ignoring restart request because it is unexpectedly 
null");
+return null;
+}
+if (!(value.value() instanceof Map)) {
+log.error("Ignoring restart request because the value is not a Map 
but is {}", value.value().getClass());
+return null;
+}
+
+Map<String, Object> valueAsMap = (Map<String, Object>) value.value();
+
+Object failed = valueAsMap.get(ONLY_FAILED_FIELD_NAME);
+if (failed == null) {
+log.warn("Invalid data for restart request '{}' field was missing, 
defaulting to {}", ONLY_FAILED_FIELD_NAME, ONLY_FAILED_DEFAULT);

Review comment:
   We log an error when we reject the request because of an invalid type; in this particular case we are defaulting the missing field instead, which is why this is a warn.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

2021-06-29 Thread GitBox


mumrah commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r660836052



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -312,26 +313,152 @@ final class KafkaMetadataLog private (
 }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): 
Boolean = {
-val (deleted, forgottenSnapshots) = snapshots synchronized {
-  latestSnapshotId().asScala match {
-case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-  startOffset < logStartSnapshotId.offset &&
-  logStartSnapshotId.offset <= snapshotId.offset &&
-  log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, 
SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log 
segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * This is not the latest snapshot (i.e., another snapshot follows this one)
+   * The offset of the next snapshot is greater than the log start 
offset
+   * The log can be advanced to the offset of the next snapshot
+   *
+   * This method is not thread safe and assumes a lock on the snapshots 
collection is held
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, 
nextSnapshotIdOpt: Option[OffsetAndEpoch]): Boolean = {
+nextSnapshotIdOpt.exists { nextSnapshotId =>
+  if (snapshots.contains(snapshotId) &&
+  snapshots.contains(nextSnapshotId) &&
+  startOffset < nextSnapshotId.offset &&
+  snapshotId.offset < nextSnapshotId.offset &&
+  log.maybeIncrementLogStartOffset(nextSnapshotId.offset, 
SnapshotGenerated)) {
+log.deleteOldSegments()
+val forgotten = mutable.TreeMap.empty[OffsetAndEpoch, 
Option[FileRawSnapshotReader]]
+snapshots.remove(snapshotId) match {
+  case Some(removedSnapshot) => forgotten.put(snapshotId, 
removedSnapshot)
+  case None => throw new IllegalStateException(s"Could not remove 
snapshot $snapshotId from our cache.")
+}
+removeSnapshots(forgotten)
+true
+  } else {
+false
+  }
+}
+  }
 
-  // Delete all segments that have a "last offset" less than the log 
start offset
-  log.deleteOldSegments()
+  /**
+   * Force all known snapshots to have an open reader so we can know their 
sizes. This method is not thread-safe
+   */
+  private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {
+snapshots.keys.toSeq.flatMap {
+  snapshotId => readSnapshot(snapshotId).asScala.map { reader => 
(snapshotId, reader.sizeInBytes())}
+}
+  }
 
-  // Forget snapshots less than the log start offset
-  (true, forgetSnapshotsBefore(logStartSnapshotId))
-case _ =>
-  (false, mutable.TreeMap.empty[OffsetAndEpoch, 
Option[FileRawSnapshotReader]])
+  /**
+   * Return the max timestamp of the first batch in a snapshot, if the 
snapshot exists and has records
+   */
+  private def firstBatchMaxTimestamp(snapshotId: OffsetAndEpoch): Option[Long] 
= {
+readSnapshot(snapshotId).asScala.flatMap { reader =>
+  val it = reader.records().batchIterator()
+  if (it.hasNext) {
+Some(it.next.maxTimestamp())
+  } else {
+None
   }
 }
+  }
 
-removeSnapshots(forgottenSnapshots)
-deleted
+  /**
+   * Perform cleaning of old snapshots and log segments based on size.
+   *
+   * If our configured retention size has been violated, we perform cleaning 
as follows:
+   *
+   * Find oldest snapshot and delete it
+   * Advance log start offset to end of next oldest snapshot
+   * Delete log segments which wholly precede the new log start offset
+   *
+   * This process is repeated until the retention size is no longer violated, 
or until only
+   * a single snapshot remains.
+   */
+  override def maybeClean(): Boolean = {
+snapshots synchronized {
+  cleanSnapshotsRetentionSize() || cleanSnapshotsRetentionMs()
+}
+  }
+
+  /**
+   * Iterate through the snapshots and test the given predicate to see if we should attempt to delete each one.
+   * Since we have some additional invariants regarding snapshots and log segments, we cannot simply delete a
+   * snapshot in all cases.
+   *
+   * For the given predicate, we are testing if the snapshot identified by the 
first argument should be deleted.
+   */
+  private def cleanSnapshots(predicate: (OffsetAndEpoch, 
Option[OffsetAndEpoch]) => Boolean): Boolean = {
+val snapshotIterator = snapshots.keys.iterator
+var snapshotOpt = Log.nextOption(snapshotIterator)
+var didClean = false
+while (snapshotOpt.isDefined) {
+  val snapshot = snapshotOpt.get
+  val nextOpt = Log.nextOption(snapshotIterator)
+  if (predicate(snapshot, nextOpt)) {
+if (deleteSnapshot(snapshot, nextOpt)) {

Review comment:
   That's a good point, I'll clean this up a bit




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-29 Thread GitBox


kpatelatwork commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r660834568



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -1063,6 +1076,132 @@ public int generation() {
 return generation;
 }
 
+@Override
+public void restartConnectorAndTasks(
+RestartRequest request,
+Callback<ConnectorStateInfo> callback
+) {
+final String connectorName = request.connectorName();
+addRequest(
+() -> {
+if (checkRebalanceNeeded(callback)) {
+return null;
+}
+if 
(!configState.connectors().contains(request.connectorName())) {
+callback.onCompletion(new NotFoundException("Unknown 
connector: " + connectorName), null);
+return null;
+}
+if (isLeader()) {
+// Write a restart request to the config backing 
store, to be executed asynchronously in tick()
+configBackingStore.putRestartRequest(request);
+// Compute and send the response that this was accepted
+Optional<RestartPlan> maybePlan = buildRestartPlan(request);

Review comment:
   The plan may or may not be present; that's why I called it maybePlan.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

2021-06-29 Thread GitBox


jsancio commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r660808793



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -16,32 +16,37 @@
  */
 package kafka.raft
 
-import java.io.File
-import java.nio.file.{Files, NoSuchFileException, Path}
-import java.util.{Optional, Properties}
 import kafka.api.ApiVersion
 import kafka.log.{AppendOrigin, Log, LogConfig, LogOffsetSnapshot, 
SnapshotGenerated}
-import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, 
LogDirFailureChannel, RequestLocal}
+import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, 
KafkaConfig, LogDirFailureChannel, RequestLocal}
 import kafka.utils.{CoreUtils, Logging, Scheduler}
+import org.apache.kafka.common.config.AbstractConfig
 import org.apache.kafka.common.record.{MemoryRecords, Records}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.raft.{Isolation, LogAppendInfo, LogFetchInfo, 
LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, 
ValidOffsetAndEpoch}
 import org.apache.kafka.snapshot.{FileRawSnapshotReader, 
FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, 
Snapshots}
 
+import java.io.File
+import java.nio.file.{Files, NoSuchFileException, Path}
+import java.util.{Optional, Properties}
 import scala.annotation.nowarn
 import scala.collection.mutable
 import scala.compat.java8.OptionConverters._
 
 final class KafkaMetadataLog private (
-  log: Log,
+  val log: Log,
+  time: Time,
   scheduler: Scheduler,
   // Access to this object needs to be synchronized because it is used by the 
snapshotting thread to notify the
   // polling thread when snapshots are created. This object is also used to 
store any opened snapshot reader.
-  snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]],
+  val snapshots: mutable.TreeMap[OffsetAndEpoch, 
Option[FileRawSnapshotReader]],

Review comment:
   I don't think we should do this. Access to this data needs to be 
synchronized.
   
   It is also very likely that this is an implementation detail that will 
change in the near future.

##
File path: 
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
##
@@ -244,10 +246,10 @@ public void testFetchRequestOffsetLessThanLogStart() 
throws Exception {
 assertEquals(snapshotId, snapshot.snapshotId());
 snapshot.freeze();
 }
-
+context.log.deleteBeforeSnapshot(snapshotId);
 context.client.poll();
 
-assertEquals(snapshotId.offset, context.log.startOffset());
+//context.log.logStartOffset(snapshotId.offset);

Review comment:
   Commented out code.

##
File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java
##
@@ -316,6 +316,11 @@ public void flush() {
 lastFlushedOffset = endOffset().offset;
 }
 
+@Override
+public boolean maybeClean() {
+return false;
+}

Review comment:
   Are you planning to implement this and use it instead of `deleteBeforeSnapshot` below? Most of the tests in the `raft` module depend on `MockLog` having the same semantics as `KafkaMetadataLog`.

##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -312,26 +313,152 @@ final class KafkaMetadataLog private (
 }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): 
Boolean = {
-val (deleted, forgottenSnapshots) = snapshots synchronized {
-  latestSnapshotId().asScala match {
-case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-  startOffset < logStartSnapshotId.offset &&
-  logStartSnapshotId.offset <= snapshotId.offset &&
-  log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, 
SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log 
segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * This is not the latest snapshot (i.e., another snapshot follows this one)
+   * The offset of the next snapshot is greater than the log start 
offset
+   * The log can be advanced to the offset of the next snapshot
+   *
+   * This method is not thread safe and assumes a lock on the snapshots 
collection is held
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, 
nextSnapshotIdOpt: Option[OffsetAndEpoch]): Boolean = {
+nextSnapshotIdOpt.exists { nextSnapshotId =>
+  if (snapshots.contains(snapshotId) &&
+  snapshots.contains(nextSnapshotId) &&
+  startOffset < nextSnapshotId.offset &&
+  snapshotId.offset < nextSnapshotId.offset &&
+  log.maybeIncrementLogStartOffset(nextSnapshotId.offset, 
SnapshotGenerated)) {
+log.deleteOldSegments()
+val forgotten = mutable.Tree

[GitHub] [kafka] jolshan commented on pull request #9769: KAFKA-10774; Support Describe topic using topic IDs

2021-06-29 Thread GitBox


jolshan commented on pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#issuecomment-870786796


   Hi again @dengziming. There was one more change to the API (should be the 
last one this time). It includes adding a new class that I implement in this 
PR: https://github.com/apache/kafka/pull/10892
   Please check out the KIP for more details on the changes to DescribeTopicsResult and the new APIs for describing topics by topic ID. Thanks for your diligence and patience with this one.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on pull request #10930: KAFKA-12996; Return OFFSET_OUT_OF_RANGE for fetchOffset < startOffset even for diverging epochs

2021-06-29 Thread GitBox


rajinisivaram commented on pull request #10930:
URL: https://github.com/apache/kafka/pull/10930#issuecomment-870776438


   Thanks @guozhangwang ! Yes, we should cherry-pick to 2.8 and 2.7. I can do 
that.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jeffkbkim commented on pull request #10940: KAFKA-13007: KafkaAdminClient getListOffsetsCalls reuse cluster snapshot

2021-06-29 Thread GitBox


jeffkbkim commented on pull request #10940:
URL: https://github.com/apache/kafka/pull/10940#issuecomment-870768034


   @ijuma @hachikuji thanks for the review. I'll add a benchmark; I'm not sure there's any other way to test the performance.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10940: KAFKA-13007: KafkaAdminClient getListOffsetsCalls reuse cluster snapshot

2021-06-29 Thread GitBox


ijuma commented on a change in pull request #10940:
URL: https://github.com/apache/kafka/pull/10940#discussion_r660800092



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4200,6 +4200,7 @@ public ListOffsetsResult listOffsets(Map topicPartit
Map> futures) {
 
 MetadataResponse mr = context.response().orElseThrow(() -> new 
IllegalStateException("No Metadata response"));
+Cluster clusterSnapshot = mr.cluster();

Review comment:
   yeah, I was thinking the same.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10932: KAFKA-12958: add an invariant that notified leaders are never asked t…

2021-06-29 Thread GitBox


jsancio commented on a change in pull request #10932:
URL: https://github.com/apache/kafka/pull/10932#discussion_r660788266



##
File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
##
@@ -1014,6 +1015,26 @@ public void verify() {
 }
 }
 
+private static class LeaderNeverLoadSnapshot implements Invariant {
+final Cluster cluster;
+
+private LeaderNeverLoadSnapshot(Cluster cluster) {
+this.cluster = cluster;
+}
+
+@Override
+public void verify() {
+for (RaftNode raftNode : cluster.running()) {
+if (raftNode.counter.isWritable()) {
+assertTrue(raftNode.counter.getHandleSnapshotCalls() == 0);

Review comment:
   Let's use `assertEquals`:
   ```suggestion
   assertEquals(0, 
raftNode.counter.getHandleSnapshotCalls());
   ```

##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
##
@@ -157,13 +160,18 @@ public synchronized void 
handleSnapshot(SnapshotReader reader) {
 public synchronized void handleLeaderChange(LeaderAndEpoch newLeader) {
 if (newLeader.isLeader(nodeId)) {
 log.debug("Counter uncommitted value initialized to {} after 
claiming leadership in epoch {}",
-committed, newLeader);
+committed, newLeader);
 uncommitted = committed;
 claimedEpoch = OptionalInt.of(newLeader.epoch());
 } else {
 log.debug("Counter uncommitted value reset after resigning 
leadership");
 uncommitted = -1;
 claimedEpoch = OptionalInt.empty();
 }
+handleSnapshotCalls = 0;
+}
+
+public int getHandleSnapshotCalls() {

Review comment:
   The Kafka project tends not to prefix accessors with `get`:
   ```suggestion
   public int handleSnapshotCalls() {
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10940: KAFKA-13007: KafkaAdminClient getListOffsetsCalls reuse cluster snapshot

2021-06-29 Thread GitBox


hachikuji commented on a change in pull request #10940:
URL: https://github.com/apache/kafka/pull/10940#discussion_r660787940



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4200,6 +4200,7 @@ public ListOffsetsResult listOffsets(Map topicPartit
Map> futures) {
 
 MetadataResponse mr = context.response().orElseThrow(() -> new 
IllegalStateException("No Metadata response"));
+Cluster clusterSnapshot = mr.cluster();

Review comment:
   Having names that sound like simple accessors probably makes this kind 
of performance issue more likely. Maybe we could rename `cluster()` to 
`buildCluster()` so that it is clear there is some work.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #10899: KAFKA-12952 Adding Delimiters to Metadata Snapshot

2021-06-29 Thread GitBox


cmccabe merged pull request #10899:
URL: https://github.com/apache/kafka/pull/10899


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12677) The raftCluster always send to the wrong active controller and never update

2021-06-29 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-12677:
---
Labels: kip-500  (was: )

> The raftCluster always send to the wrong active controller and never update
> ---
>
> Key: KAFKA-12677
> URL: https://issues.apache.org/jira/browse/KAFKA-12677
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Blocker
>  Labels: kip-500
> Fix For: 3.0.0
>
>
> KIP-500 introduces a self-managed metadata quorum. There should always be 
> exactly one active controller, and all RPCs should be sent to it. But there 
> is a chance that the active controller has already changed while RPCs are 
> still being sent to the old one.
> In the attachment log, we can see:
> {code:java}
> [Controller 3002] Becoming active at controller epoch 1. 
> ...
> [Controller 3000] Becoming active at controller epoch 2. 
> {code}
> So the latest active controller should be 3000, but the create-topic RPCs are 
> all being sent to controller 3002:
> {code:java}
> "errorMessage":"The active controller appears to be node 3000"
> {code}
> This bug causes the RaftClusterTests flaky.
>  
> Debug log while running testCreateClusterAndCreateListDeleteTopic test: 
> https://drive.google.com/file/d/1WVUgy1Erjx8mHyofiP9MVvQGb0LcDYt3/view?usp=sharing



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-29 Thread GitBox


kpatelatwork commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r660782579



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
##
@@ -761,6 +803,44 @@ public void onCompletion(Throwable error, 
ConsumerRecord record)
 
 }
 
+@SuppressWarnings("unchecked")
+RestartRequest recordToRestartRequest(ConsumerRecord<String, byte[]> record, SchemaAndValue value) {
+String connectorName = record.key().substring(RESTART_PREFIX.length());
+if (value.value() == null) {
+log.error("Ignoring restart request because it is unexpectedly 
null");
+return null;
+}
+if (!(value.value() instanceof Map)) {
+log.error("Ignoring restart request because the value is not a Map 
but is {}", value.value().getClass());
+return null;
+}
+
+Map<String, Object> valueAsMap = (Map<String, Object>) value.value();
+
+Object failed = valueAsMap.get(ONLY_FAILED_FIELD_NAME);
+if (failed == null) {
+log.warn("Invalid data for restart request '{}' field was missing, 
defaulting to {}", ONLY_FAILED_FIELD_NAME, ONLY_FAILED_DEFAULT);
+failed = ONLY_FAILED_DEFAULT;
+}
+if (!(failed instanceof Boolean)) {
+log.warn("Invalid data for restart request '{}' field should be a 
Boolean but is {}, defaulting to {}", ONLY_FAILED_FIELD_NAME, 
failed.getClass(), ONLY_FAILED_DEFAULT);
+failed = ONLY_FAILED_DEFAULT;
+}
+boolean onlyFailed = (Boolean) failed;

Review comment:
   I Agree and I fixed it like you suggested.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kpatelatwork commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-29 Thread GitBox


kpatelatwork commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r660781818



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##
@@ -357,6 +367,62 @@ public void validateConnectorConfig(Map 
connectorProps, Callback
 });
 }
 
+/**
+ * Build the {@link RestartPlan} that describes what should and should not 
be restarted given the restart request
+ * and the current status of the connector and task instances.
+ *
+ * @param request the restart request; may not be null
+ * @return the restart plan, or empty if this worker has no status for the 
connector named in the request and therefore the
+ * connector cannot be restarted
+ */
+public Optional<RestartPlan> buildRestartPlan(RestartRequest request) {
+String connectorName = request.connectorName();
+ConnectorStatus connectorStatus = 
statusBackingStore.get(connectorName);
+if (connectorStatus == null) {
+return Optional.empty();
+}
+
+// If requested, mark the connector as restarting
+AbstractStatus.State connectorState;
+if (request.shouldRestartConnector(connectorStatus)) {
+connectorState = AbstractStatus.State.RESTARTING;
+} else {
+connectorState = connectorStatus.state();
+}

Review comment:
   fixed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-12992) Make kraft configuration properties public

2021-06-29 Thread HaiyuanZhao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

HaiyuanZhao reassigned KAFKA-12992:
---

Assignee: HaiyuanZhao

> Make kraft configuration properties public
> --
>
> Key: KAFKA-12992
> URL: https://issues.apache.org/jira/browse/KAFKA-12992
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Jose Armando Garcia Sancio
>Assignee: HaiyuanZhao
>Priority: Blocker
> Fix For: 3.0.0
>
>
> All of the Kraft configurations should be made public:
> {code:java}
> /*
>  * KRaft mode configs. Note that these configs are defined as 
> internal. We will make them public in the 3.0.0 release.
>  */
> .defineInternal(ProcessRolesProp, LIST, Collections.emptyList(), 
> ValidList.in("broker", "controller"), HIGH, ProcessRolesDoc)
> .defineInternal(NodeIdProp, INT, Defaults.EmptyNodeId, null, HIGH, 
> NodeIdDoc)
> .defineInternal(InitialBrokerRegistrationTimeoutMsProp, INT, 
> Defaults.InitialBrokerRegistrationTimeoutMs, null, MEDIUM, 
> InitialBrokerRegistrationTimeoutMsDoc)
> .defineInternal(BrokerHeartbeatIntervalMsProp, INT, 
> Defaults.BrokerHeartbeatIntervalMs, null, MEDIUM, 
> BrokerHeartbeatIntervalMsDoc)
> .defineInternal(BrokerSessionTimeoutMsProp, INT, 
> Defaults.BrokerSessionTimeoutMs, null, MEDIUM, BrokerSessionTimeoutMsDoc)
> .defineInternal(MetadataLogDirProp, STRING, null, null, HIGH, 
> MetadataLogDirDoc)
> .defineInternal(ControllerListenerNamesProp, STRING, null, null, 
> HIGH, ControllerListenerNamesDoc)
> .defineInternal(SaslMechanismControllerProtocolProp, STRING, 
> SaslConfigs.DEFAULT_SASL_MECHANISM, null, HIGH, 
> SaslMechanismControllerProtocolDoc)
>  {code}
>  
> https://github.com/apache/kafka/blob/2beaf9a720330615bc5474ec079f8b4b105eff91/core/src/main/scala/kafka/server/KafkaConfig.scala#L1043-L1053



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jeffkbkim opened a new pull request #10940: KAFKA-13007: KafkaAdminClient getListOffsetsCalls reuse cluster snapshot

2021-06-29 Thread GitBox


jeffkbkim opened a new pull request #10940:
URL: https://github.com/apache/kafka/pull/10940


   In getListOffsetsCalls, we rebuild the cluster snapshot for every topic partition. Instead, we should reuse a single snapshot; this reduces the time complexity from O(n^2) to O(n).
   
   For manual testing (on AK 2.8), I passed a map of 6K topic partitions to listOffsets.
   
   without snapshot reuse:
   duration of building futures from metadata response: **15582** milliseconds
   total duration of listOffsets: **15743** milliseconds
   
   with reuse:
   duration of building futures from metadata response: **24** milliseconds
   total duration of listOffsets: **235** milliseconds
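   A minimal sketch of the pattern behind this change (the names are illustrative stand-ins, not the actual KafkaAdminClient internals; `buildClusterSnapshot` stands in for the metadata response's cluster() call):
   
   ```scala
   import org.apache.kafka.common.{Cluster, Node, TopicPartition}
   
   object ListOffsetsSketch {
     // Build the Cluster snapshot once, outside the per-partition loop, so
     // each iteration is an O(1) leader lookup instead of an O(n) rebuild.
     def leadersFor(
       buildClusterSnapshot: () => Cluster,
       partitions: Seq[TopicPartition]
     ): Map[TopicPartition, Node] = {
       val clusterSnapshot = buildClusterSnapshot() // O(n) work happens exactly once
       partitions.map(tp => tp -> clusterSnapshot.leaderFor(tp)).toMap
     }
   }
   ```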
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] zhaohaidao commented on pull request #10932: KAFKA-12958: add an invariant that notified leaders are never asked t…

2021-06-29 Thread GitBox


zhaohaidao commented on pull request #10932:
URL: https://github.com/apache/kafka/pull/10932#issuecomment-870739108


   @jsancio Thanks for your advice. The comments have been addressed. Please 
continue to review  when you have time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on pull request #10811: KAFKA-12598: remove zookeeper support on configCommand except security config

2021-06-29 Thread GitBox


rondagostino commented on pull request #10811:
URL: https://github.com/apache/kafka/pull/10811#issuecomment-870738717


   Regarding the bootstrap use cases, these were enumerated in [KIP-515: Enable 
ZK client to use the new TLS supported 
authentication](https://cwiki.apache.org/confluence/display/KAFKA/KIP-515%3A+Enable+ZK+client+to+use+the+new+TLS+supported+authentication)
 when we added ZooKeeper TLS, and while that was a while ago, yes, it is still 
necessary to be able to bootstrap configs prior to starting Kafka.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-12520) Producer state is needlessly rebuilt on startup

2021-06-29 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-12520.
-
Fix Version/s: 3.0.0
 Assignee: Cong Ding  (was: Dhruvil Shah)
   Resolution: Fixed

merged the PR to trunk

> Producer state is needlessly rebuilt on startup
> ---
>
> Key: KAFKA-12520
> URL: https://issues.apache.org/jira/browse/KAFKA-12520
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dhruvil Shah
>Assignee: Cong Ding
>Priority: Major
> Fix For: 3.0.0
>
>
> When we find a {{.swap}} file on startup, we typically want to rename and 
> replace it as {{.log}}, {{.index}}, {{.timeindex}}, etc. as a way to complete 
> any ongoing replace operations. These swap files are usually known to have 
> been flushed to disk before the replace operation begins.
> One flaw in the current logic is that when we recover these swap files on 
> startup, we end up truncating the producer state and rebuilding it from 
> scratch. This is unnecessary, as the replace operation does not mutate the 
> producer state by itself; it is only meant to replace the {{.log}} file 
> along with the corresponding indices.
> Because of this unneeded producer state rebuild, we have seen multi-hour 
> startup times for clusters that have large compacted topics.
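
For intuition, a minimal sketch of the rename-into-place step described above (illustrative only; not the actual LogLoader recovery code):

```scala
import java.nio.file.{Files, Path, StandardCopyOption}

object SwapRecoverySketch {
  // A .swap file that was flushed before the replace began can simply be
  // renamed over its target on recovery. Nothing in this step touches
  // producer state, which is why the rebuild is unnecessary.
  def completeInterruptedReplace(swapFile: Path): Unit = {
    val target = swapFile.resolveSibling(
      swapFile.getFileName.toString.stripSuffix(".swap"))
    Files.move(swapFile, target, StandardCopyOption.ATOMIC_MOVE)
  }
}
```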



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao merged pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required

2021-06-29 Thread GitBox


junrao merged pull request #10763:
URL: https://github.com/apache/kafka/pull/10763


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] zhaohaidao commented on a change in pull request #10932: KAFKA-12958: add an invariant that notified leaders are never asked t…

2021-06-29 Thread GitBox


zhaohaidao commented on a change in pull request #10932:
URL: https://github.com/apache/kafka/pull/10932#discussion_r660772538



##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
##
@@ -38,6 +38,9 @@
 private OptionalInt claimedEpoch = OptionalInt.empty();
 private long lastOffsetSnapshotted = -1;
 
+private int handleSnapshotCalls = 0;
+private boolean handleSnapshotCalled = false;

Review comment:
   fair enough




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] zhaohaidao commented on a change in pull request #10932: KAFKA-12958: add an invariant that notified leaders are never asked t…

2021-06-29 Thread GitBox


zhaohaidao commented on a change in pull request #10932:
URL: https://github.com/apache/kafka/pull/10932#discussion_r660772174



##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
##
@@ -157,13 +162,27 @@ public synchronized void 
handleSnapshot(SnapshotReader reader) {
 public synchronized void handleLeaderChange(LeaderAndEpoch newLeader) {
 if (newLeader.isLeader(nodeId)) {
 log.debug("Counter uncommitted value initialized to {} after 
claiming leadership in epoch {}",
-committed, newLeader);
+committed, newLeader);
 uncommitted = committed;
 claimedEpoch = OptionalInt.of(newLeader.epoch());
 } else {
 log.debug("Counter uncommitted value reset after resigning 
leadership");
 uncommitted = -1;
 claimedEpoch = OptionalInt.empty();
 }
+handleSnapshotCalled = false;
+handleSnapshotCalls = 0;
+}
+
+public boolean isLeader() {
+return this.client.leaderAndEpoch().isLeader(nodeId);

Review comment:
   done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy closed pull request #10866: [Test ] debug zk acl errors

2021-06-29 Thread GitBox


omkreddy closed pull request #10866:
URL: https://github.com/apache/kafka/pull/10866


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required

2021-06-29 Thread GitBox


junrao commented on a change in pull request #10763:
URL: https://github.com/apache/kafka/pull/10763#discussion_r660769175



##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -192,20 +230,10 @@ object LogLoader extends Logging {
 debug(s"${params.logIdentifier}Deleting stray temporary file 
${file.getAbsolutePath}")
 Files.deleteIfExists(file.toPath)
   } else if (filename.endsWith(CleanedFileSuffix)) {
-minCleanedFileOffset = Math.min(offsetFromFileName(filename), 
minCleanedFileOffset)
-cleanFiles += file
+minCleanedFileOffset = Math.min(offsetFromFile(file), 
minCleanedFileOffset)
+cleanedFiles += file
   } else if (filename.endsWith(SwapFileSuffix)) {
-// we crashed in the middle of a swap operation, to recover:
-// if a log, delete the index files, complete the swap operation later
-// if an index just delete the index files, they will be rebuilt
-val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, 
SwapFileSuffix, ""))
-info(s"${params.logIdentifier}Found file ${file.getAbsolutePath} from 
interrupted swap operation.")
-if (Log.isIndexFile(baseFile)) {
-  deleteIndicesIfExist(baseFile)
-} else if (Log.isLogFile(baseFile)) {
-  deleteIndicesIfExist(baseFile)
-  swapFiles += file
-}
+swapFiles += file

Review comment:
   Thanks for the explanation. Makes sense.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #10930: KAFKA-12996; Return OFFSET_OUT_OF_RANGE for fetchOffset < startOffset even for diverging epochs

2021-06-29 Thread GitBox


guozhangwang commented on pull request #10930:
URL: https://github.com/apache/kafka/pull/10930#issuecomment-870715774


   Merged to trunk. Thanks @rajinisivaram !
   
   Should we cherry-pick to older branches too?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #10930: KAFKA-12996; Return OFFSET_OUT_OF_RANGE for fetchOffset < startOffset even for diverging epochs

2021-06-29 Thread GitBox


guozhangwang merged pull request #10930:
URL: https://github.com/apache/kafka/pull/10930


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10931: KAFKA-12998: Implement broker-side KRaft snapshots (WIP)

2021-06-29 Thread GitBox


cmccabe commented on a change in pull request #10931:
URL: https://github.com/apache/kafka/pull/10931#discussion_r660743774



##
File path: metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
##
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+
+/**
+ * Represents changes to the topics in the metadata image.
+ */
+public final class TopicsDelta {
+private final TopicsImage image;
+
+/**
+ * A map from topic IDs to the topic deltas for each topic. Topics which 
have been
+ * deleted will not appear in this map.
+ */
+private final Map<Uuid, TopicDelta> changedTopics = new HashMap<>();
+
+/**
+ * The IDs of topics that exist in the image but that have been deleted. 
Note that if
+ * a topic does not exist in the image, it will also not exist in this 
set. Topics
+ * that are created and then deleted within the same delta will leave no 
trace.
+ */
+private final Set<Uuid> deletedTopicIds = new HashSet<>();

Review comment:
   No worries. Thanks for taking a look.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.

2021-06-29 Thread GitBox


IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r660172772



##
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##
@@ -62,6 +63,88 @@ class ConsumerGroupServiceTest {
 verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+val args = Array("--bootstrap-server", "localhost:9092", "--group", group, 
"--describe", "--offsets")
+val groupService = consumerGroupService(args)
+
+val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+// Some topic partitions get valid OffsetAndMetadata values, others get null values (negative integers), and others aren't defined
+val commitedOffsets = Map(
+  testTopicPartition1 -> new OffsetAndMetadata(100),
+  testTopicPartition2 -> null,
+  testTopicPartition3 -> new OffsetAndMetadata(100),
+  testTopicPartition4 -> new OffsetAndMetadata(100),
+  testTopicPartition5 -> null,
+).asJava
+
+val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, 
System.currentTimeMillis, Optional.of(1))
+val endOffsets = Map(
+  testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+  testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+)
+val assignedTopicPartitions = Set(testTopicPartition0, 
testTopicPartition1, testTopicPartition2)
+val unassignedTopicPartitions = Set(testTopicPartition3, 
testTopicPartition4, testTopicPartition5)
+
+val consumerGroupDescription = new ConsumerGroupDescription(group,
+  true,
+  Collections.singleton(new MemberDescription("member1", 
Optional.of("instance1"), "client1", "host1", new 
MemberAssignment(assignedTopicPartitions.asJava))),
+  classOf[RangeAssignor].getName,
+  ConsumerGroupState.STABLE,
+  new Node(1, "localhost", 9092))
+
+def offsetsArgMatcher(expectedPartitions: Set[TopicPartition]): 
ArgumentMatcher[util.Map[TopicPartition, OffsetSpec]] = {
+  topicPartitionOffsets => topicPartitionOffsets != null && 
topicPartitionOffsets.keySet.asScala.equals(expectedPartitions)
+}
+
+
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
 any()))
+  .thenReturn(new 
DescribeConsumerGroupsResult(Collections.singletonMap(group, 
KafkaFuture.completedFuture(consumerGroupDescription
+when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+  
.thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+when(admin.listOffsets(
+  ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)),
+  any()
+)).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => 
assignedTopicPartitions.contains(tp) }.asJava))
+when(admin.listOffsets(
+  ArgumentMatchers.argThat(offsetsArgMatcher(unassignedTopicPartitions)),
+  any()
+)).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => 
unassignedTopicPartitions.contains(tp) }.asJava))
+
+val (state, assignments) = groupService.collectGroupOffsets(group)
+val returnedOffsets = assignments.map { results =>
+  results.map { assignment =>
+new TopicPartition(assignment.topic.get, assignment.partition.get) -> 
assignment.offset
+  }.toMap
+}.getOrElse(Map.empty)
+// Results should have information for every assigned topic partition (even if there is no offset information at all, because those get filled with None)
+// Results should have information for unassigned topic partitions if and only if there is information about them (including null values)

Review comment:
   You are right, it's not relevant because all partitions have information 
now. Going to remove

##
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##
@@ -62,6 +63,88 @@ class ConsumerGroupServiceTest {
 verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+val args = Array("--bootstrap-server", "localhost:9092", "--group", group, 
"--describe", "--offsets")
+val groupService = consumerGroupService(args)
+
+val testTopicPartition0 = new TopicP
