[jira] [Commented] (KAFKA-7870) Error sending fetch request (sessionId=1578860481, epoch=INITIAL) to node 2: java.io.IOException: Connection to 2 was disconnected before the response was read.

2021-09-21 Thread priya Vijay (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17418435#comment-17418435
 ] 

priya Vijay commented on KAFKA-7870:


We are running Kafka 2.8 and see similar issues. We have 3 brokers, and all of 
them log errors like the ones below when sending fetch requests to each other:

[2021-09-22 00:00:47,274] INFO [ReplicaFetcher replicaId=0, leaderId=2, 
fetcherId=0] Error sending fetch request (sessionId=9629724, epoch=60615) to 
node 2: (org.apache.kafka.clients.FetchSessionHandler)
java.io.IOException: Connection to 2 was disconnected before the response was 
read



[2021-09-22 00:00:47,284] WARN [ReplicaFetcher replicaId=0, leaderId=2, 
fetcherId=0] Error in response for fetch request (type=FetchRequest, 
replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=9629724, 
epoch=60615), rackId=) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 2 was disconnected before the response was 
read
 at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)

 

[~Tyran] I see you had similar issues. How did you go about fixing them?

> Error sending fetch request (sessionId=1578860481, epoch=INITIAL) to node 2: 
> java.io.IOException: Connection to 2 was disconnected before the response was 
> read.
> 
>
> Key: KAFKA-7870
> URL: https://issues.apache.org/jira/browse/KAFKA-7870
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 2.1.0
> Reporter: Chakhsu Lau
> Priority: Blocker
>
> We built a Kafka cluster with 5 brokers, but one of the brokers suddenly 
> stopped while running. This has happened twice on the same broker. Here is the 
> log. Is this a bug in Kafka?
> {code:java}
> [2019-01-25 12:57:14,686] INFO [ReplicaFetcher replicaId=3, leaderId=2, 
> fetcherId=0] Error sending fetch request (sessionId=1578860481, 
> epoch=INITIAL) to node 2: java.io.IOException: Connection to 2 was 
> disconnected before the response was read. 
> (org.apache.kafka.clients.FetchSessionHandler)
> [2019-01-25 12:57:14,687] WARN [ReplicaFetcher replicaId=3, leaderId=2, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=3, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={api-result-bi-heatmap-8=(offset=0, logStartOffset=0, 
> maxBytes=1048576, currentLeaderEpoch=Optional[4]), 
> api-result-bi-heatmap-save-12=(offset=0, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[4]), api-result-bi-heatmap-task-2=(offset=2, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[4]), 
> api-result-bi-flow-39=(offset=1883206, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[4]), __consumer_offsets-47=(offset=349437, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[4]), 
> api-result-bi-heatmap-track-6=(offset=1039889, logStartOffset=0, 
> maxBytes=1048576, currentLeaderEpoch=Optional[4]), 
> api-result-bi-heatmap-task-17=(offset=0, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[4]), __consumer_offsets-2=(offset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[4]), 
> api-result-bi-heatmap-aggs-19=(offset=1255056, logStartOffset=0, 
> maxBytes=1048576, currentLeaderEpoch=Optional[4])}, 
> isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1578860481, 
> epoch=INITIAL)) (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 2 was disconnected before the response was 
> read
> at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)
> at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97)
> at 
> kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190)
> at 
> kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jlprat commented on pull request #11350: Scala3 migration

2021-09-21 Thread GitBox


jlprat commented on pull request #11350:
URL: https://github.com/apache/kafka/pull/11350#issuecomment-924613583


   One of the failures was https://issues.apache.org/jira/browse/KAFKA-8785
   I need to check the other one


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding commented on pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

2021-09-21 Thread GitBox


ccding commented on pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#issuecomment-924557403


   FYI: there is a side effect of not throwing `IllegalStateException`. The 
`deleteSegment` function will not throw and we will be able to rename the 
`.swap` files to regular files during shutdown. This is a good thing.
   
   
https://github.com/apache/kafka/blob/5a6f19b2a1ff72c52ad627230ffdf464456104ee/core/src/main/scala/kafka/log/LocalLog.scala#L895-L909






[GitHub] [kafka] mjsax commented on a change in pull request #11336: DRAFT: Add currentPosition and times to ProcessorContext

2021-09-21 Thread GitBox


mjsax commented on a change in pull request #11336:
URL: https://github.com/apache/kafka/pull/11336#discussion_r713538585



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##
@@ -247,4 +251,45 @@ Cancellable schedule(final Duration interval,
      * @return the key/values matching the given prefix from the StreamsConfig properties.
      */
     Map<String, Object> appConfigsWithPrefix(final String prefix);
+
+    /**
+     * Return the current system timestamp (also called wall-clock time) in milliseconds.
+     *
+     * <p> Note: this method returns the internally cached system timestamp from the Kafka Stream runtime.
+     * Thus, it may return a different value compared to {@code System.currentTimeMillis()}.
+     *
+     * @return the current system timestamp in milliseconds
+     */
+    long currentSystemTimeMs();

Review comment:
   Did we miss adding those to the new context via 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-622%3A+Add+currentSystemTimeMs+and+currentStreamTimeMs+to+ProcessorContext
 ?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
##
@@ -110,6 +112,11 @@ public long currentStreamTimeMs() {
 throw new UnsupportedOperationException("There is no concept of 
stream-time for a global processor.");
 }
 
+@Override
+public Map currentPositions() {
+throw new UnsupportedOperationException("currentPositions is not 
supported for global processors.");

Review comment:
   Why not? We also support `currentSystemTimeMs()`. The only method we don't 
support is `currentStreamTimeMs()`, because the global thread is processed 
independently of the main threads and there is no concept of stream-time for 
the global task.








[GitHub] [kafka] mjsax commented on a change in pull request #11347: KAFKA-13296: warn if previous assignment has duplicate partitions

2021-09-21 Thread GitBox


mjsax commented on a change in pull request #11347:
URL: https://github.com/apache/kafka/pull/11347#discussion_r713232644



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -351,6 +351,17 @@ public GroupAssignment assign(final Cluster metadata, 
final GroupSubscription gr
 
 // add the consumer and any info in its subscription to the client
 clientMetadata.addConsumer(consumerId, 
subscription.ownedPartitions());
+if (allOwnedPartitions.stream().anyMatch(t -> 
subscription.ownedPartitions().contains(t))) {
+log.warn("The previous assignment contains a partition more 
than once. " +
+"This might result in violation of EOS if enabled. \n" +

Review comment:
   Not sure if we need to call out EOS in particular? It's incorrect in any 
case.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -351,6 +351,17 @@ public GroupAssignment assign(final Cluster metadata, 
final GroupSubscription gr
 
 // add the consumer and any info in its subscription to the client
 clientMetadata.addConsumer(consumerId, 
subscription.ownedPartitions());
+if (allOwnedPartitions.stream().anyMatch(t -> 
subscription.ownedPartitions().contains(t))) {
+log.warn("The previous assignment contains a partition more 
than once. " +
+"This might result in violation of EOS if enabled. \n" +

Review comment:
   Wondering if we should print the full old-assignment a single time if we 
detect any error instead? Would be helpful to get the `client.id -> 
old-assignment` mapping?
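
   The suggestion above (report the whole previous assignment once, keyed by 
client, instead of warning per consumer) could be sketched roughly as follows. 
This is only an illustration under stated assumptions: `DuplicateOwnershipCheck` 
and `findDuplicates` are hypothetical names, partitions and client ids are plain 
strings, and none of this is the code in `StreamsPartitionAssignor`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper for illustration; not part of Kafka.
final class DuplicateOwnershipCheck {

    // Returns partition -> owners for every partition claimed by more than one client.
    static Map<String, List<String>> findDuplicates(Map<String, Set<String>> ownedByClient) {
        Map<String, List<String>> owners = new TreeMap<>();
        for (Map.Entry<String, Set<String>> entry : ownedByClient.entrySet()) {
            for (String partition : entry.getValue()) {
                owners.computeIfAbsent(partition, p -> new ArrayList<>()).add(entry.getKey());
            }
        }
        // Keep only the partitions that have two or more owners.
        owners.values().removeIf(list -> list.size() < 2);
        return owners;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> previous = new LinkedHashMap<>();
        previous.put("client-1", new TreeSet<>(Arrays.asList("topic-0", "topic-1")));
        previous.put("client-2", new TreeSet<>(Arrays.asList("topic-1", "topic-2")));

        Map<String, List<String>> duplicates = findDuplicates(previous);
        if (!duplicates.isEmpty()) {
            // A single warning that carries the full client -> old-assignment mapping.
            System.out.println("WARN duplicate partitions " + duplicates
                    + " in previous assignment " + previous);
        }
    }
}
```

   Collecting all owners first means the log line is emitted once per rebalance 
and shows which clients claimed the same partition.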








[GitHub] [kafka] RivenSun2 commented on a change in pull request #11340: fix issue : KafkaConsumer cannot jump out of the poll method, and the…

2021-09-21 Thread GitBox


RivenSun2 commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r713524099



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -1069,6 +1065,37 @@ private void maybeAutoCommitOffsetsSync(Timer timer) {
         }
     }
 
+    private void cleanUpConsumedOffsets(Map<TopicPartition, OffsetAndMetadata> willCommitOffsets) {
+
+        if (willCommitOffsets.isEmpty())
+            return;
+
+        Set<String> validTopics = metadata.fetch().topics();
+        Set<TopicPartition> toGiveUpTopicPartitions = new HashSet<>();
+
+        Iterator<Map.Entry<TopicPartition, OffsetAndMetadata>> iterator = willCommitOffsets.entrySet().iterator();
+
+        while (iterator.hasNext()) {
+
+            Map.Entry<TopicPartition, OffsetAndMetadata> entry = iterator.next();
+
+            if (!validTopics.contains(entry.getKey().topic())) {
+
+                toGiveUpTopicPartitions.add(entry.getKey());
+                iterator.remove();
+            }
+
+        }
+
+        if (toGiveUpTopicPartitions.size() > 0) {
+
+            //Because toGiveUpTopicPartitions may receive `UnknownTopicOrPartitionException` when submitting their offsets.
+            //We are prepared to abandon them. The worst effect is that these partitions may repeatedly consume some messages
+            log.warn("Synchronous auto-commit of offsets {} will be abandoned", toGiveUpTopicPartitions);

Review comment:
   Thank you for your reply.
   Can you reproduce the case from the Jira issue and debug it locally?
   
   The `allConsumedOffsets` variable is a collection of all topicPartitions 
with a valid position in the consumer's local state.
   The source code is as follows:
   `Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();`
   
   `public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
       Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
       assignment.forEach((topicPartition, partitionState) -> {
           if (partitionState.hasValidPosition())
               allConsumed.put(topicPartition, new OffsetAndMetadata(partitionState.position.offset,
                       partitionState.position.offsetEpoch, ""));
       });
       return allConsumed;
   }`
   
   
   In fact, what is collected in `toGiveUpTopicPartitions` here are all the 
topicPartitions that are ready to commit offsets but may trigger 
`UnknownTopicOrPartitionException`.
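
   The filtering described above can be tried in isolation with a small 
stand-alone sketch. The assumptions are loud ones: `OffsetFilterSketch`, 
`dropUnknownTopics`, and the nested `TopicPartition` are simplified stand-ins 
invented for this example, offsets are plain `Long` values, and the set of valid 
topics is passed in directly rather than read from consumer metadata.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Stand-alone illustration; these types are simplified stand-ins, not Kafka classes.
final class OffsetFilterSketch {

    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TopicPartition)) return false;
            TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    // Removes every entry whose topic is unknown and returns the removed keys.
    static Set<TopicPartition> dropUnknownTopics(Map<TopicPartition, Long> offsets,
                                                 Set<String> validTopics) {
        Set<TopicPartition> dropped = new HashSet<>();
        Iterator<Map.Entry<TopicPartition, Long>> iterator = offsets.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<TopicPartition, Long> entry = iterator.next();
            if (!validTopics.contains(entry.getKey().topic)) {
                dropped.add(entry.getKey());
                iterator.remove(); // safe removal while iterating
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new TopicPartition("orders", 0), 42L);
        offsets.put(new TopicPartition("deleted-topic", 0), 7L);

        Set<String> validTopics = new HashSet<>();
        validTopics.add("orders");

        Set<TopicPartition> dropped = dropUnknownTopics(offsets, validTopics);
        System.out.println("will commit " + offsets + ", abandoned " + dropped);
    }
}
```

   Only the offsets of topics that still exist remain in the map, so a commit 
built from it would not include the partitions of the deleted topic.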








[GitHub] [kafka] ccding commented on a change in pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

2021-09-21 Thread GitBox


ccding commented on a change in pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#discussion_r713531154



##
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##
@@ -499,12 +499,13 @@ class BrokerServer(
   if (clientToControllerChannelManager != null)
 CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)
 
-  if (logManager != null)
-CoreUtils.swallow(logManager.shutdown(), this)
-  // be sure to shutdown scheduler after log manager
+  // be sure to shutdown scheduler before log manager

Review comment:
   Thanks for the code review. I moved `kafkaScheduler.shutdown` upfront, 
changed the exception to an info-level log, and added comments.
   
   Regarding the change from an exception to a log message: I looked at the 
code that calls `startup` and `shutdown`. It appears we always call startup 
right after creating the scheduler object and call shutdown when calling the 
parent's shutdown or close. I think the callers make sure that they won't call 
`scheduler.schedule` on a scheduler that has been shut down, unless we are 
shutting down the broker. So it should be okay to change the 
`IllegalStateException` to an info-level log.
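
   As a rough illustration of the behaviour described above (log and skip 
instead of throwing once the scheduler is shut down), here is a minimal sketch. 
`QuietScheduler` is a hypothetical wrapper around 
`ScheduledThreadPoolExecutor` written for this example; it is not 
`KafkaScheduler`, and the log line is just a `println`.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper for illustration; not KafkaScheduler.
final class QuietScheduler {
    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);

    // Returns true if the task was scheduled, false if it was ignored after shutdown.
    synchronized boolean schedule(String name, Runnable task, long delayMs) {
        if (executor.isShutdown()) {
            // Info-level message instead of an exception.
            System.out.println("INFO scheduler is shut down, ignoring task '" + name + "'");
            return false;
        }
        executor.schedule(task, delayMs, TimeUnit.MILLISECONDS);
        return true;
    }

    // Stops accepting tasks and waits briefly for already scheduled ones.
    synchronized void shutdown() {
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        QuietScheduler scheduler = new QuietScheduler();
        scheduler.schedule("flush", () -> System.out.println("flushing"), 0);
        scheduler.shutdown();
        scheduler.schedule("late-task", () -> System.out.println("never runs"), 0);
    }
}
```

   Both methods are synchronized so that the shutdown check and the call to 
`executor.schedule` cannot interleave with a concurrent shutdown.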








[GitHub] [kafka] junrao commented on a change in pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

2021-09-21 Thread GitBox


junrao commented on a change in pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#discussion_r713494615



##
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##
@@ -499,12 +499,13 @@ class BrokerServer(
   if (clientToControllerChannelManager != null)
 CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)
 
-  if (logManager != null)
-CoreUtils.swallow(logManager.shutdown(), this)
-  // be sure to shutdown scheduler after log manager
+  // be sure to shutdown scheduler before log manager

Review comment:
   It seems that none of the scheduled calls depends on the completion of 
the returned future. When shutting down KafkaScheduler, we call 
ScheduledThreadPoolExecutor.shutdown(), which doesn't wait for all existing 
tasks to complete. This seems ok since for important tasks (e.g. flushing the 
log), we make explicit calls during shutdown.
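
   The point about `ScheduledThreadPoolExecutor.shutdown()` not waiting for 
running tasks can be checked with a small stand-alone sketch. 
`ShutdownDoesNotWait` and `demo` are names made up for this example; only the 
`java.util.concurrent` calls are real APIs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustration only: shows that shutdown() returns while a task is still running.
final class ShutdownDoesNotWait {

    // Returns {terminated right after shutdown(), terminated after awaitTermination()}.
    static boolean[] demo() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        executor.execute(() -> {
            started.countDown();
            awaitQuietly(release); // the task stays busy until released
        });

        awaitQuietly(started);     // make sure the task is actually running
        executor.shutdown();       // returns immediately, the running task continues
        boolean terminatedRightAway = executor.isTerminated();

        release.countDown();       // let the task finish
        boolean terminatedAfterWait;
        try {
            terminatedAfterWait = executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            terminatedAfterWait = false;
        }
        return new boolean[] {terminatedRightAway, terminatedAfterWait};
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("terminated right after shutdown(): " + result[0]);
        System.out.println("terminated after awaitTermination(): " + result[1]);
    }
}
```

   A caller that needs in-flight work to finish has to follow `shutdown()` with 
`awaitTermination(...)`, or, as noted above, make the important calls explicitly 
during shutdown.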








[GitHub] [kafka] ijuma commented on a change in pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

2021-09-21 Thread GitBox


ijuma commented on a change in pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#discussion_r713489901



##
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##
@@ -499,12 +499,13 @@ class BrokerServer(
   if (clientToControllerChannelManager != null)
 CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)
 
-  if (logManager != null)
-CoreUtils.swallow(logManager.shutdown(), this)
-  // be sure to shutdown scheduler after log manager
+  // be sure to shutdown scheduler before log manager

Review comment:
   Are all scheduled tasks of the async variety? It would be nice if we 
stopped issuing new async tasks when shutdown starts, but let the existing 
ones complete.








[GitHub] [kafka] junrao commented on pull request #11348: MINOR: fix CreateTopic to return the same as DescribeTopic

2021-09-21 Thread GitBox


junrao commented on pull request #11348:
URL: https://github.com/apache/kafka/pull/11348#issuecomment-924462411


   Cherry-picked the PR to the 3.0 branch too.






[GitHub] [kafka] junrao merged pull request #11348: MINOR: fix CreateTopic to return the same as DescribeTopic

2021-09-21 Thread GitBox


junrao merged pull request #11348:
URL: https://github.com/apache/kafka/pull/11348


   






[GitHub] [kafka] junrao commented on a change in pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

2021-09-21 Thread GitBox


junrao commented on a change in pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#discussion_r713478871



##
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##
@@ -499,12 +499,13 @@ class BrokerServer(
   if (clientToControllerChannelManager != null)
 CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)
 
-  if (logManager != null)
-CoreUtils.swallow(logManager.shutdown(), this)
-  // be sure to shutdown scheduler after log manager
+  // be sure to shutdown scheduler before log manager

Review comment:
   (1) Could we add a comment to explain why we want to shut down the 
log manager later?
   (2) If we are shutting down, it makes sense to stop the scheduler first, 
since there is no guarantee that any asynchronously scheduled task will 
complete. If this is the case, should we shut down the scheduler before any 
other component (e.g. ReplicaManager also uses the scheduler)?
   (3) Once the scheduler is shut down, scheduling new tasks causes an 
IllegalStateException. That's probably the exception that 
https://github.com/apache/kafka/pull/10538 tries to fix. To avoid polluting the 
log, perhaps we could change KafkaScheduler so that it avoids throwing an 
exception once it's shut down.








[GitHub] [kafka] showuon commented on a change in pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…

2021-09-21 Thread GitBox


showuon commented on a change in pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#discussion_r713474671



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
##
@@ -292,13 +408,46 @@ public V fetch(final K key,
             time);
     }
 
+    private long getActualWindowStartTime(final long timeFrom) {
+        return Math.max(timeFrom, ((PersistentWindowStore) wrapped()).getObservedStreamTime() - retentionPeriod + 1);
+    }
+
+    private KeyValueIterator<Windowed<K>, V> filterExpiredRecords(final boolean forward) {
+        final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ? wrapped().all() : wrapped().backwardAll();
+
+        final long observedStreamTime = ((PersistentWindowStore) wrapped()).getObservedStreamTime();
+        if (!allWindowedKeyValueIterator.hasNext() || observedStreamTime == ConsumerRecord.NO_TIMESTAMP)
+            return new MeteredWindowedKeyValueIterator<>(allWindowedKeyValueIterator, fetchSensor, streamsMetrics, serdes, time);
+
+        final long windowStartBoundary = observedStreamTime - retentionPeriod + 1;
+        final List<KeyValue<Windowed<Bytes>, byte[]>> windowedKeyValuesInBoundary = new ArrayList<>();
+
+        while (allWindowedKeyValueIterator.hasNext()) {
+            final KeyValue<Windowed<Bytes>, byte[]> next = allWindowedKeyValueIterator.next();
+            if (next.key.window().endTime().isBefore(Instant.ofEpochMilli(windowStartBoundary))) {
+                continue;
+            }
+            windowedKeyValuesInBoundary.add(next);
+        }
+        return new MeteredWindowedKeyValueIterator<>(new WindowedKeyValueIterator(windowedKeyValuesInBoundary.iterator()), fetchSensor, streamsMetrics, serdes, time);
+    }

Review comment:
   Awesome!
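
   For readers following the retention arithmetic in the hunk above, the 
boundary computation can be isolated as below. This is a sketch with 
hypothetical names (`RetentionBoundarySketch` and its methods); it works on 
plain `long` millisecond values and leaves out the store, iterator, and metrics 
wiring of the actual change.

```java
// Illustration of the boundary arithmetic only; names are hypothetical.
final class RetentionBoundarySketch {

    // Earliest timestamp still inside the retention period.
    static long windowStartBoundary(long observedStreamTime, long retentionPeriod) {
        return observedStreamTime - retentionPeriod + 1;
    }

    // Clamps the requested start time so that it never reaches before the boundary.
    static long actualWindowStartTime(long timeFrom, long observedStreamTime, long retentionPeriod) {
        return Math.max(timeFrom, windowStartBoundary(observedStreamTime, retentionPeriod));
    }

    // A window is expired when its end time lies strictly before the boundary.
    static boolean isExpired(long windowEndMs, long observedStreamTime, long retentionPeriod) {
        return windowEndMs < windowStartBoundary(observedStreamTime, retentionPeriod);
    }

    public static void main(String[] args) {
        long observedStreamTime = 100L;
        long retentionPeriod = 10L;
        System.out.println("boundary = " + windowStartBoundary(observedStreamTime, retentionPeriod));
        System.out.println("fetch from 50 starts at "
                + actualWindowStartTime(50L, observedStreamTime, retentionPeriod));
        System.out.println("window ending at 90 expired: "
                + isExpired(90L, observedStreamTime, retentionPeriod));
    }
}
```

   With an observed stream time of 100 and a retention period of 10, the 
boundary is 91, so a window ending at 90 is filtered out while one ending at 91 
is kept.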








[GitHub] [kafka] ccding commented on a change in pull request #11348: MINOR: fix CreateTopic to return the same as DescribeTopic

2021-09-21 Thread GitBox


ccding commented on a change in pull request #11348:
URL: https://github.com/apache/kafka/pull/11348#discussion_r713470616



##
File path: core/src/main/scala/kafka/server/ZkAdminManager.scala
##
@@ -118,7 +118,7 @@ class ZkAdminManager(val config: KafkaConfig,
 metadataAndConfigs.get(topicName).foreach { result =>
   val logConfig = 
LogConfig.fromProps(LogConfig.extractLogConfigMap(config), configs)
   val createEntry = configHelper.createTopicConfigEntry(logConfig, 
configs, includeSynonyms = false, includeDocumentation = false)(_, _)
-  val topicConfigs = logConfig.nonInternalValues.asScala.map { case (k, v) 
=>
+  val topicConfigs = (logConfig.originals.asScala.filter(_._2 != null) ++ 
logConfig.nonInternalValues.asScala).map { case (k, v) =>

Review comment:
   fixed








[GitHub] [kafka] vincent81jiang commented on a change in pull request #11327: KAFKA-13305: fix NullPointerException in LogCleanerManager "uncleanable-bytes" gauge

2021-09-21 Thread GitBox


vincent81jiang commented on a change in pull request #11327:
URL: https://github.com/apache/kafka/pull/11327#discussion_r713458476



##
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##
@@ -511,6 +513,27 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
   uncleanablePartitions.get(log.parentDir).exists(partitions => 
partitions.contains(topicPartition))
 }
   }
+
+  def maintainUncleanablePartitions(): Unit = {

Review comment:
   Pushed a new iteration. I'd like to hear your opinion on which version 
is better. (I didn't put it in `LogCleanerManager.grabFilthiestCompactedLog` 
because `grabFilthiestCompactedLog` checks the current logs while the 
maintenance task checks `uncleanablePartitions`, so the two are not closely 
related.)








[GitHub] [kafka] vincent81jiang commented on a change in pull request #11327: KAFKA-13305: fix NullPointerException in LogCleanerManager "uncleanable-bytes" gauge

2021-09-21 Thread GitBox


vincent81jiang commented on a change in pull request #11327:
URL: https://github.com/apache/kafka/pull/11327#discussion_r713458476



##
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##
@@ -511,6 +513,27 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
   uncleanablePartitions.get(log.parentDir).exists(partitions => 
partitions.contains(topicPartition))
 }
   }
+
+  def maintainUncleanablePartitions(): Unit = {

Review comment:
   Pushed a new iteration. Would like to hear your opinion on which way is 
better. (Didn't put it in LogCleanerManager.grabFilthiestCompactedLog because 
grabFilthiestCompactedLog checks current logs, maintenance task checks 
uncleanablePartitions, they're not closely related.)








[GitHub] [kafka] vincent81jiang commented on a change in pull request #11327: KAFKA-13305: fix NullPointerException in LogCleanerManager "uncleanable-bytes" gauge

2021-09-21 Thread GitBox


vincent81jiang commented on a change in pull request #11327:
URL: https://github.com/apache/kafka/pull/11327#discussion_r713458476



##
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##
@@ -511,6 +513,27 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
   uncleanablePartitions.get(log.parentDir).exists(partitions => 
partitions.contains(topicPartition))
 }
   }
+
+  def maintainUncleanablePartitions(): Unit = {

Review comment:
   Pushed a new iteration. Didn't put it in 
LogCleanerManager.grabFilthiestCompactedLog because grabFilthiestCompactedLog 
checks current logs, maintenance task checks uncleanablePartitions, they're not 
closely related.








[GitHub] [kafka] vincent81jiang commented on a change in pull request #11327: KAFKA-13305: fix NullPointerException in LogCleanerManager "uncleanable-bytes" gauge

2021-09-21 Thread GitBox


vincent81jiang commented on a change in pull request #11327:
URL: https://github.com/apache/kafka/pull/11327#discussion_r713458476



##
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##
@@ -511,6 +513,27 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
   uncleanablePartitions.get(log.parentDir).exists(partitions => 
partitions.contains(topicPartition))
 }
   }
+
+  def maintainUncleanablePartitions(): Unit = {

Review comment:
   Pushed a new iteration








[GitHub] [kafka] vincent81jiang commented on a change in pull request #11327: KAFKA-13305: fix NullPointerException in LogCleanerManager "uncleanable-bytes" gauge

2021-09-21 Thread GitBox


vincent81jiang commented on a change in pull request #11327:
URL: https://github.com/apache/kafka/pull/11327#discussion_r713451787



##
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##
@@ -511,6 +513,27 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
   uncleanablePartitions.get(log.parentDir).exists(partitions => 
partitions.contains(topicPartition))
 }
   }
+
+  def maintainUncleanablePartitions(): Unit = {

Review comment:
   @junrao, I tried to put the logic in LogCleaner.doWork at the beginning 
(which I feel is similar to LogCleanerManager.grabFilthiestCompactedLog) but 
then reverted the change simply because maintenance work performed more 
frequently. If the overhead of frequent maintenance is not a concern (I assume 
the maintenance task is cheap), we can definitely do this.








[GitHub] [kafka] ccding opened a new pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

2021-09-21 Thread GitBox


ccding opened a new pull request #11351:
URL: https://github.com/apache/kafka/pull/11351


   We have seen an exception caused by shutting down the scheduler before 
shutting down LogManager.
   
   When LogManager was closing partitions one by one, the scheduler called to 
delete old segments due to retention. However, the old segments could have been 
closed by the LogManager, which caused an exception and subsequently marked 
logdir as offline. As a result, the broker didn't flush the remaining 
partitions and didn't write the clean shutdown marker. Ultimately the broker 
took hours to recover the log during restart.
   
   This PR essentially reverts https://github.com/apache/kafka/pull/10538
   
   I believe the exception https://github.com/apache/kafka/pull/10538 saw is at 
https://github.com/apache/kafka/blob/5a6f19b2a1ff72c52ad627230ffdf464456104ee/core/src/main/scala/kafka/log/LocalLog.scala#L895-L903
 which called the scheduler and crashed the compaction thread. The effect of 
this exception has been mitigated by https://github.com/apache/kafka/pull/10763
   
   cc @rondagostino @ijuma @cmccabe @junrao @dhruvilshah3 as authors/reviewers 
of the PRs mentioned above to make sure this change looks okay.
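
The race described in the second paragraph (a scheduled retention task touching a segment that has already been closed) can be sketched in isolation. This is a minimal, single-threaded simulation under stated assumptions: `Segment`, `deleteIfExpired` and the other names are hypothetical stand-ins, not Kafka classes, and stopping the retention task before closing segments is shown only as one possible way to avoid the failure, not necessarily what this PR does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; none of these names are Kafka classes.
final class ShutdownOrderSketch {

    /** A log segment that rejects work once it has been closed. */
    static final class Segment {
        private boolean closed = false;

        void close() {
            closed = true;
        }

        void deleteIfExpired() {
            if (closed) {
                throw new IllegalStateException("segment already closed");
            }
            // Real retention logic would go here.
        }
    }

    /** Simulates one run of the periodic retention task over all segments. */
    static boolean retentionRunSucceeds(List<Segment> segments) {
        try {
            for (Segment s : segments) {
                s.deleteIfExpired();
            }
            return true;
        } catch (IllegalStateException e) {
            // In the report above, a failure like this marked the log dir offline.
            return false;
        }
    }

    /** Retention fires while segments are being closed one by one. */
    static boolean retentionDuringClose() {
        List<Segment> segments = new ArrayList<>();
        segments.add(new Segment());
        segments.add(new Segment());
        segments.get(0).close();               // shutdown has started
        return retentionRunSucceeds(segments); // task still scheduled: fails
    }

    /** Retention is stopped first, so nothing touches closed segments. */
    static boolean retentionStoppedBeforeClose() {
        List<Segment> segments = new ArrayList<>();
        segments.add(new Segment());
        boolean lastRun = retentionRunSucceeds(segments); // final run, all open
        for (Segment s : segments) {
            s.close();
        }
        return lastRun;
    }
}
```

In the first scenario the retention run fails because one segment is already closed; in the second the last run completes before any segment is closed.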
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] jlprat commented on a change in pull request #11350: Scala3 migration

2021-09-21 Thread GitBox


jlprat commented on a change in pull request #11350:
URL: https://github.com/apache/kafka/pull/11350#discussion_r713423728



##
File path: core/src/main/scala/kafka/controller/PartitionStateMachine.scala
##
@@ -477,7 +477,7 @@ class ZkPartitionStateMachine(config: KafkaConfig,
 } else {
   val (logConfigs, failed) = zkClient.getLogConfigs(
 partitionsWithNoLiveInSyncReplicas.iterator.map { case (partition, _) 
=> partition.topic }.toSet,
-config.originals()
+config.originals

Review comment:
   Changes like this are for parameterless methods declared without 
parentheses, which must be called without parentheses

##
File path: core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
##
@@ -234,7 +234,7 @@ public KafkaClusterTestKit build() throws Exception {
 Option.apply(threadNamePrefix),
 
JavaConverters.asScalaBuffer(Collections.emptyList()).toSeq(),
 connectFutureManager.future,
-Server.SUPPORTED_FEATURES()
+Server$.MODULE$.SUPPORTED_FEATURES()

Review comment:
   Changes like this are related to 
https://github.com/lampepfl/dotty/issues/13572

##
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##
@@ -22,7 +22,7 @@ import java.util.{Collections, Properties}
 import joptsimple._
 import kafka.common.AdminCommandFailedException
 import kafka.log.LogConfig
-import kafka.utils._
+import kafka.utils.{immutable => _, _}

Review comment:
   Changes like this are due to shadowing between `kafka.utils.immutable` 
and the `immutable` package in `scala.collection`.

##
File path: core/src/main/scala/kafka/tools/ConsoleConsumer.scala
##
@@ -27,7 +27,7 @@ import com.typesafe.scalalogging.LazyLogging
 import joptsimple._
 import kafka.utils.Implicits._
 import kafka.utils.{Exit, _}
-import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig => 
ClientConsumerConfig, ConsumerRecord, KafkaConsumer}

Review comment:
   This is done to avoid shadowing

##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -467,1251 +451,3 @@ object SocketServer {
 
   val ListenerReconfigurableConfigs = Set(KafkaConfig.MaxConnectionsProp, 
KafkaConfig.MaxConnectionCreationRateProp)
 }
-

Review comment:
   This is what I mentioned about splitting classes present in 
`SocketServer` into their own file

##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -42,11 +42,11 @@ final class KafkaMetadataLog private (
   // Access to this object needs to be synchronized because it is used by the 
snapshotting thread to notify the
   // polling thread when snapshots are created. This object is also used to 
store any opened snapshot reader.
   snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]],
-  topicPartition: TopicPartition,
+  topicPartitionArg: TopicPartition,

Review comment:
   Changes like this are to avoid the shadowing between the parameter and 
the method

##
File path: core/src/main/scala/kafka/log/LazyIndex.scala
##
@@ -21,7 +21,7 @@ import java.io.File
 import java.nio.file.{Files, NoSuchFileException}
 import java.util.concurrent.locks.ReentrantLock
 
-import LazyIndex._
+

Review comment:
   This import was causing a cyclic problem in Scala3.

##
File path: core/src/main/scala/kafka/utils/json/DecodeJson.scala
##
@@ -85,13 +85,13 @@ object DecodeJson {
 else decodeJson.decodeEither(node).map(Some(_))
   }
 
-  implicit def decodeSeq[E, S[+T] <: Seq[E]](implicit decodeJson: 
DecodeJson[E], factory: Factory[E, S[E]]): DecodeJson[S[E]] = (node: JsonNode) 
=> {
+  implicit def decodeSeq[E, S[E] <: Seq[E]](implicit decodeJson: 
DecodeJson[E], factory: Factory[E, S[E]]): DecodeJson[S[E]] = (node: JsonNode) 
=> {

Review comment:
   The Scala3 compiler is stricter about type definitions, and the previous 
one wasn't really being satisfied

##
File path: 
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
##
@@ -2244,7 +2244,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   def testDescribeClusterClusterAuthorizedOperationsWithoutDescribeCluster(): 
Unit = {
 removeAllClientAcls()
 
-for (version <- ApiKeys.DESCRIBE_CLUSTER.oldestVersion to 
ApiKeys.DESCRIBE_CLUSTER.latestVersion) {
+for (version <- ApiKeys.DESCRIBE_CLUSTER.oldestVersion.toInt to 
ApiKeys.DESCRIBE_CLUSTER.latestVersion.toInt) {

Review comment:
   The `to` method is not present in `Short` type and Scala3 doesn't widen 
the type automatically

##
File path: 
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
##
@@ -481,8 +481,10 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegr

[jira] [Created] (KAFKA-13315) log layer exception during shutdown that caused an unclean shutdown

2021-09-21 Thread Cong Ding (Jira)
Cong Ding created KAFKA-13315:
-

 Summary: log layer exception during shutdown that caused an 
unclean shutdown
 Key: KAFKA-13315
 URL: https://issues.apache.org/jira/browse/KAFKA-13315
 Project: Kafka
  Issue Type: Bug
Reporter: Cong Ding


We have seen an exception caused by shutting down scheduler before shutting 
down LogManager.

When LogManager was closing partitions one by one, scheduler called to delete 
old segments due to retention. However, the old segments could have been closed 
by the LogManager, which subsequently marked logdir as offline and didn't write 
the clean shutdown marker. Ultimately the broker would take hours to restart.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-13315) log layer exception during shutdown that caused an unclean shutdown

2021-09-21 Thread Cong Ding (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cong Ding reassigned KAFKA-13315:
-

Assignee: Cong Ding

> log layer exception during shutdown that caused an unclean shutdown
> ---
>
> Key: KAFKA-13315
> URL: https://issues.apache.org/jira/browse/KAFKA-13315
> Project: Kafka
>  Issue Type: Bug
>Reporter: Cong Ding
>Assignee: Cong Ding
>Priority: Major
>
> We have seen an exception caused by shutting down scheduler before shutting 
> down LogManager.
> When LogManager was closing partitons one by one, scheduler called to delete 
> old segments due to retention. However, the old segments could have been 
> closed by the LogManager, which subsequently marked logdir as offline and 
> didn't write the clean shutdown marker. Ultimately the broker would take 
> hours to restart.





[GitHub] [kafka] jlprat commented on pull request #11350: Scala3 migration

2021-09-21 Thread GitBox


jlprat commented on pull request #11350:
URL: https://github.com/apache/kafka/pull/11350#issuecomment-924387703


   Ping @ijuma I was able to make some progress with migrating to Scala3. I 
still need to check why some test failures occur only on Scala3. Let me know 
what you think about the changes.






[jira] [Commented] (KAFKA-12954) Add Support for Scala 3 in 4.0.0

2021-09-21 Thread Josep Prat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17418321#comment-17418321
 ] 

Josep Prat commented on KAFKA-12954:


Work in progress PR is available here: 
[https://github.com/apache/kafka/pull/11350]

There are tests failing when compiling to Scala3, but they pass on Scala 2.13

> Add Support for Scala 3 in 4.0.0
> 
>
> Key: KAFKA-12954
> URL: https://issues.apache.org/jira/browse/KAFKA-12954
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: Josep Prat
>Assignee: Josep Prat
>Priority: Major
>  Labels: needs-kip
> Fix For: 4.0.0
>
>
> This is a follow up task from 
> https://issues.apache.org/jira/browse/KAFKA-12895, in which Scala 2.12 
> support will be dropped.
> It would be good to, at the same time, add support for Scala 3.
> Initially it would be enough to only make the code compile with Scala 3 so we 
> can generate the proper Scala 3 artifacts, this might be achieved with the 
> proper compiler flags and an occasional rewrite.
> Follow up tasks could be created to migrate to a more idiomatic Scala 3 
> writing if desired.
> If I understand it correctly, this would need a KIP as we are modifying the 
> public interfaces (new artifacts). If this is the case, let me know  and I 
> will write it.





[GitHub] [kafka] jlprat opened a new pull request #11350: Scala3 migration

2021-09-21 Thread GitBox


jlprat opened a new pull request #11350:
URL: https://github.com/apache/kafka/pull/11350


   This PR lays the groundwork for the Scala3 migration of the code base. 
Gradle's Scala3 plugin is merged but not released yet (see 
[PR](https://github.com/gradle/gradle/pull/18001)).
   During the migration I encountered 2 different bugs in Scala3; they are 
filed [here](https://github.com/lampepfl/dotty/issues/13549) and 
[here](https://github.com/lampepfl/dotty/issues/13572).
   
   # Types of changes done in this PR:
   * Shadowing variables or packages are now errors instead of Scala doing 
guess work. A fair amount of the changes done in this PR was disambiguation. 
These include:
 * Parameter shadowing
 * Shadowing among imports
   * Calling parameterless methods with parentheses is no longer possible in 
Scala3
   * `SocketServer` extra classes were split into different files as Scala3 
compiler was failing to find them when referenced in other classes in this same 
file.
   * `foreach` java collections method was not directly usable under Scala3, so 
I transformed it to the Scala collection's one (this might be another bug in 
Scala3, I need to investigate)
   * Scala3 fails to determine the right overload method when SAM is involved 
if types don't match exactly (a.k.a. bug 
https://github.com/lampepfl/dotty/issues/13549)
   * Scala3 companion object of a trait doesn't have the static forwarder 
methods, needed to reference the companion object "old style" (a.k.a. bug 
https://github.com/lampepfl/dotty/issues/13572)
   * Extra manual typing was needed occasionally as Scala3 is probably stricter 
than Scala 2.13
   
   # Compiling with Scala3
   In order to test this locally one can run the following:
   
   `./gradlew wrapper 
--gradle-distribution-url=https://services.gradle.org/distributions-snapshots/gradle-7.3-20210906222431+-bin.zip`
   And then the usual
   `./gradlew compileTestScala -PscalaVersion=3.0`
   
   # Notes
   Jackson is using "2.13.0-rc2" version as it's the one that contains Scala3 
improvements, it's not really needed to successfully compile though.
   
   Extra information, Scala3 is compiling in "Migration Mode", meaning it 
outputs some warnings about deprecated and dropped features. See [Migration 
Mode](https://docs.scala-lang.org/scala3/guides/migration/tooling-migration-mode.html)
 for further info.
   All these warnings can be automatically fixed by the Scala compiler itself.
   
   # Current Problems
   Spotbugs currently detects 30 problems with Scala3, although it works fine when 
compiling with Scala 2.13. This currently blocks the execution of core and 
streams tests. With `spotbugs` excluded, the tests can be run, but some are 
still failing; I need to find out why. Tests run successfully in Scala 
2.13. To exclude `spotbugs` run the following:
   `./gradlew test -x spotbugsMain -PscalaVersion=3.0`
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] glasser commented on pull request #10873: KAFKA-7360 Fixed code snippet

2021-09-21 Thread GitBox


glasser commented on pull request #10873:
URL: https://github.com/apache/kafka/pull/10873#issuecomment-924378381


   I'm not sure why you chose that list of reviewers but I am not a Kafka 
reviewer.






[GitHub] [kafka] junrao commented on a change in pull request #11348: MINOR: fix CreateTopic to return the same as DescribeTopic

2021-09-21 Thread GitBox


junrao commented on a change in pull request #11348:
URL: https://github.com/apache/kafka/pull/11348#discussion_r713413729



##
File path: core/src/main/scala/kafka/server/ZkAdminManager.scala
##
@@ -118,7 +118,7 @@ class ZkAdminManager(val config: KafkaConfig,
 metadataAndConfigs.get(topicName).foreach { result =>
   val logConfig = 
LogConfig.fromProps(LogConfig.extractLogConfigMap(config), configs)
   val createEntry = configHelper.createTopicConfigEntry(logConfig, 
configs, includeSynonyms = false, includeDocumentation = false)(_, _)
-  val topicConfigs = logConfig.nonInternalValues.asScala.map { case (k, v) 
=>
+  val topicConfigs = (logConfig.originals.asScala.filter(_._2 != null) ++ 
logConfig.nonInternalValues.asScala).map { case (k, v) =>

Review comment:
   Could we move ConfigHelper.describeConfigs.allConfigs to ConfigHelper 
and reuse it here?








[GitHub] [kafka] 3schwartz commented on a change in pull request #11340: fix issue : KafkaConsumer cannot jump out of the poll method, and the…

2021-09-21 Thread GitBox


3schwartz commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r713359407



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -1069,6 +1065,37 @@ private void maybeAutoCommitOffsetsSync(Timer timer) {
 }
 }
 
+private void cleanUpConsumedOffsets(Map<TopicPartition, OffsetAndMetadata> 
willCommitOffsets) {
+
+if (willCommitOffsets.isEmpty())
+return;
+
+Set<String> validTopics = metadata.fetch().topics();
+Set<TopicPartition> toGiveUpTopicPartitions = new HashSet<>();
+
+Iterator<Map.Entry<TopicPartition, OffsetAndMetadata>> iterator = 
willCommitOffsets.entrySet().iterator();
+
+while (iterator.hasNext()) {
+
+Map.Entry<TopicPartition, OffsetAndMetadata> entry = 
iterator.next();
+
+if (!validTopics.contains(entry.getKey().topic())) {
+
+toGiveUpTopicPartitions.add(entry.getKey());
+iterator.remove();
+}
+
+}
+
+if (toGiveUpTopicPartitions.size() > 0) {
+
+//Because toGiveUpTopicPartitions may receive 
`UnknownTopicOrPartitionException` when submitting their offsets.
+//We are prepared to abandon them. The worst effect is that these 
partitions may repeatedly consume some messages
+log.warn("Synchronous auto-commit of offsets {} will be 
abandoned", toGiveUpTopicPartitions);

Review comment:
   I believe the message is incorrect: it is whole partitions for which 
no offsets are committed.
   
   `log.warn("Synchronous auto-commit of offsets for partitions {} will be 
abandoned", toGiveUpTopicPartitions);`
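
The filtering step under review can be sketched without the Kafka client types. This is a simplified illustration, assuming partitions are represented as `topic-partition` strings and offsets as plain longs; `OffsetCleanupSketch` and its method names are hypothetical and are not part of `ConsumerCoordinator`.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Simplified sketch: partitions are "topic-partition" strings, offsets are longs.
final class OffsetCleanupSketch {

    /** Extracts the topic from a key such as "orders-3". */
    static String topicOf(String topicPartition) {
        return topicPartition.substring(0, topicPartition.lastIndexOf('-'));
    }

    /**
     * Removes entries whose topic is not in validTopics from the map
     * and returns the partitions that were dropped.
     */
    static Set<String> dropUnknownTopics(Map<String, Long> offsets, Set<String> validTopics) {
        Set<String> dropped = new HashSet<>();
        Iterator<Map.Entry<String, Long>> it = offsets.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (!validTopics.contains(topicOf(entry.getKey()))) {
                dropped.add(entry.getKey());
                it.remove(); // safe removal while iterating
            }
        }
        return dropped;
    }

    /** Builds the warning text with the wording suggested in the review. */
    static String warning(Set<String> dropped) {
        return "Synchronous auto-commit of offsets for partitions " + dropped + " will be abandoned";
    }
}
```

Since the set that is logged contains partitions rather than offsets, the wording "offsets for partitions {}" describes the argument more accurately.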








[jira] [Commented] (KAFKA-12225) Unexpected broker bottleneck when scaling producers

2021-09-21 Thread Sam Cantero (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17418292#comment-17418292
 ] 

Sam Cantero commented on KAFKA-12225:
-

Is this similar to https://issues.apache.org/jira/browse/KAFKA-12838?

> Unexpected broker bottleneck when scaling producers
> ---
>
> Key: KAFKA-12225
> URL: https://issues.apache.org/jira/browse/KAFKA-12225
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
> Environment: AWS Based
> 5-node cluster running on k8s with EBS attached disks (HDD)
> Kafka Version 2.5.0
> Multiple Producers (KafkaStreams, Akka Streams, golang Sarama)
>Reporter: Harel Ben Attia
>Priority: Major
>
>  
> *TLDR*: There seems to be a major lock contention that can happen on 
> *{{Log.lock}}* during producer-scaling when produce-request sending is 
> time-based ({{linger.ms}}) and not data-size based (max batch size).
> Hi,
> We're running a 5-node Kafka cluster on one of our production systems on AWS. 
> Recently, we have started to notice that as our producer services scale out, 
> the Kafka idle-percentage drops abruptly from ~70% idle percentage to 0% on 
> all brokers, even though none of the physical resources of the brokers are 
> exhausted.
> Initially, we realised that our {{io.thread}} count was too low, causing high 
> request queuing and the low idle percentage, so we have increased it, hoping 
> to see one of the physical resources maxing out. After changing it we still 
> continued to see abrupt drops of the idle-percentage to 0% (with no physical 
> resource maxing out), so we continued to investigate.
> The investigation has shown that there's a direct relation to {{linger.ms}} 
> being the controlling factor of sending out produce requests. Whenever 
> messages are being sent out from the producer due to the {{linger.ms}} 
> threshold, scaling out the service increased the number of produce requests 
> in a way which is not proportional to our traffic increase, bringing down all 
> the brokers to a near-halt in terms of being able to process requests and, as 
> mentioned, without any exhaustion of physical resources.
> After some more experiments and profiling a broker through flight recorder, 
> we have found out that the cause of the issue is a lock contention on a 
> *{{java.lang.Object}}*, wasting a lot of time on all the 
> {{data-plane-kafka-request-handler}} threads. 90% of the locks were on Log's 
> *{{lock: Object}}* instance, inside the *{{Log.append()}}* method. The stack 
> traces show that these locks occur during the {{handleProductRequest}} 
> method. We have ruled out replication as the source of the issues, as there 
> were no replication issues, and the control-plane has a separate thread pool, 
> so this focused us back on the actual producers, leading back to the 
> behaviour of our producer service when scaling out.
> At that point we thought that maybe the issue is related to the number of 
> partitions of the topic (60 currently), and increasing it would reduce the 
> lock contention on each {{Log}} instance, but since each producer writes to 
> all partitions (data is evenly spread and not skewed), then increasing the 
> number of partitions would only cause each producer to generate more 
> produce-requests, not alleviating the lock contention. Also, increasing the 
> number of brokers would increase the idle percentage per broker, but 
> essentially would not help reducing the produce-request latency, since this 
> would not change the rate of produce-requests per Log.
> Eventually, we've worked around the issue by making the {{linger.ms}} value 
> high enough so it stopped being the controlling factor of sending messages 
> (e.g. produce-requests became coupled to the size of the traffic due to the 
> max batch size becoming the controlling factor). This allowed us to utilise 
> the cluster better without upscaling it.
> From our analysis, it seems that this lock behaviour limits Kafka's ability 
> to be robust to producer configuration and scaling, and hurts the ability to 
> do efficient capacity planning for the cluster, increasing the risk of an 
> unexpected bottleneck when traffic increases.
> It would be great if you can validate these conclusions, or provide any more 
> information that will help us understand the issue better or work around it 
> in a more efficient way.

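The workaround in the quoted report (raising `linger.ms` until the batch size becomes the send trigger) can be expressed as producer configuration. A minimal sketch: `linger.ms` and `batch.size` are the producer configuration keys, but the values below are illustrative assumptions, since the report does not state the ones actually used, and `ProducerBatchingSketch` is a hypothetical helper.

```java
import java.util.Properties;

// Hypothetical helper; only the two config keys come from the Kafka producer.
final class ProducerBatchingSketch {

    /** Illustrative values only; the report does not state the ones used. */
    static Properties batchingFirst() {
        Properties props = new Properties();
        // Upper bound on how long a batch may wait before it is sent.
        props.setProperty("linger.ms", "500");
        // Maximum batch size in bytes; with a long linger and enough traffic,
        // a full batch tends to trigger the send before the linger expires.
        props.setProperty("batch.size", "65536");
        return props;
    }
}
```

Whether the batch size really becomes the controlling factor depends on traffic per partition, so values like these would need to be checked against the actual produce-request rate.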




[GitHub] [kafka] guozhangwang commented on a change in pull request #11227: KAFKA-13211: add support for infinite range query for WindowStore

2021-09-21 Thread GitBox


guozhangwang commented on a change in pull request #11227:
URL: https://github.com/apache/kafka/pull/11227#discussion_r713277824



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
##
@@ -462,14 +460,34 @@ public boolean hasNext() {
 }
 
 final Bytes key = getKey(next.key);
-if (key.compareTo(getKey(keyFrom)) >= 0 && 
key.compareTo(getKey(keyTo)) <= 0) {
+if (isKeyWithinRange(key)) {
 return true;
 } else {
 next = null;
 return hasNext();
 }
 }
 
+private boolean isKeyWithinRange(final Bytes key) {
+boolean isKeyInRange = false;
+// split all cases for readability and avoid 
BooleanExpressionComplexity checkstyle warning
+if (keyFrom == null && keyTo == null) {
+// fetch all
+isKeyInRange = true;
+} else if (keyFrom == null && key.compareTo(getKey(keyTo)) <= 0) {
+// start from the beginning
+isKeyInRange = true;
+} else if (key.compareTo(getKey(keyFrom)) >= 0 && keyTo == null) {

Review comment:
   Thinking about that a bit more, maybe we can make it simpler:
   ```
   if (keyFrom == null && keyTo == null) {
   // fetch all
   return true;
   } else if (keyFrom == null) {
   // start from the beginning
   return key.compareTo(getKey(keyTo)) <= 0;
   } else if (keyTo == null) {
   // end to the last
   return key.compareTo(getKey(keyFrom)) >= 0; 
   } else {
   return key.compareTo(getKey(keyFrom)) >= 0 && 
key.compareTo(getKey(keyTo)) <= 0;
   }
   ```
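
For reference, the suggested branching can be run on its own with ordinary `Comparable` keys. This is a minimal sketch, assuming a `null` bound means "unbounded on that side"; `KeyRangeSketch` is a hypothetical class and it omits the `getKey` conversion used in `InMemoryWindowStore`.

```java
// Hypothetical standalone version of the range check; not the store's code.
final class KeyRangeSketch {

    /** Inclusive range check where a null bound means unbounded on that side. */
    static <K extends Comparable<K>> boolean isKeyWithinRange(K key, K keyFrom, K keyTo) {
        if (keyFrom == null && keyTo == null) {
            // fetch all
            return true;
        } else if (keyFrom == null) {
            // start from the beginning
            return key.compareTo(keyTo) <= 0;
        } else if (keyTo == null) {
            // run to the end
            return key.compareTo(keyFrom) >= 0;
        } else {
            return key.compareTo(keyFrom) >= 0 && key.compareTo(keyTo) <= 0;
        }
    }
}
```

Each branch returns directly, so no mutable flag is needed and every condition stays a simple expression.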

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
##
@@ -462,14 +460,34 @@ public boolean hasNext() {
 }
 
 final Bytes key = getKey(next.key);
-if (key.compareTo(getKey(keyFrom)) >= 0 && 
key.compareTo(getKey(keyTo)) <= 0) {
+if (isKeyWithinRange(key)) {
 return true;
 } else {
 next = null;
 return hasNext();
 }
 }
 
+private boolean isKeyWithinRange(final Bytes key) {
+boolean isKeyInRange = false;
+// split all cases for readability and avoid 
BooleanExpressionComplexity checkstyle warning
+if (keyFrom == null && keyTo == null) {
+// fetch all
+isKeyInRange = true;
+} else if (keyFrom == null && key.compareTo(getKey(keyTo)) <= 0) {
+// start from the beginning
+isKeyInRange = true;
+} else if (key.compareTo(getKey(keyFrom)) >= 0 && keyTo == null) {

Review comment:
   nit: let's move `keyTo == null` up first so that if it is not satisfied, 
we do not need to call `getKey` at all.
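
The effect of the reordering comes from Java's short-circuit `&&`: the right-hand operand is evaluated only when the left-hand one is true. A small sketch, with a hypothetical `convert` method standing in for `getKey` and a counter to make the skipped call visible:

```java
// Hypothetical illustration of short-circuit evaluation; convert() stands in for getKey().
final class ShortCircuitSketch {
    static int conversions = 0;

    /** Stand-in for a conversion with some cost; counts how often it runs. */
    static String convert(String raw) {
        conversions++;
        return raw;
    }

    /** Null check first: convert() is skipped whenever keyTo is non-null. */
    static boolean openEndedMatch(String key, String keyFrom, String keyTo) {
        return keyTo == null && key.compareTo(convert(keyFrom)) >= 0;
    }
}
```

With `keyTo` set, `openEndedMatch` returns false without ever calling `convert`; with `keyTo` null, `convert` runs exactly once.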

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryForWindowStoreIntegrationTest.java
##
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.TimeWindow

[GitHub] [kafka] vamossagar12 commented on a change in pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…

2021-09-21 Thread GitBox


vamossagar12 commented on a change in pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#discussion_r713255569



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
##
@@ -292,13 +408,46 @@ public V fetch(final K key,
 time);
 }
 
+    private long getActualWindowStartTime(final long timeFrom) {
+        return Math.max(timeFrom, ((PersistentWindowStore) wrapped()).getObservedStreamTime() - retentionPeriod + 1);
+    }
+
+    private KeyValueIterator<Windowed<K>, V> filterExpiredRecords(final boolean forward) {
+        final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ? wrapped().all() : wrapped().backwardAll();
+
+        final long observedStreamTime = ((PersistentWindowStore) wrapped()).getObservedStreamTime();
+        if (!allWindowedKeyValueIterator.hasNext() || observedStreamTime == ConsumerRecord.NO_TIMESTAMP)
+            return new MeteredWindowedKeyValueIterator<>(allWindowedKeyValueIterator, fetchSensor, streamsMetrics, serdes, time);
+
+        final long windowStartBoundary = observedStreamTime - retentionPeriod + 1;
+        final List<KeyValue<Windowed<Bytes>, byte[]>> windowedKeyValuesInBoundary = new ArrayList<>();
+
+        while (allWindowedKeyValueIterator.hasNext()) {
+            final KeyValue<Windowed<Bytes>, byte[]> next = allWindowedKeyValueIterator.next();
+            if (next.key.window().endTime().isBefore(Instant.ofEpochMilli(windowStartBoundary))) {
+                continue;
+            }
+            windowedKeyValuesInBoundary.add(next);
+        }
+        return new MeteredWindowedKeyValueIterator<>(new WindowedKeyValueIterator(windowedKeyValuesInBoundary.iterator()), fetchSensor, streamsMetrics, serdes, time);
+    }

Review comment:
   @showuon that's an interesting idea. But I think it needs to be in the 
opposite order, i.e., we need to fetch from windowStartBoundary -> end. We don't 
know the end timestamp in this case. (The above code is wrong; I checked it in 
during some testing.)
   
   So, what I mean to say is that this:
   
   `final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ? wrapped().fetchAll(0, windowStartBoundary) : wrapped().backwardFetchAll(0, windowStartBoundary);`
   
   should be 
   
   `final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ? wrapped().fetchAll(windowStartBoundary, end) : wrapped().backwardFetchAll(windowStartBoundary, end);`
   
   
   But we don't know the end. I ran it with Long.MAX_VALUE as the end and the 
test case in question seems to have passed. Thanks for that!
   
   I will clean up this code and send it for review again. Thanks again!
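   
   To make the range direction concrete, here is a minimal, self-contained 
sketch. It is not the `MeteredWindowStore` code: `RetentionRangeSketch` is a 
hypothetical in-memory stand-in that keys windows by start time only (the 
diff above compares window end times), and it uses -1 as an assumed "no 
record seen yet" marker. It only illustrates reading from 
`windowStartBoundary` up to an open-ended upper bound.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.NavigableMap;
   import java.util.TreeMap;

   // Hypothetical in-memory stand-in for a window store, keyed by window start time.
   class RetentionRangeSketch {
       private final NavigableMap<Long, String> windowsByStart = new TreeMap<>();
       private final long retentionPeriod;
       private long observedStreamTime = -1L; // assumed marker: no record seen yet

       RetentionRangeSketch(final long retentionPeriod) {
           this.retentionPeriod = retentionPeriod;
       }

       void put(final long windowStart, final String value) {
           windowsByStart.put(windowStart, value);
           // Stream time only moves forward.
           observedStreamTime = Math.max(observedStreamTime, windowStart);
       }

       // Oldest window start that is still inside the retention period.
       long windowStartBoundary() {
           return observedStreamTime - retentionPeriod + 1;
       }

       // Range read from the boundary up to an open-ended upper bound,
       // instead of reading everything and filtering afterwards.
       List<String> fetchUnexpired(final boolean forward) {
           final NavigableMap<Long, String> inRange =
               windowsByStart.subMap(windowStartBoundary(), true, Long.MAX_VALUE, true);
           return new ArrayList<>(forward ? inRange.values() : inRange.descendingMap().values());
       }
   }
   ```
   
   With a retention period of 10 and windows starting at 0, 5 and 12, the 
boundary is 3, so only the windows at 5 and 12 are returned.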
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 commented on pull request #11076: KAFKA-12486: Enforce Rebalance when a TaskCorruptedException is throw…

2021-09-21 Thread GitBox


vamossagar12 commented on pull request #11076:
URL: https://github.com/apache/kafka/pull/11076#issuecomment-924190789


   I guess this test case got fixed recently in a new commit => 
`shouldQueryStoresAfterAddingAndRemovingStreamThread` =>
   
   
https://github.com/apache/kafka/commit/5a6f19b2a1ff72c52ad627230ffdf464456104ee
   
   






[GitHub] [kafka] cmccabe commented on a change in pull request #11311: KAFKA-13280: Avoid O(N) behavior in KRaftMetadataCache#topicNamesToIds

2021-09-21 Thread GitBox


cmccabe commented on a change in pull request #11311:
URL: https://github.com/apache/kafka/pull/11311#discussion_r713235847



##
File path: metadata/src/main/java/org/apache/kafka/image/TopicsImage.java
##
@@ -92,6 +98,148 @@ public int hashCode() {
 return Objects.hash(topicsById, topicsByName);
 }
 
+    /**
+     * Expose a view of this TopicsImage as a map from topic names to IDs.
+     *
+     * Like TopicsImage itself, this map is immutable.
+     */
+    public Map<String, Uuid> topicNameToIdView() {
+        return new TopicNameToIdMap();
+    }
+
+    class TopicNameToIdMap extends AbstractMap<String, Uuid> {
+        private final TopicNameToIdMapEntrySet set = new TopicNameToIdMapEntrySet();
+
+        @Override
+        public boolean containsKey(Object key) {
+            return topicsByName.containsKey(key);
+        }
+
+        @Override
+        public Uuid get(Object key) {
+            TopicImage image = topicsByName.get(key);
+            if (image == null) return null;
+            return image.id();
+        }
+
+        @Override
+        public Set<Entry<String, Uuid>> entrySet() {
+            return set;
+        }
+    }
+
+    class TopicNameToIdMapEntrySet extends AbstractSet<Entry<String, Uuid>> {
+        @Override
+        public Iterator<Entry<String, Uuid>> iterator() {
+            return new TopicNameToIdMapEntrySetIterator(topicsByName.entrySet().iterator());
+        }
+
+        @SuppressWarnings("rawtypes")
+        @Override
+        public boolean contains(Object o) {
+            if (!(o instanceof Entry)) return false;
+            Entry other = (Entry) o;
+            TopicImage image = topicsByName.get(other.getKey());
+            if (image == null) return false;
+            return image.id().equals(other.getValue());
+        }
+
+        @Override
+        public int size() {
+            return topicsByName.size();
+        }
+    }
+
+    static class TopicNameToIdMapEntrySetIterator implements Iterator<Entry<String, Uuid>> {
+        private final Iterator<Entry<String, TopicImage>> iterator;
+
+        TopicNameToIdMapEntrySetIterator(Iterator<Entry<String, TopicImage>> iterator) {
+            this.iterator = iterator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return this.iterator.hasNext();
+        }
+
+        @Override
+        public Entry<String, Uuid> next() {
+            Entry<String, TopicImage> entry = iterator.next();
+            return new SimpleImmutableEntry<>(entry.getKey(), entry.getValue().id());

Review comment:
   The existing code literally copies the entire map every time. Copying 
the entire map is going to be more expensive than copying just the Entry 
elements (although again, this is something we should not be doing, so not 
worth optimizing)
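   
   As a rough illustration of the trade-off, here is a self-contained sketch 
of a read-only view over a backing map. `TopicInfo` and `NameToIdView` are 
hypothetical names, not the classes in this PR. Lookups go straight to the 
backing map, and iteration allocates one small entry per element visited 
rather than copying the whole map up front.
   
   ```java
   import java.util.AbstractMap;
   import java.util.AbstractSet;
   import java.util.Iterator;
   import java.util.Map;
   import java.util.Set;

   // Hypothetical stand-in for a richer per-topic record.
   class TopicInfo {
       final long id;

       TopicInfo(final long id) {
           this.id = id;
       }
   }

   // Read-only name -> id view over a backing map; nothing is copied up front.
   class NameToIdView extends AbstractMap<String, Long> {
       private final Map<String, TopicInfo> byName;

       NameToIdView(final Map<String, TopicInfo> byName) {
           this.byName = byName;
       }

       @Override
       public boolean containsKey(final Object key) {
           return byName.containsKey(key);
       }

       @Override
       public Long get(final Object key) {
           final TopicInfo info = byName.get(key);
           return info == null ? null : info.id;
       }

       @Override
       public Set<Map.Entry<String, Long>> entrySet() {
           return new AbstractSet<Map.Entry<String, Long>>() {
               @Override
               public Iterator<Map.Entry<String, Long>> iterator() {
                   final Iterator<Map.Entry<String, TopicInfo>> it = byName.entrySet().iterator();
                   return new Iterator<Map.Entry<String, Long>>() {
                       @Override
                       public boolean hasNext() {
                           return it.hasNext();
                       }

                       @Override
                       public Map.Entry<String, Long> next() {
                           final Map.Entry<String, TopicInfo> e = it.next();
                           // One small entry object per element visited.
                           return new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue().id);
                       }
                   };
               }

               @Override
               public int size() {
                   return byName.size();
               }
           };
       }
   }
   ```
   
   Overriding `get` and `containsKey` matters here: the defaults inherited 
from `AbstractMap` walk the entry set, which would bring back the O(N) 
lookup.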








[jira] [Created] (KAFKA-13314) Pluggable components initialized with getConfiguredInstance do not respect dynamic config updates

2021-09-21 Thread David Mao (Jira)
David Mao created KAFKA-13314:
-

 Summary: Pluggable components initialized with 
getConfiguredInstance do not respect dynamic config updates
 Key: KAFKA-13314
 URL: https://issues.apache.org/jira/browse/KAFKA-13314
 Project: Kafka
  Issue Type: Bug
  Components: config, core
Reporter: David Mao






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei merged pull request #11334: KAFKA-13246: StoreQueryIntegrationTest#shouldQueryStoresAfterAddingAn…

2021-09-21 Thread GitBox


vvcephei merged pull request #11334:
URL: https://github.com/apache/kafka/pull/11334


   






[GitHub] [kafka] C0urante edited a comment on pull request #11333: KAFKA-13306: Null connector config value passes validation, but fails creation

2021-09-21 Thread GitBox


C0urante edited a comment on pull request #11333:
URL: https://github.com/apache/kafka/pull/11333#issuecomment-924133203


   Thanks @lhunyady. I wonder if instead of throwing an exception we can add an 
error message to the offending config properties? This would allow other 
configuration errors to be surfaced immediately in calls to `PUT 
/connector-plugins/{connectorClass}/config/validate` instead of requiring users 
to issue a follow-up request, and would return a more standard response 
than the 500 that I believe would be returned in response to an uncaught 
exception.
   
   If this approach is desirable, we could implement it with an additional step 
in 
[`AbstractHerder::validateConnectorConfig`](https://github.com/apache/kafka/blob/f650a14d56c0cc33263c29d8d242760406943c5b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L418)
 that uses 
[`ConfigValue::addErrorMessage`](https://kafka.apache.org/28/javadoc/org/apache/kafka/common/config/ConfigValue.html#addErrorMessage(java.lang.String))
 to report to the user that literal `null` values are not permitted in 
connector configurations, after adding a new `ConfigValue` for the offending 
property if one isn't already defined by the connector, the Connect framework, 
etc.
   
   Thoughts?
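   
   A minimal sketch of the idea, using stand-in types rather than the real 
Connect classes: `PropertyResult` is a hypothetical, simplified substitute 
for a per-property validation result, and `reportNullValues` is a made-up 
helper name. The point is only that null values become per-property errors 
next to any other validation errors, instead of an exception.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;

   // Hypothetical, simplified stand-in for a per-property validation result.
   class PropertyResult {
       final String name;
       final List<String> errorMessages = new ArrayList<>();

       PropertyResult(final String name) {
           this.name = name;
       }

       void addErrorMessage(final String message) {
           errorMessages.add(message);
       }
   }

   class NullValueCheck {
       // Adds an error to every property whose value is null, creating a result
       // entry for the property if validation has not produced one already.
       static void reportNullValues(final Map<String, String> connectorProps,
                                    final Map<String, PropertyResult> results) {
           for (final Map.Entry<String, String> prop : connectorProps.entrySet()) {
               if (prop.getValue() == null) {
                   results.computeIfAbsent(prop.getKey(), PropertyResult::new)
                          .addErrorMessage("Literal null values are not permitted in connector configurations.");
               }
           }
       }
   }
   ```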






[GitHub] [kafka] C0urante opened a new pull request #11349: MINOR: Fix use of ConfigException in AbstractConfig class

2021-09-21 Thread GitBox


C0urante opened a new pull request #11349:
URL: https://github.com/apache/kafka/pull/11349


   The two-arg variant is intended to take a property name and value, not an 
exception message and a cause.
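   
   To illustrate the kind of mistake this fixes, here is a self-contained 
sketch with a stand-in exception class. `SettingException` is hypothetical 
and its message format is an assumption for illustration, not a claim about 
the real `ConfigException` text. Passing a message and a cause to a 
(name, value) constructor compiles fine but yields a misleading message.
   
   ```java
   // Hypothetical stand-in: a (name, value) constructor next to a
   // single-argument (message) constructor.
   class SettingException extends RuntimeException {
       SettingException(final String message) {
           super(message);
       }

       SettingException(final String name, final Object value) {
           super("Invalid value " + value + " for configuration " + name);
       }
   }

   class SettingExceptionDemo {
       static String misuse(final Exception cause) {
           // Intended as (message, cause), but binds to (name, value).
           return new SettingException("Failed to parse setting", cause).getMessage();
       }

       static String intended(final String name, final Object value) {
           return new SettingException(name, value).getMessage();
       }
   }
   ```
   
   In this sketch, `misuse(new IllegalStateException("boom"))` produces a 
message that treats the exception as the value and the error text as the 
property name.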
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] wcarlson5 commented on a change in pull request #11334: KAFKA-13246: StoreQueryIntegrationTest#shouldQueryStoresAfterAddingAn…

2021-09-21 Thread GitBox


wcarlson5 commented on a change in pull request #11334:
URL: https://github.com/apache/kafka/pull/11334#discussion_r713141647



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##
@@ -477,6 +477,7 @@ public void shouldQueryStoresAfterAddingAndRemovingStreamThread() throws Excepti
         //Add thread
         final Optional<String> streamThread = kafkaStreams1.addStreamThread();
         assertThat(streamThread.isPresent(), is(true));
+        until(() -> kafkaStreams1.state().isRunningOrRebalancing());

Review comment:
   It is true that we are waiting for the state below; however, the way we are 
doing so causes some exceptions because Streams is in the wrong state. These 
exceptions are really not the point of any of the tests in this file, so we 
have gotten around them by whitelisting exception messages. It would be 
better to just wait for the correct state before trying to do any IQ 
(interactive query), as is done here. @andy0x01 the isolation of the 
add/remove is a nice benefit as well. 
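   
   For reference, a wait-then-query helper can be sketched as below. This is 
not the `until` helper used in the test; `WaitUntil` and its timeout and 
poll parameters are assumptions for illustration. It polls a condition and 
fails with a clear error instead of letting a later call throw because the 
client is in the wrong state.
   
   ```java
   import java.util.function.BooleanSupplier;

   class WaitUntil {
       // Polls the condition until it holds or the timeout elapses.
       static void until(final BooleanSupplier condition, final long timeoutMs, final long pollMs)
               throws InterruptedException {
           final long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
           while (!condition.getAsBoolean()) {
               if (System.nanoTime() - deadline > 0) {
                   throw new AssertionError("Condition not met within " + timeoutMs + " ms");
               }
               Thread.sleep(pollMs);
           }
       }
   }
   ```
   
   A test would call something like `WaitUntil.until(() -> 
client.isReady(), 30_000L, 100L)` before querying, where `client.isReady()` 
is a placeholder for the state check.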








[GitHub] [kafka] ccding commented on pull request #11348: MINOR: fix CreateTopic to return the same as DescribeTopic

2021-09-21 Thread GitBox


ccding commented on pull request #11348:
URL: https://github.com/apache/kafka/pull/11348#issuecomment-924028973


   The failed tests pass in my local run.






[GitHub] [kafka] andy0x01 commented on a change in pull request #11334: KAFKA-13246: StoreQueryIntegrationTest#shouldQueryStoresAfterAddingAn…

2021-09-21 Thread GitBox


andy0x01 commented on a change in pull request #11334:
URL: https://github.com/apache/kafka/pull/11334#discussion_r713063360



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##
@@ -477,6 +477,7 @@ public void shouldQueryStoresAfterAddingAndRemovingStreamThread() throws Excepti
         //Add thread
         final Optional<String> streamThread = kafkaStreams1.addStreamThread();
         assertThat(streamThread.isPresent(), is(true));
+        until(() -> kafkaStreams1.state().isRunningOrRebalancing());

Review comment:
   You are right. I see the point that it should not make any functional 
difference.
   
   According to `KafkaStreams.State`: 
   
   > REBALANCING state will transit to RUNNING if all of its threads are in 
RUNNING state
   
   (so as soon as the new `StreamThread` is in `RUNNING`)
   
   Although `REBALANCING` is explicitly stated in 
[KAFKA-13246](https://issues.apache.org/jira/browse/KAFKA-13246):
   
   > 
StoreQueryIntegrationTest#shouldQueryStoresAfterAddingAndRemovingStreamThread 
should be improved by waiting for the client to go to rebalancing or running 
after adding and removing a thread. It should also wait until running before 
querying the state store 
   
   One small benefit I see here is to assert in isolation of any following code 
that adding/removing a thread was indeed successful.
   
   @wcarlson5 maybe you, as the author of the JIRA issue, could give us a hint 
if we are overlooking anything here?
   Thanks
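   
   The quoted transition rule can be sketched as below. This is a deliberately 
simplified model with hypothetical enums, not the actual `KafkaStreams` state 
machine: the client is treated as `RUNNING` only when every thread is 
`RUNNING`, and as `REBALANCING` otherwise.
   
   ```java
   import java.util.List;

   class ClientStateSketch {
       // Hypothetical, reduced sets of states for illustration only.
       enum ThreadState { STARTING, PARTITIONS_ASSIGNED, RUNNING }
       enum ClientState { REBALANCING, RUNNING }

       // RUNNING only if all threads are RUNNING; an empty list counts as RUNNING here.
       static ClientState clientState(final List<ThreadState> threads) {
           for (final ThreadState t : threads) {
               if (t != ThreadState.RUNNING) {
                   return ClientState.REBALANCING;
               }
           }
           return ClientState.RUNNING;
       }
   }
   ```
   
   Under this model, a newly added thread that is still starting keeps the 
client in `REBALANCING` until that thread reaches `RUNNING`, which is why 
waiting for "running or rebalancing" right after adding a thread returns 
quickly either way.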








[GitHub] [kafka] viktorsomogyi commented on pull request #11276: KAFKA-13240: Disable HTTP TRACE Method in Connect

2021-09-21 Thread GitBox


viktorsomogyi commented on pull request #11276:
URL: https://github.com/apache/kafka/pull/11276#issuecomment-923908827


   @mimaison would you please check this again if you have time?






[GitHub] [kafka] ahamedmulaffer commented on pull request #8575: KAFKA-8713 KIP-581 Add accept.optional.null to solve optional null

2021-09-21 Thread GitBox


ahamedmulaffer commented on pull request #8575:
URL: https://github.com/apache/kafka/pull/8575#issuecomment-923888236


   Any update or possible workaround? @rhauch






[GitHub] [kafka] lhunyady commented on pull request #11333: KAFKA-13306: Null connector config value passes validation, but fails creation

2021-09-21 Thread GitBox


lhunyady commented on pull request #11333:
URL: https://github.com/apache/kafka/pull/11333#issuecomment-923881582


   @C0urante  @kkonstantine Can you please review this change for me?






[GitHub] [kafka] showuon commented on a change in pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…

2021-09-21 Thread GitBox


showuon commented on a change in pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#discussion_r712843244



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
##
@@ -292,13 +408,46 @@ public V fetch(final K key,
 time);
 }
 
+    private long getActualWindowStartTime(final long timeFrom) {
+        return Math.max(timeFrom, ((PersistentWindowStore) wrapped()).getObservedStreamTime() - retentionPeriod + 1);
+    }
+
+    private KeyValueIterator<Windowed<K>, V> filterExpiredRecords(final boolean forward) {
+        final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ? wrapped().all() : wrapped().backwardAll();
+
+        final long observedStreamTime = ((PersistentWindowStore) wrapped()).getObservedStreamTime();
+        if (!allWindowedKeyValueIterator.hasNext() || observedStreamTime == ConsumerRecord.NO_TIMESTAMP)
+            return new MeteredWindowedKeyValueIterator<>(allWindowedKeyValueIterator, fetchSensor, streamsMetrics, serdes, time);
+
+        final long windowStartBoundary = observedStreamTime - retentionPeriod + 1;
+        final List<KeyValue<Windowed<Bytes>, byte[]>> windowedKeyValuesInBoundary = new ArrayList<>();
+
+        while (allWindowedKeyValueIterator.hasNext()) {
+            final KeyValue<Windowed<Bytes>, byte[]> next = allWindowedKeyValueIterator.next();
+            if (next.key.window().endTime().isBefore(Instant.ofEpochMilli(windowStartBoundary))) {
+                continue;
+            }
+            windowedKeyValuesInBoundary.add(next);
+        }
+        return new MeteredWindowedKeyValueIterator<>(new WindowedKeyValueIterator(windowedKeyValuesInBoundary.iterator()), fetchSensor, streamsMetrics, serdes, time);
+    }

Review comment:
   @vamossagar12, sorry for the late reply. I read through your changes again, 
and I'm wondering whether we could fetch a specific time range via the 
`fetchAll` API, instead of fetching everything with `all` and then filtering 
manually? This way, we won't have a separate iterator, so the test should pass.
   
   For example:
   ```java
   private KeyValueIterator<Windowed<K>, V> filterExpiredRecords(final boolean forward) {
       // this is what you've done, to fetch all, then filter later.
       // final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ? wrapped().all() : wrapped().backwardAll();

       final long observedStreamTime = getObservedStreamTime(wrapped());
       if (!allWindowedKeyValueIterator.hasNext() || observedStreamTime == ConsumerRecord.NO_TIMESTAMP)
           return new MeteredWindowedKeyValueIterator<>(allWindowedKeyValueIterator, fetchSensor, streamsMetrics, serdes, time);

       final long windowStartBoundary = observedStreamTime - retentionPeriod + 1;

       // what about this? (use fetchAll(Instant timeFrom, Instant timeTo) to fetch the records in time range,
       //   by setting timeTo to the boundary we computed)
       final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ?
           wrapped().fetchAll(0, windowStartBoundary) : wrapped().backwardFetchAll(0, windowStartBoundary);

       return new MeteredWindowedKeyValueIterator<>(allWindowedKeyValueIterator, fetchSensor, streamsMetrics, serdes, time);

       /*
       final long windowStartBoundary = observedStreamTime - retentionPeriod + 1;
       final List<KeyValue<Windowed<Bytes>, byte[]>> expiredRecords = new ArrayList<>();

       while (allWindowedKeyValueIterator.hasNext()) {
           final KeyValue<Windowed<Bytes>, byte[]> next = allWindowedKeyValueIterator.next();
           if (next.key.window().endTime().isBefore(Instant.ofEpochMilli(windowStartBoundary))) {
               expiredRecords.add(next);
           }
       }
       for (KeyValue<Windowed<Bytes>, byte[]> record: expiredRecords) {
           wrapped().put(record.key.key(), null, record.key.window().start());
       }
       */

       //return new MeteredWindowedKeyValueIterator<>(wrapped().all(), fetchSensor, streamsMetrics, serdes, time);
       //return new MeteredWindowedKeyValueIterator<>(new WindowedKeyValueIterator(expiredRecords.iterator()), fetchSensor, streamsMetrics, serdes, time);
   }
   ```
   
   
   Does that make sense?








[GitHub] [kafka] showuon commented on a change in pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…

2021-09-21 Thread GitBox


showuon commented on a change in pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#discussion_r712843244



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
##
@@ -292,13 +408,46 @@ public V fetch(final K key,
 time);
 }
 
+    private long getActualWindowStartTime(final long timeFrom) {
+        return Math.max(timeFrom, ((PersistentWindowStore) wrapped()).getObservedStreamTime() - retentionPeriod + 1);
+    }
+
+    private KeyValueIterator<Windowed<K>, V> filterExpiredRecords(final boolean forward) {
+        final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ? wrapped().all() : wrapped().backwardAll();
+
+        final long observedStreamTime = ((PersistentWindowStore) wrapped()).getObservedStreamTime();
+        if (!allWindowedKeyValueIterator.hasNext() || observedStreamTime == ConsumerRecord.NO_TIMESTAMP)
+            return new MeteredWindowedKeyValueIterator<>(allWindowedKeyValueIterator, fetchSensor, streamsMetrics, serdes, time);
+
+        final long windowStartBoundary = observedStreamTime - retentionPeriod + 1;
+        final List<KeyValue<Windowed<Bytes>, byte[]>> windowedKeyValuesInBoundary = new ArrayList<>();
+
+        while (allWindowedKeyValueIterator.hasNext()) {
+            final KeyValue<Windowed<Bytes>, byte[]> next = allWindowedKeyValueIterator.next();
+            if (next.key.window().endTime().isBefore(Instant.ofEpochMilli(windowStartBoundary))) {
+                continue;
+            }
+            windowedKeyValuesInBoundary.add(next);
+        }
+        return new MeteredWindowedKeyValueIterator<>(new WindowedKeyValueIterator(windowedKeyValuesInBoundary.iterator()), fetchSensor, streamsMetrics, serdes, time);
+    }

Review comment:
   @vamossagar12, sorry for the late reply. I read through your changes again,
and I'm wondering whether we could fetch a specific time range via the `fetch` API,
instead of fetching all records and then filtering them manually?
   
   For example:
   ```java
   private KeyValueIterator<Windowed<K>, V> filterExpiredRecords(final boolean forward) {
       // this is what you've done, to fetch all, then filter later.
       // final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ? wrapped().all() : wrapped().backwardAll();

       final long observedStreamTime = getObservedStreamTime(wrapped());
       if (!allWindowedKeyValueIterator.hasNext() || observedStreamTime == ConsumerRecord.NO_TIMESTAMP)
           return new MeteredWindowedKeyValueIterator<>(allWindowedKeyValueIterator, fetchSensor, streamsMetrics, serdes, time);

       final long windowStartBoundary = observedStreamTime - retentionPeriod + 1;

       // what about this? (use fetchAll(Instant timeFrom, Instant timeTo) to fetch the records
       // in time range, by setting timeTo to the boundary we computed)
       final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ? wrapped().fetchAll(0, windowStartBoundary) : wrapped().backwardFetchAll(0, windowStartBoundary);

       return new MeteredWindowedKeyValueIterator<>(allWindowedKeyValueIterator, fetchSensor, streamsMetrics, serdes, time);

       /*
           final long windowStartBoundary = observedStreamTime - retentionPeriod + 1;
           final List<KeyValue<Windowed<Bytes>, byte[]>> expiredRecords = new ArrayList<>();

           while (allWindowedKeyValueIterator.hasNext()) {
               final KeyValue<Windowed<Bytes>, byte[]> next = allWindowedKeyValueIterator.next();
               if (next.key.window().endTime().isBefore(Instant.ofEpochMilli(windowStartBoundary))) {
                   expiredRecords.add(next);
               }
           }
           for (KeyValue<Windowed<Bytes>, byte[]> record : expiredRecords) {
               wrapped().put(record.key.key(), null, record.key.window().start());
           }
       */

       //return new MeteredWindowedKeyValueIterator<>(wrapped().all(), fetchSensor, streamsMetrics, serdes, time);
       //return new MeteredWindowedKeyValueIterator<>(new WindowedKeyValueIterator(expiredRecords.iterator()), fetchSensor, streamsMetrics, serdes, time);
   }
   ```
   
   
   Does that make sense?
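
As a rough illustration of the trade-off raised in this comment (iterate over everything and filter by hand, versus asking the store for a time range), here is a minimal, self-contained sketch. It does not use Kafka Streams: a `TreeMap` keyed by window start time stands in for the window store, and `RetentionSketch`, `liveByScan`, and `liveByRange` are hypothetical names chosen for this example. It also simplifies by comparing window start times, whereas the diff compares window end times, and it assumes the goal is to return only the records still inside the retention period. Only the boundary formula `observedStreamTime - retentionPeriod + 1` is taken from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for a window store: window start time (ms) -> value.
// None of these names are Kafka Streams APIs.
class RetentionSketch {

    // Earliest timestamp still inside retention, using the formula from the diff.
    static long windowStartBoundary(final long observedStreamTime, final long retentionPeriod) {
        return observedStreamTime - retentionPeriod + 1;
    }

    // Approach 1: iterate over every entry and skip the expired ones manually.
    static List<String> liveByScan(final NavigableMap<Long, String> store,
                                   final long observedStreamTime,
                                   final long retentionPeriod) {
        final long boundary = windowStartBoundary(observedStreamTime, retentionPeriod);
        final List<String> live = new ArrayList<>();
        for (final Map.Entry<Long, String> entry : store.entrySet()) {
            if (entry.getKey() < boundary) {
                continue; // outside retention
            }
            live.add(entry.getValue());
        }
        return live;
    }

    // Approach 2: let the sorted structure return only the range at or after the boundary.
    static List<String> liveByRange(final NavigableMap<Long, String> store,
                                    final long observedStreamTime,
                                    final long retentionPeriod) {
        final long boundary = windowStartBoundary(observedStreamTime, retentionPeriod);
        return new ArrayList<>(store.tailMap(boundary, true).values());
    }

    public static void main(final String[] args) {
        final NavigableMap<Long, String> store = new TreeMap<>();
        store.put(10L, "a");
        store.put(50L, "b");
        store.put(51L, "c");
        store.put(100L, "d");

        // observedStreamTime = 100, retentionPeriod = 50, so the boundary is 51.
        System.out.println(liveByScan(store, 100L, 50L));
        System.out.println(liveByRange(store, 100L, 50L));
    }
}
```

Both methods return the same entries; the range variant avoids touching expired entries at all, which is the motivation for preferring a range fetch when the underlying store supports one.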








[GitHub] [kafka] dajac merged pull request #11344: KAFKA-13312; 'NetworkDegradeTest#test_rate' should wait until iperf server is listening

2021-09-21 Thread GitBox


dajac merged pull request #11344:
URL: https://github.com/apache/kafka/pull/11344


   

